blob: 69e6638202391fd1c27c8809d4e2a27e5c5d1db4 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
3#include <algorithm>
4#include <deque>
milind1f1dca32021-07-03 13:50:07 -07005#include <optional>
6#include <queue>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08007#include <string_view>
Brian Silverman661eb8d2020-08-12 19:41:01 -07008#include <vector>
Alex Perrycb7da4b2019-08-28 19:35:56 -07009
10#include "absl/container/btree_map.h"
Brian Silverman661eb8d2020-08-12 19:41:01 -070011#include "aos/events/aos_logging.h"
Austin Schuh898f4972020-01-11 17:21:25 -080012#include "aos/events/simulated_network_bridge.h"
Austin Schuh094d09b2020-11-20 23:26:52 -080013#include "aos/init.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070014#include "aos/json_to_flatbuffer.h"
Austin Schuhcc6070c2020-10-10 20:25:56 -070015#include "aos/realtime.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070016#include "aos/util/phased_loop.h"
17
18namespace aos {
19
Brian Silverman661eb8d2020-08-12 19:41:01 -070020class SimulatedEventLoop;
21class SimulatedFetcher;
22class SimulatedChannel;
23
24namespace {
25
Austin Schuh057d29f2021-08-21 23:05:15 -070026std::string NodeName(const Node *node) {
27 if (node == nullptr) {
28 return "";
29 }
30
31 return absl::StrCat(node->name()->string_view(), " ");
32}
33
Austin Schuhcc6070c2020-10-10 20:25:56 -070034class ScopedMarkRealtimeRestorer {
35 public:
36 ScopedMarkRealtimeRestorer(bool rt) : rt_(rt), prior_(MarkRealtime(rt)) {}
37 ~ScopedMarkRealtimeRestorer() { CHECK_EQ(rt_, MarkRealtime(prior_)); }
38
39 private:
40 const bool rt_;
41 const bool prior_;
42};
43
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070044// Holds storage for a span object and the data referenced by that span for
45// compatibility with RawSender::SharedSpan users. If constructed with
46// MakeSharedSpan, span points to only the aligned segment of the entire data.
47struct AlignedOwningSpan {
48 AlignedOwningSpan(const AlignedOwningSpan &) = delete;
49 AlignedOwningSpan &operator=(const AlignedOwningSpan &) = delete;
50 absl::Span<const uint8_t> span;
51 char data[];
52};
53
54// Constructs a span which owns its data through a shared_ptr. The owning span
55// points to a const view of the data; also returns a temporary mutable span
56// which is only valid while the const shared span is kept alive.
57std::pair<RawSender::SharedSpan, absl::Span<uint8_t>> MakeSharedSpan(
58 size_t size) {
59 AlignedOwningSpan *const span = reinterpret_cast<AlignedOwningSpan *>(
60 malloc(sizeof(AlignedOwningSpan) + size + kChannelDataAlignment - 1));
61
62 absl::Span mutable_span(
63 reinterpret_cast<uint8_t *>(RoundChannelData(&span->data[0], size)),
64 size);
65 new (span) AlignedOwningSpan{.span = mutable_span};
66
67 return std::make_pair(
68 RawSender::SharedSpan(
69 std::shared_ptr<AlignedOwningSpan>(span,
70 [](AlignedOwningSpan *s) {
71 s->~AlignedOwningSpan();
72 free(s);
73 }),
74 &span->span),
75 mutable_span);
76}
77
Alex Perrycb7da4b2019-08-28 19:35:56 -070078// Container for both a message, and the context for it for simulation. This
79// makes tracking the timestamps associated with the data easy.
Brian Silverman661eb8d2020-08-12 19:41:01 -070080struct SimulatedMessage final {
81 SimulatedMessage(const SimulatedMessage &) = delete;
82 SimulatedMessage &operator=(const SimulatedMessage &) = delete;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070083 ~SimulatedMessage();
Brian Silverman661eb8d2020-08-12 19:41:01 -070084
85 // Creates a SimulatedMessage with size bytes of storage.
86 // This is a shared_ptr so we don't have to implement refcounting or copying.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070087 static std::shared_ptr<SimulatedMessage> Make(
88 SimulatedChannel *channel, const RawSender::SharedSpan data);
Brian Silverman661eb8d2020-08-12 19:41:01 -070089
Alex Perrycb7da4b2019-08-28 19:35:56 -070090 // Context for the data.
91 Context context;
92
Brian Silverman661eb8d2020-08-12 19:41:01 -070093 SimulatedChannel *const channel = nullptr;
Brian Silverman661eb8d2020-08-12 19:41:01 -070094
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070095 // Owning span to this message's data. Depending on the sender may either
96 // represent the data of just the flatbuffer, or max channel size.
97 RawSender::SharedSpan data;
Alex Perrycb7da4b2019-08-28 19:35:56 -070098
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070099 // Mutable view of above data. If empty, this message is not mutable.
100 absl::Span<uint8_t> mutable_data;
Brian Silverman661eb8d2020-08-12 19:41:01 -0700101
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700102 // Determines whether this message is mutable. Used for Send where the user
103 // fills out a message stored internally then gives us the size of data used.
104 bool is_mutable() const { return data->size() == mutable_data.size(); }
105
106 // Note: this should be private but make_shared requires it to be public. Use
107 // Make() above to construct.
Brian Silverman661eb8d2020-08-12 19:41:01 -0700108 SimulatedMessage(SimulatedChannel *channel_in);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700109};
110
Brian Silverman661eb8d2020-08-12 19:41:01 -0700111} // namespace
Austin Schuh39788ff2019-12-01 18:22:57 -0800112
Brian Silverman661eb8d2020-08-12 19:41:01 -0700113// TODO(Brian): This should be in the anonymous namespace, but that annoys GCC
114// for some reason...
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800115class SimulatedWatcher : public WatcherState, public EventScheduler::Event {
Austin Schuh39788ff2019-12-01 18:22:57 -0800116 public:
Austin Schuh7d87b672019-12-01 20:23:49 -0800117 SimulatedWatcher(
118 SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
119 const Channel *channel,
120 std::function<void(const Context &context, const void *message)> fn);
Austin Schuh39788ff2019-12-01 18:22:57 -0800121
Austin Schuh7d87b672019-12-01 20:23:49 -0800122 ~SimulatedWatcher() override;
Austin Schuh39788ff2019-12-01 18:22:57 -0800123
Austin Schuh8fb315a2020-11-19 22:33:58 -0800124 bool has_run() const;
125
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800126 void Handle() noexcept override;
127
Austin Schuh39788ff2019-12-01 18:22:57 -0800128 void Startup(EventLoop * /*event_loop*/) override {}
129
Austin Schuh7d87b672019-12-01 20:23:49 -0800130 void Schedule(std::shared_ptr<SimulatedMessage> message);
131
Austin Schuhf4b09c72021-12-08 12:04:37 -0800132 void HandleEvent() noexcept;
Austin Schuh39788ff2019-12-01 18:22:57 -0800133
134 void SetSimulatedChannel(SimulatedChannel *channel) {
135 simulated_channel_ = channel;
136 }
137
138 private:
Austin Schuh7d87b672019-12-01 20:23:49 -0800139 void DoSchedule(monotonic_clock::time_point event_time);
140
141 ::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
142
Brian Silverman4f4e0612020-08-12 19:54:41 -0700143 SimulatedEventLoop *const simulated_event_loop_;
144 const Channel *const channel_;
145 EventScheduler *const scheduler_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800146 EventHandler<SimulatedWatcher> event_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800147 EventScheduler::Token token_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800148 SimulatedChannel *simulated_channel_ = nullptr;
149};
Alex Perrycb7da4b2019-08-28 19:35:56 -0700150
151class SimulatedChannel {
152 public:
Austin Schuh8fb315a2020-11-19 22:33:58 -0800153 explicit SimulatedChannel(const Channel *channel,
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700154 std::chrono::nanoseconds channel_storage_duration,
155 const EventScheduler *scheduler)
Austin Schuh39788ff2019-12-01 18:22:57 -0800156 : channel_(channel),
Brian Silverman661eb8d2020-08-12 19:41:01 -0700157 channel_storage_duration_(channel_storage_duration),
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700158 next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())),
159 scheduler_(scheduler) {
Brian Silvermanbc596c62021-10-15 14:04:54 -0700160 available_buffer_indices_.resize(number_buffers());
Brian Silverman661eb8d2020-08-12 19:41:01 -0700161 for (int i = 0; i < number_buffers(); ++i) {
Brian Silvermanbc596c62021-10-15 14:04:54 -0700162 available_buffer_indices_[i] = i;
Brian Silverman661eb8d2020-08-12 19:41:01 -0700163 }
164 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700165
Brian Silverman661eb8d2020-08-12 19:41:01 -0700166 ~SimulatedChannel() {
167 latest_message_.reset();
168 CHECK_EQ(static_cast<size_t>(number_buffers()),
169 available_buffer_indices_.size());
James Kuszmaul4f106fb2021-01-05 20:53:02 -0800170 CHECK_EQ(0u, fetchers_.size())
171 << configuration::StrippedChannelToString(channel());
172 CHECK_EQ(0u, watchers_.size())
173 << configuration::StrippedChannelToString(channel());
174 CHECK_EQ(0, sender_count_)
175 << configuration::StrippedChannelToString(channel());
Brian Silverman661eb8d2020-08-12 19:41:01 -0700176 }
177
178 // The number of messages we pretend to have in the queue.
179 int queue_size() const {
180 return channel()->frequency() *
181 std::chrono::duration_cast<std::chrono::duration<double>>(
182 channel_storage_duration_)
183 .count();
184 }
185
milind1f1dca32021-07-03 13:50:07 -0700186 std::chrono::nanoseconds channel_storage_duration() const {
187 return channel_storage_duration_;
188 }
189
Brian Silverman661eb8d2020-08-12 19:41:01 -0700190 // The number of extra buffers (beyond the queue) we pretend to have.
191 int number_scratch_buffers() const {
192 // We need to start creating messages before we know how many
193 // senders+readers we'll have, so we need to just pick something which is
194 // always big enough.
195 return 50;
196 }
197
198 int number_buffers() const { return queue_size() + number_scratch_buffers(); }
199
200 int GetBufferIndex() {
201 CHECK(!available_buffer_indices_.empty()) << ": This should be impossible";
202 const int result = available_buffer_indices_.back();
203 available_buffer_indices_.pop_back();
204 return result;
205 }
206
207 void FreeBufferIndex(int i) {
Austin Schuhc5047ea2021-03-20 22:00:21 -0700208 // This extra checking has a large performance hit with sanitizers that
209 // track memory accesses, so just skip it.
210#if !__has_feature(memory_sanitizer) && !__has_feature(address_sanitizer)
Brian Silverman661eb8d2020-08-12 19:41:01 -0700211 DCHECK(std::find(available_buffer_indices_.begin(),
212 available_buffer_indices_.end(),
213 i) == available_buffer_indices_.end())
214 << ": Buffer is not in use: " << i;
Brian Silvermanf3e6df22021-01-19 15:02:21 -0800215#endif
Brian Silverman661eb8d2020-08-12 19:41:01 -0700216 available_buffer_indices_.push_back(i);
217 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700218
219 // Makes a connected raw sender which calls Send below.
Austin Schuh8fb315a2020-11-19 22:33:58 -0800220 ::std::unique_ptr<RawSender> MakeRawSender(SimulatedEventLoop *event_loop);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700221
222 // Makes a connected raw fetcher.
Austin Schuh39788ff2019-12-01 18:22:57 -0800223 ::std::unique_ptr<RawFetcher> MakeRawFetcher(EventLoop *event_loop);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700224
225 // Registers a watcher for the queue.
Austin Schuh7d87b672019-12-01 20:23:49 -0800226 void MakeRawWatcher(SimulatedWatcher *watcher);
Austin Schuh39788ff2019-12-01 18:22:57 -0800227
Austin Schuh7d87b672019-12-01 20:23:49 -0800228 void RemoveWatcher(SimulatedWatcher *watcher) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800229 watchers_.erase(std::find(watchers_.begin(), watchers_.end(), watcher));
230 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700231
Austin Schuhad154822019-12-27 15:45:13 -0800232 // Sends the message to all the connected receivers and fetchers. Returns the
milind1f1dca32021-07-03 13:50:07 -0700233 // sent queue index, or std::nullopt if messages were sent too fast.
234 std::optional<uint32_t> Send(std::shared_ptr<SimulatedMessage> message);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700235
236 // Unregisters a fetcher.
237 void UnregisterFetcher(SimulatedFetcher *fetcher);
238
239 std::shared_ptr<SimulatedMessage> latest_message() { return latest_message_; }
240
Austin Schuh39788ff2019-12-01 18:22:57 -0800241 size_t max_size() const { return channel()->max_size(); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700242
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800243 const std::string_view name() const {
Austin Schuh39788ff2019-12-01 18:22:57 -0800244 return channel()->name()->string_view();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700245 }
246
Austin Schuh39788ff2019-12-01 18:22:57 -0800247 const Channel *channel() const { return channel_; }
248
Austin Schuhe516ab02020-05-06 21:37:04 -0700249 void CountSenderCreated() {
Brian Silverman661eb8d2020-08-12 19:41:01 -0700250 CheckBufferCount();
Austin Schuhe516ab02020-05-06 21:37:04 -0700251 if (sender_count_ >= channel()->num_senders()) {
252 LOG(FATAL) << "Failed to create sender on "
253 << configuration::CleanedChannelToString(channel())
254 << ", too many senders.";
255 }
256 ++sender_count_;
257 }
Brian Silverman77162972020-08-12 19:52:40 -0700258
Austin Schuhe516ab02020-05-06 21:37:04 -0700259 void CountSenderDestroyed() {
260 --sender_count_;
261 CHECK_GE(sender_count_, 0);
262 }
263
Alex Perrycb7da4b2019-08-28 19:35:56 -0700264 private:
Brian Silverman77162972020-08-12 19:52:40 -0700265 void CheckBufferCount() {
266 int reader_count = 0;
267 if (channel()->read_method() == ReadMethod::PIN) {
268 reader_count = watchers_.size() + fetchers_.size();
269 }
270 CHECK_LT(reader_count + sender_count_, number_scratch_buffers());
271 }
272
273 void CheckReaderCount() {
274 if (channel()->read_method() != ReadMethod::PIN) {
275 return;
276 }
277 CheckBufferCount();
278 const int reader_count = watchers_.size() + fetchers_.size();
279 if (reader_count >= channel()->num_readers()) {
280 LOG(FATAL) << "Failed to create reader on "
281 << configuration::CleanedChannelToString(channel())
282 << ", too many readers.";
283 }
284 }
Brian Silverman661eb8d2020-08-12 19:41:01 -0700285
286 const Channel *const channel_;
Brian Silverman661eb8d2020-08-12 19:41:01 -0700287 const std::chrono::nanoseconds channel_storage_duration_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700288
289 // List of all watchers.
Austin Schuh7d87b672019-12-01 20:23:49 -0800290 ::std::vector<SimulatedWatcher *> watchers_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700291
292 // List of all fetchers.
293 ::std::vector<SimulatedFetcher *> fetchers_;
294 std::shared_ptr<SimulatedMessage> latest_message_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700295
296 ipc_lib::QueueIndex next_queue_index_;
Austin Schuhe516ab02020-05-06 21:37:04 -0700297
298 int sender_count_ = 0;
Brian Silverman661eb8d2020-08-12 19:41:01 -0700299
300 std::vector<uint16_t> available_buffer_indices_;
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700301
302 const EventScheduler *scheduler_;
303
304 // Queue of all the message send times in the last channel_storage_duration_
305 std::queue<monotonic_clock::time_point> last_times_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700306};
307
308namespace {
309
Brian Silverman661eb8d2020-08-12 19:41:01 -0700310std::shared_ptr<SimulatedMessage> SimulatedMessage::Make(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700311 SimulatedChannel *channel, RawSender::SharedSpan data) {
Austin Schuh62288252020-11-18 23:26:04 -0800312 // The allocations in here are due to infrastructure and don't count in the no
313 // mallocs in RT code.
314 ScopedNotRealtime nrt;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700315
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700316 auto message = std::make_shared<SimulatedMessage>(channel);
317 message->context.size = data->size();
318 message->context.data = data->data();
319 message->data = std::move(data);
320
321 return message;
Brian Silverman661eb8d2020-08-12 19:41:01 -0700322}
323
324SimulatedMessage::SimulatedMessage(SimulatedChannel *channel_in)
325 : channel(channel_in) {
Brian Silverman4f4e0612020-08-12 19:54:41 -0700326 context.buffer_index = channel->GetBufferIndex();
Brian Silverman661eb8d2020-08-12 19:41:01 -0700327}
328
329SimulatedMessage::~SimulatedMessage() {
Brian Silverman4f4e0612020-08-12 19:54:41 -0700330 channel->FreeBufferIndex(context.buffer_index);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700331}
332
333class SimulatedSender : public RawSender {
334 public:
Austin Schuh8fb315a2020-11-19 22:33:58 -0800335 SimulatedSender(SimulatedChannel *simulated_channel,
336 SimulatedEventLoop *event_loop);
337 ~SimulatedSender() override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700338
339 void *data() override {
340 if (!message_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700341 auto [span, mutable_span] =
342 MakeSharedSpan(simulated_channel_->max_size());
343 message_ = SimulatedMessage::Make(simulated_channel_, span);
344 message_->mutable_data = mutable_span;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700345 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700346 CHECK(message_->is_mutable());
347 return message_->mutable_data.data();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700348 }
349
350 size_t size() override { return simulated_channel_->max_size(); }
351
milind1f1dca32021-07-03 13:50:07 -0700352 Error DoSend(size_t length, monotonic_clock::time_point monotonic_remote_time,
353 realtime_clock::time_point realtime_remote_time,
354 uint32_t remote_queue_index,
355 const UUID &source_boot_uuid) override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700356
milind1f1dca32021-07-03 13:50:07 -0700357 Error DoSend(const void *msg, size_t size,
358 monotonic_clock::time_point monotonic_remote_time,
359 realtime_clock::time_point realtime_remote_time,
360 uint32_t remote_queue_index,
361 const UUID &source_boot_uuid) override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700362
milind1f1dca32021-07-03 13:50:07 -0700363 Error DoSend(const SharedSpan data,
364 aos::monotonic_clock::time_point monotonic_remote_time,
365 aos::realtime_clock::time_point realtime_remote_time,
366 uint32_t remote_queue_index,
367 const UUID &source_boot_uuid) override;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700368
Brian Silverman4f4e0612020-08-12 19:54:41 -0700369 int buffer_index() override {
370 // First, ensure message_ is allocated.
371 data();
372 return message_->context.buffer_index;
373 }
374
Alex Perrycb7da4b2019-08-28 19:35:56 -0700375 private:
376 SimulatedChannel *simulated_channel_;
Austin Schuh58646e22021-08-23 23:51:46 -0700377 SimulatedEventLoop *simulated_event_loop_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700378
379 std::shared_ptr<SimulatedMessage> message_;
380};
381} // namespace
382
383class SimulatedFetcher : public RawFetcher {
384 public:
Austin Schuhac0771c2020-01-07 18:36:30 -0800385 explicit SimulatedFetcher(EventLoop *event_loop,
386 SimulatedChannel *simulated_channel)
387 : RawFetcher(event_loop, simulated_channel->channel()),
388 simulated_channel_(simulated_channel) {}
389 ~SimulatedFetcher() { simulated_channel_->UnregisterFetcher(this); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700390
Austin Schuh39788ff2019-12-01 18:22:57 -0800391 std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
Austin Schuh62288252020-11-18 23:26:04 -0800392 // The allocations in here are due to infrastructure and don't count in the
393 // no mallocs in RT code.
394 ScopedNotRealtime nrt;
Austin Schuh39788ff2019-12-01 18:22:57 -0800395 if (msgs_.size() == 0) {
396 return std::make_pair(false, monotonic_clock::min_time);
397 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700398
James Kuszmaulbcd96fc2020-10-12 20:29:32 -0700399 CHECK(!fell_behind_) << ": Got behind on "
400 << configuration::StrippedChannelToString(
401 simulated_channel_->channel());
Brian Silverman661eb8d2020-08-12 19:41:01 -0700402
Alex Perrycb7da4b2019-08-28 19:35:56 -0700403 SetMsg(msgs_.front());
404 msgs_.pop_front();
Austin Schuha5e14192020-01-06 18:02:41 -0800405 return std::make_pair(true, event_loop()->monotonic_now());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700406 }
407
Austin Schuh39788ff2019-12-01 18:22:57 -0800408 std::pair<bool, monotonic_clock::time_point> DoFetch() override {
Austin Schuh62288252020-11-18 23:26:04 -0800409 // The allocations in here are due to infrastructure and don't count in the
410 // no mallocs in RT code.
411 ScopedNotRealtime nrt;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700412 if (msgs_.size() == 0) {
Austin Schuh7d87b672019-12-01 20:23:49 -0800413 // TODO(austin): Can we just do this logic unconditionally? It is a lot
414 // simpler. And call clear, obviously.
Austin Schuhac0771c2020-01-07 18:36:30 -0800415 if (!msg_ && simulated_channel_->latest_message()) {
416 SetMsg(simulated_channel_->latest_message());
Austin Schuha5e14192020-01-06 18:02:41 -0800417 return std::make_pair(true, event_loop()->monotonic_now());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700418 } else {
Austin Schuh39788ff2019-12-01 18:22:57 -0800419 return std::make_pair(false, monotonic_clock::min_time);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700420 }
421 }
422
423 // We've had a message enqueued, so we don't need to go looking for the
424 // latest message from before we started.
425 SetMsg(msgs_.back());
426 msgs_.clear();
Brian Silverman661eb8d2020-08-12 19:41:01 -0700427 fell_behind_ = false;
Austin Schuha5e14192020-01-06 18:02:41 -0800428 return std::make_pair(true, event_loop()->monotonic_now());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700429 }
430
431 private:
432 friend class SimulatedChannel;
433
434 // Updates the state inside RawFetcher to point to the data in msg_.
435 void SetMsg(std::shared_ptr<SimulatedMessage> msg) {
Austin Schuhe6f4c8d2021-12-11 12:36:06 -0800436 msg_ = std::move(msg);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700437 context_ = msg_->context;
Brian Silverman4f4e0612020-08-12 19:54:41 -0700438 if (channel()->read_method() != ReadMethod::PIN) {
439 context_.buffer_index = -1;
440 }
Austin Schuhad154822019-12-27 15:45:13 -0800441 if (context_.remote_queue_index == 0xffffffffu) {
442 context_.remote_queue_index = context_.queue_index;
443 }
Austin Schuh58646e22021-08-23 23:51:46 -0700444 if (context_.monotonic_remote_time == monotonic_clock::min_time) {
Austin Schuhad154822019-12-27 15:45:13 -0800445 context_.monotonic_remote_time = context_.monotonic_event_time;
446 }
Austin Schuh58646e22021-08-23 23:51:46 -0700447 if (context_.realtime_remote_time == realtime_clock::min_time) {
Austin Schuhad154822019-12-27 15:45:13 -0800448 context_.realtime_remote_time = context_.realtime_event_time;
449 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700450 }
451
452 // Internal method for Simulation to add a message to the buffer.
453 void Enqueue(std::shared_ptr<SimulatedMessage> buffer) {
Austin Schuhe6f4c8d2021-12-11 12:36:06 -0800454 msgs_.emplace_back(std::move(buffer));
Brian Silverman661eb8d2020-08-12 19:41:01 -0700455 if (fell_behind_ ||
456 msgs_.size() > static_cast<size_t>(simulated_channel_->queue_size())) {
457 fell_behind_ = true;
458 // Might as well empty out all the intermediate messages now.
459 while (msgs_.size() > 1) {
460 msgs_.pop_front();
461 }
462 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700463 }
464
Austin Schuhac0771c2020-01-07 18:36:30 -0800465 SimulatedChannel *simulated_channel_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700466 std::shared_ptr<SimulatedMessage> msg_;
467
468 // Messages queued up but not in use.
469 ::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
Brian Silverman661eb8d2020-08-12 19:41:01 -0700470
471 // Whether we're currently "behind", which means a FetchNext call will fail.
472 bool fell_behind_ = false;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700473};
474
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800475class SimulatedTimerHandler : public TimerHandler,
476 public EventScheduler::Event {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700477 public:
478 explicit SimulatedTimerHandler(EventScheduler *scheduler,
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800479 SimulatedEventLoop *simulated_event_loop,
Austin Schuh39788ff2019-12-01 18:22:57 -0800480 ::std::function<void()> fn);
Austin Schuh7d87b672019-12-01 20:23:49 -0800481 ~SimulatedTimerHandler() { Disable(); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700482
483 void Setup(monotonic_clock::time_point base,
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800484 monotonic_clock::duration repeat_offset) override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700485
Austin Schuhf4b09c72021-12-08 12:04:37 -0800486 void HandleEvent() noexcept;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700487
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800488 void Handle() noexcept override;
489
Austin Schuh7d87b672019-12-01 20:23:49 -0800490 void Disable() override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700491
Alex Perrycb7da4b2019-08-28 19:35:56 -0700492 private:
Austin Schuh7d87b672019-12-01 20:23:49 -0800493 SimulatedEventLoop *simulated_event_loop_;
494 EventHandler<SimulatedTimerHandler> event_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700495 EventScheduler *scheduler_;
496 EventScheduler::Token token_;
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800497
Alex Perrycb7da4b2019-08-28 19:35:56 -0700498 monotonic_clock::time_point base_;
499 monotonic_clock::duration repeat_offset_;
500};
501
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800502class SimulatedPhasedLoopHandler : public PhasedLoopHandler,
503 public EventScheduler::Event {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700504 public:
505 SimulatedPhasedLoopHandler(EventScheduler *scheduler,
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800506 SimulatedEventLoop *simulated_event_loop,
Alex Perrycb7da4b2019-08-28 19:35:56 -0700507 ::std::function<void(int)> fn,
508 const monotonic_clock::duration interval,
Austin Schuh39788ff2019-12-01 18:22:57 -0800509 const monotonic_clock::duration offset);
Austin Schuh7d87b672019-12-01 20:23:49 -0800510 ~SimulatedPhasedLoopHandler();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700511
Austin Schuhf4b09c72021-12-08 12:04:37 -0800512 void HandleEvent() noexcept;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700513
Austin Schuh7d87b672019-12-01 20:23:49 -0800514 void Schedule(monotonic_clock::time_point sleep_time) override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700515
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800516 void Handle() noexcept override;
517
Alex Perrycb7da4b2019-08-28 19:35:56 -0700518 private:
Austin Schuh39788ff2019-12-01 18:22:57 -0800519 SimulatedEventLoop *simulated_event_loop_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800520 EventHandler<SimulatedPhasedLoopHandler> event_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700521
Austin Schuh39788ff2019-12-01 18:22:57 -0800522 EventScheduler *scheduler_;
523 EventScheduler::Token token_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700524};
525
526class SimulatedEventLoop : public EventLoop {
527 public:
528 explicit SimulatedEventLoop(
Brian Silverman661eb8d2020-08-12 19:41:01 -0700529 EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
Alex Perrycb7da4b2019-08-28 19:35:56 -0700530 absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
531 *channels,
532 const Configuration *configuration,
Austin Schuh057d29f2021-08-21 23:05:15 -0700533 std::vector<SimulatedEventLoop *> *event_loops_, const Node *node,
534 pid_t tid)
Austin Schuh83c7f702021-01-19 22:36:29 -0800535 : EventLoop(CHECK_NOTNULL(configuration)),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700536 scheduler_(scheduler),
Austin Schuhac0771c2020-01-07 18:36:30 -0800537 node_event_loop_factory_(node_event_loop_factory),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700538 channels_(channels),
Austin Schuh057d29f2021-08-21 23:05:15 -0700539 event_loops_(event_loops_),
Austin Schuh217a9782019-12-21 23:02:50 -0800540 node_(node),
Austin Schuh58646e22021-08-23 23:51:46 -0700541 tid_(tid),
542 startup_tracker_(std::make_shared<StartupTracker>()) {
543 startup_tracker_->loop = this;
544 scheduler_->ScheduleOnStartup([startup_tracker = startup_tracker_]() {
545 if (startup_tracker->loop) {
546 startup_tracker->loop->Setup();
547 startup_tracker->has_setup = true;
548 }
Austin Schuh057d29f2021-08-21 23:05:15 -0700549 });
550
551 event_loops_->push_back(this);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700552 }
Austin Schuh58646e22021-08-23 23:51:46 -0700553
Alex Perrycb7da4b2019-08-28 19:35:56 -0700554 ~SimulatedEventLoop() override {
Austin Schuh39788ff2019-12-01 18:22:57 -0800555 // Trigger any remaining senders or fetchers to be cleared before destroying
556 // the event loop so the book keeping matches.
557 timing_report_sender_.reset();
558
559 // Force everything with a registered fd with epoll to be destroyed now.
560 timers_.clear();
561 phased_loops_.clear();
562 watchers_.clear();
563
Austin Schuh58646e22021-08-23 23:51:46 -0700564 for (auto it = event_loops_->begin(); it != event_loops_->end(); ++it) {
Austin Schuh057d29f2021-08-21 23:05:15 -0700565 if (*it == this) {
566 event_loops_->erase(it);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700567 break;
568 }
569 }
Austin Schuh58646e22021-08-23 23:51:46 -0700570 VLOG(1) << scheduler_->distributed_now() << " " << NodeName(node())
571 << monotonic_now() << " ~SimulatedEventLoop(\"" << name_ << "\")";
572 startup_tracker_->loop = nullptr;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700573 }
574
Austin Schuh057d29f2021-08-21 23:05:15 -0700575 void SetIsRunning(bool running) {
Austin Schuh58646e22021-08-23 23:51:46 -0700576 VLOG(1) << scheduler_->distributed_now() << " " << NodeName(node())
577 << monotonic_now() << " " << name_ << " set_is_running(" << running
578 << ")";
579 CHECK(startup_tracker_->has_setup);
Austin Schuh057d29f2021-08-21 23:05:15 -0700580
581 set_is_running(running);
Austin Schuh58646e22021-08-23 23:51:46 -0700582 if (running) {
583 has_run_ = true;
584 }
Austin Schuh057d29f2021-08-21 23:05:15 -0700585 }
586
Austin Schuh8fb315a2020-11-19 22:33:58 -0800587 bool has_run() const { return has_run_; }
588
Austin Schuh7d87b672019-12-01 20:23:49 -0800589 std::chrono::nanoseconds send_delay() const { return send_delay_; }
590 void set_send_delay(std::chrono::nanoseconds send_delay) {
591 send_delay_ = send_delay;
592 }
593
Stephan Pleines559fa6c2022-01-06 17:23:51 -0800594 monotonic_clock::time_point monotonic_now() const override {
Austin Schuhac0771c2020-01-07 18:36:30 -0800595 return node_event_loop_factory_->monotonic_now();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700596 }
597
Stephan Pleines559fa6c2022-01-06 17:23:51 -0800598 realtime_clock::time_point realtime_now() const override {
Austin Schuhac0771c2020-01-07 18:36:30 -0800599 return node_event_loop_factory_->realtime_now();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700600 }
601
Austin Schuh58646e22021-08-23 23:51:46 -0700602 distributed_clock::time_point distributed_now() {
603 return scheduler_->distributed_now();
604 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700605
Austin Schuh58646e22021-08-23 23:51:46 -0700606 std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) override;
607
608 std::unique_ptr<RawFetcher> MakeRawFetcher(const Channel *channel) override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700609
610 void MakeRawWatcher(
611 const Channel *channel,
612 ::std::function<void(const Context &context, const void *message)>
613 watcher) override;
614
615 TimerHandler *AddTimer(::std::function<void()> callback) override {
Austin Schuh39788ff2019-12-01 18:22:57 -0800616 CHECK(!is_running());
Austin Schuh8bd96322020-02-13 21:18:22 -0800617 return NewTimer(::std::unique_ptr<TimerHandler>(
618 new SimulatedTimerHandler(scheduler_, this, callback)));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700619 }
620
621 PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
622 const monotonic_clock::duration interval,
623 const monotonic_clock::duration offset =
624 ::std::chrono::seconds(0)) override {
Austin Schuh8bd96322020-02-13 21:18:22 -0800625 return NewPhasedLoop(
626 ::std::unique_ptr<PhasedLoopHandler>(new SimulatedPhasedLoopHandler(
627 scheduler_, this, callback, interval, offset)));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700628 }
629
630 void OnRun(::std::function<void()> on_run) override {
Austin Schuh8fb315a2020-11-19 22:33:58 -0800631 CHECK(!is_running()) << ": Cannot register OnRun callback while running.";
Austin Schuhcc6070c2020-10-10 20:25:56 -0700632 scheduler_->ScheduleOnRun([this, on_run = std::move(on_run)]() {
Austin Schuhad9e5eb2021-11-19 20:33:55 -0800633 logging::ScopedLogRestorer prev_logger;
634 if (log_impl_) {
635 prev_logger.Swap(log_impl_);
636 }
Austin Schuhcc6070c2020-10-10 20:25:56 -0700637 ScopedMarkRealtimeRestorer rt(priority() > 0);
Austin Schuha9012be2021-07-21 15:19:11 -0700638 SetTimerContext(monotonic_now());
Austin Schuhcc6070c2020-10-10 20:25:56 -0700639 on_run();
640 });
Alex Perrycb7da4b2019-08-28 19:35:56 -0700641 }
642
Austin Schuh217a9782019-12-21 23:02:50 -0800643 const Node *node() const override { return node_; }
644
James Kuszmaul3ae42262019-11-08 12:33:41 -0800645 void set_name(const std::string_view name) override {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700646 name_ = std::string(name);
647 }
James Kuszmaul3ae42262019-11-08 12:33:41 -0800648 const std::string_view name() const override { return name_; }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700649
650 SimulatedChannel *GetSimulatedChannel(const Channel *channel);
651
Austin Schuh39788ff2019-12-01 18:22:57 -0800652 void SetRuntimeRealtimePriority(int priority) override {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700653 CHECK(!is_running()) << ": Cannot set realtime priority while running.";
Austin Schuh39788ff2019-12-01 18:22:57 -0800654 priority_ = priority;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700655 }
656
Austin Schuh39788ff2019-12-01 18:22:57 -0800657 int priority() const override { return priority_; }
658
Brian Silverman6a54ff32020-04-28 16:41:39 -0700659 void SetRuntimeAffinity(const cpu_set_t & /*cpuset*/) override {
660 CHECK(!is_running()) << ": Cannot set affinity while running.";
661 }
662
Tyler Chatow67ddb032020-01-12 14:30:04 -0800663 void Setup() {
664 MaybeScheduleTimingReports();
665 if (!skip_logger_) {
Austin Schuhad9e5eb2021-11-19 20:33:55 -0800666 log_sender_.Initialize(&name_,
667 MakeSender<logging::LogMessageFbs>("/aos"));
Austin Schuha0c41ba2020-09-10 22:59:14 -0700668 log_impl_ = log_sender_.implementation();
Tyler Chatow67ddb032020-01-12 14:30:04 -0800669 }
670 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800671
Brian Silverman4f4e0612020-08-12 19:54:41 -0700672 int NumberBuffers(const Channel *channel) override;
673
Austin Schuh83c7f702021-01-19 22:36:29 -0800674 const UUID &boot_uuid() const override {
675 return node_event_loop_factory_->boot_uuid();
676 }
677
Alex Perrycb7da4b2019-08-28 19:35:56 -0700678 private:
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800679 friend class SimulatedTimerHandler;
Austin Schuh7d87b672019-12-01 20:23:49 -0800680 friend class SimulatedPhasedLoopHandler;
681 friend class SimulatedWatcher;
682
Austin Schuh58646e22021-08-23 23:51:46 -0700683 // We have a condition where we register a startup handler, but then get shut
684 // down before it runs. This results in a segfault if we are lucky, and
685 // corruption otherwise. To handle that, allocate a small object which points
686 // back to us and can be freed when the function is freed. That object can
687 // then be updated when we get destroyed so setup is not called.
688 struct StartupTracker {
689 SimulatedEventLoop *loop = nullptr;
690 bool has_setup = false;
691 };
692
Austin Schuh7d87b672019-12-01 20:23:49 -0800693 void HandleEvent() {
694 while (true) {
695 if (EventCount() == 0 || PeekEvent()->event_time() > monotonic_now()) {
696 break;
697 }
698
699 EventLoopEvent *event = PopEvent();
700 event->HandleEvent();
701 }
702 }
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800703
Austin Schuh39788ff2019-12-01 18:22:57 -0800704 pid_t GetTid() override { return tid_; }
705
Alex Perrycb7da4b2019-08-28 19:35:56 -0700706 EventScheduler *scheduler_;
Austin Schuhac0771c2020-01-07 18:36:30 -0800707 NodeEventLoopFactory *node_event_loop_factory_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700708 absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> *channels_;
Austin Schuh057d29f2021-08-21 23:05:15 -0700709 std::vector<SimulatedEventLoop *> *event_loops_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700710
711 ::std::string name_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800712
713 int priority_ = 0;
714
Austin Schuh7d87b672019-12-01 20:23:49 -0800715 std::chrono::nanoseconds send_delay_;
716
Austin Schuh217a9782019-12-21 23:02:50 -0800717 const Node *const node_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800718 const pid_t tid_;
Tyler Chatow67ddb032020-01-12 14:30:04 -0800719
720 AosLogToFbs log_sender_;
Austin Schuha0c41ba2020-09-10 22:59:14 -0700721 std::shared_ptr<logging::LogImplementation> log_impl_ = nullptr;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800722
723 bool has_run_ = false;
Austin Schuh58646e22021-08-23 23:51:46 -0700724
725 std::shared_ptr<StartupTracker> startup_tracker_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700726};
727
Austin Schuh7d87b672019-12-01 20:23:49 -0800728void SimulatedEventLoopFactory::set_send_delay(
729 std::chrono::nanoseconds send_delay) {
730 send_delay_ = send_delay;
Austin Schuh58646e22021-08-23 23:51:46 -0700731 for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
Austin Schuh057d29f2021-08-21 23:05:15 -0700732 if (node) {
733 for (SimulatedEventLoop *loop : node->event_loops_) {
734 loop->set_send_delay(send_delay_);
735 }
736 }
Austin Schuh7d87b672019-12-01 20:23:49 -0800737 }
738}
739
Alex Perrycb7da4b2019-08-28 19:35:56 -0700740void SimulatedEventLoop::MakeRawWatcher(
741 const Channel *channel,
742 std::function<void(const Context &channel, const void *message)> watcher) {
Brian Silverman0fc69932020-01-24 21:54:02 -0800743 TakeWatcher(channel);
Austin Schuh217a9782019-12-21 23:02:50 -0800744
Austin Schuh057d29f2021-08-21 23:05:15 -0700745 std::unique_ptr<SimulatedWatcher> shm_watcher =
746 std::make_unique<SimulatedWatcher>(this, scheduler_, channel,
747 std::move(watcher));
Austin Schuh39788ff2019-12-01 18:22:57 -0800748
749 GetSimulatedChannel(channel)->MakeRawWatcher(shm_watcher.get());
Austin Schuh057d29f2021-08-21 23:05:15 -0700750
Austin Schuh39788ff2019-12-01 18:22:57 -0800751 NewWatcher(std::move(shm_watcher));
Austin Schuh58646e22021-08-23 23:51:46 -0700752 VLOG(1) << distributed_now() << " " << NodeName(node()) << monotonic_now()
753 << " " << name() << " MakeRawWatcher(\""
754 << configuration::StrippedChannelToString(channel) << "\")";
Austin Schuh8fb315a2020-11-19 22:33:58 -0800755
756 // Order of operations gets kinda wonky if we let people make watchers after
757 // running once. If someone has a valid use case, we can reconsider.
758 CHECK(!has_run()) << ": Can't add a watcher after running.";
Alex Perrycb7da4b2019-08-28 19:35:56 -0700759}
760
761std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
762 const Channel *channel) {
Brian Silverman0fc69932020-01-24 21:54:02 -0800763 TakeSender(channel);
764
Austin Schuh58646e22021-08-23 23:51:46 -0700765 VLOG(1) << distributed_now() << " " << NodeName(node()) << monotonic_now()
766 << " " << name() << " MakeRawSender(\""
767 << configuration::StrippedChannelToString(channel) << "\")";
Alex Perrycb7da4b2019-08-28 19:35:56 -0700768 return GetSimulatedChannel(channel)->MakeRawSender(this);
769}
770
771std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
772 const Channel *channel) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800773 ChannelIndex(channel);
Austin Schuh217a9782019-12-21 23:02:50 -0800774
Austin Schuhca4828c2019-12-28 14:21:35 -0800775 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
776 LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
777 << "\", \"type\": \"" << channel->type()->string_view()
778 << "\" } is not able to be fetched on this node. Check your "
779 "configuration.";
Austin Schuh217a9782019-12-21 23:02:50 -0800780 }
781
Austin Schuh58646e22021-08-23 23:51:46 -0700782 VLOG(1) << distributed_now() << " " << NodeName(node()) << monotonic_now()
783 << " " << name() << " MakeRawFetcher(\""
784 << configuration::StrippedChannelToString(channel) << "\")";
Austin Schuh39788ff2019-12-01 18:22:57 -0800785 return GetSimulatedChannel(channel)->MakeRawFetcher(this);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700786}
787
788SimulatedChannel *SimulatedEventLoop::GetSimulatedChannel(
789 const Channel *channel) {
790 auto it = channels_->find(SimpleChannel(channel));
791 if (it == channels_->end()) {
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700792 it = channels_
793 ->emplace(SimpleChannel(channel),
794 std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
795 channel,
796 std::chrono::nanoseconds(
797 configuration()->channel_storage_duration()),
798 scheduler_)))
799 .first;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700800 }
801 return it->second.get();
802}
803
Brian Silverman4f4e0612020-08-12 19:54:41 -0700804int SimulatedEventLoop::NumberBuffers(const Channel *channel) {
805 return GetSimulatedChannel(channel)->number_buffers();
806}
807
Austin Schuh7d87b672019-12-01 20:23:49 -0800808SimulatedWatcher::SimulatedWatcher(
809 SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
Austin Schuh8bd96322020-02-13 21:18:22 -0800810 const Channel *channel,
Austin Schuh7d87b672019-12-01 20:23:49 -0800811 std::function<void(const Context &context, const void *message)> fn)
812 : WatcherState(simulated_event_loop, channel, std::move(fn)),
813 simulated_event_loop_(simulated_event_loop),
Brian Silverman4f4e0612020-08-12 19:54:41 -0700814 channel_(channel),
Austin Schuh7d87b672019-12-01 20:23:49 -0800815 scheduler_(scheduler),
Brian Silverman4f4e0612020-08-12 19:54:41 -0700816 event_(this),
Austin Schuh58646e22021-08-23 23:51:46 -0700817 token_(scheduler_->InvalidToken()) {
818 VLOG(1) << simulated_event_loop_->distributed_now() << " "
819 << NodeName(simulated_event_loop_->node())
820 << simulated_event_loop_->monotonic_now() << " "
821 << simulated_event_loop_->name() << " Watching "
822 << configuration::StrippedChannelToString(channel_);
823}
Austin Schuh7d87b672019-12-01 20:23:49 -0800824
825SimulatedWatcher::~SimulatedWatcher() {
Austin Schuh58646e22021-08-23 23:51:46 -0700826 VLOG(1) << simulated_event_loop_->distributed_now() << " "
Austin Schuh057d29f2021-08-21 23:05:15 -0700827 << NodeName(simulated_event_loop_->node())
Austin Schuh58646e22021-08-23 23:51:46 -0700828 << simulated_event_loop_->monotonic_now() << " "
829 << simulated_event_loop_->name() << " ~Watching "
Austin Schuh057d29f2021-08-21 23:05:15 -0700830 << configuration::StrippedChannelToString(channel_);
Austin Schuh7d87b672019-12-01 20:23:49 -0800831 simulated_event_loop_->RemoveEvent(&event_);
832 if (token_ != scheduler_->InvalidToken()) {
833 scheduler_->Deschedule(token_);
834 }
Brian Silverman4f4e0612020-08-12 19:54:41 -0700835 CHECK_NOTNULL(simulated_channel_)->RemoveWatcher(this);
Austin Schuh7d87b672019-12-01 20:23:49 -0800836}
837
Austin Schuh8fb315a2020-11-19 22:33:58 -0800838bool SimulatedWatcher::has_run() const {
839 return simulated_event_loop_->has_run();
840}
841
Austin Schuh7d87b672019-12-01 20:23:49 -0800842void SimulatedWatcher::Schedule(std::shared_ptr<SimulatedMessage> message) {
Austin Schuha5e14192020-01-06 18:02:41 -0800843 monotonic_clock::time_point event_time =
844 simulated_event_loop_->monotonic_now();
Austin Schuh7d87b672019-12-01 20:23:49 -0800845
846 // Messages are queued in order. If we are the first, add ourselves.
847 // Otherwise, don't.
848 if (msgs_.size() == 0) {
Austin Schuhad154822019-12-27 15:45:13 -0800849 event_.set_event_time(message->context.monotonic_event_time);
Austin Schuh7d87b672019-12-01 20:23:49 -0800850 simulated_event_loop_->AddEvent(&event_);
851
852 DoSchedule(event_time);
853 }
854
Austin Schuhe6f4c8d2021-12-11 12:36:06 -0800855 msgs_.emplace_back(std::move(message));
Austin Schuh7d87b672019-12-01 20:23:49 -0800856}
857
Austin Schuhf4b09c72021-12-08 12:04:37 -0800858void SimulatedWatcher::HandleEvent() noexcept {
Austin Schuh7d87b672019-12-01 20:23:49 -0800859 const monotonic_clock::time_point monotonic_now =
860 simulated_event_loop_->monotonic_now();
Austin Schuh58646e22021-08-23 23:51:46 -0700861 VLOG(1) << simulated_event_loop_->distributed_now() << " "
862 << NodeName(simulated_event_loop_->node())
863 << simulated_event_loop_->monotonic_now() << " "
864 << simulated_event_loop_->name() << " Watcher "
Austin Schuh057d29f2021-08-21 23:05:15 -0700865 << configuration::StrippedChannelToString(channel_);
866 CHECK_NE(msgs_.size(), 0u) << ": No events to handle.";
867
Tyler Chatow67ddb032020-01-12 14:30:04 -0800868 logging::ScopedLogRestorer prev_logger;
Austin Schuha0c41ba2020-09-10 22:59:14 -0700869 if (simulated_event_loop_->log_impl_) {
870 prev_logger.Swap(simulated_event_loop_->log_impl_);
Tyler Chatow67ddb032020-01-12 14:30:04 -0800871 }
Austin Schuhad154822019-12-27 15:45:13 -0800872 Context context = msgs_.front()->context;
873
Brian Silverman4f4e0612020-08-12 19:54:41 -0700874 if (channel_->read_method() != ReadMethod::PIN) {
875 context.buffer_index = -1;
876 }
Austin Schuhad154822019-12-27 15:45:13 -0800877 if (context.remote_queue_index == 0xffffffffu) {
878 context.remote_queue_index = context.queue_index;
879 }
Austin Schuh58646e22021-08-23 23:51:46 -0700880 if (context.monotonic_remote_time == monotonic_clock::min_time) {
Austin Schuhad154822019-12-27 15:45:13 -0800881 context.monotonic_remote_time = context.monotonic_event_time;
882 }
Austin Schuh58646e22021-08-23 23:51:46 -0700883 if (context.realtime_remote_time == realtime_clock::min_time) {
Austin Schuhad154822019-12-27 15:45:13 -0800884 context.realtime_remote_time = context.realtime_event_time;
885 }
886
Austin Schuhcc6070c2020-10-10 20:25:56 -0700887 {
888 ScopedMarkRealtimeRestorer rt(simulated_event_loop_->priority() > 0);
889 DoCallCallback([monotonic_now]() { return monotonic_now; }, context);
890 }
Austin Schuh7d87b672019-12-01 20:23:49 -0800891
892 msgs_.pop_front();
Austin Schuheb4e4ce2020-09-10 23:04:18 -0700893 if (token_ != scheduler_->InvalidToken()) {
894 scheduler_->Deschedule(token_);
895 token_ = scheduler_->InvalidToken();
896 }
Austin Schuh7d87b672019-12-01 20:23:49 -0800897 if (msgs_.size() != 0) {
Austin Schuhad154822019-12-27 15:45:13 -0800898 event_.set_event_time(msgs_.front()->context.monotonic_event_time);
Austin Schuh7d87b672019-12-01 20:23:49 -0800899 simulated_event_loop_->AddEvent(&event_);
900
901 DoSchedule(event_.event_time());
Austin Schuh7d87b672019-12-01 20:23:49 -0800902 }
903}
904
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800905void SimulatedWatcher::Handle() noexcept {
906 DCHECK(token_ != scheduler_->InvalidToken());
907 token_ = scheduler_->InvalidToken();
908 simulated_event_loop_->HandleEvent();
909}
910
Austin Schuh7d87b672019-12-01 20:23:49 -0800911void SimulatedWatcher::DoSchedule(monotonic_clock::time_point event_time) {
Austin Schuheb4e4ce2020-09-10 23:04:18 -0700912 CHECK(token_ == scheduler_->InvalidToken())
913 << ": May not schedule multiple times";
914 token_ = scheduler_->Schedule(
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800915 event_time + simulated_event_loop_->send_delay(), this);
Austin Schuh7d87b672019-12-01 20:23:49 -0800916}
917
918void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
Brian Silverman77162972020-08-12 19:52:40 -0700919 CheckReaderCount();
Austin Schuh39788ff2019-12-01 18:22:57 -0800920 watcher->SetSimulatedChannel(this);
921 watchers_.emplace_back(watcher);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700922}
923
924::std::unique_ptr<RawSender> SimulatedChannel::MakeRawSender(
Austin Schuh8fb315a2020-11-19 22:33:58 -0800925 SimulatedEventLoop *event_loop) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700926 return ::std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
927}
928
Austin Schuh39788ff2019-12-01 18:22:57 -0800929::std::unique_ptr<RawFetcher> SimulatedChannel::MakeRawFetcher(
930 EventLoop *event_loop) {
Brian Silverman77162972020-08-12 19:52:40 -0700931 CheckReaderCount();
Austin Schuh39788ff2019-12-01 18:22:57 -0800932 ::std::unique_ptr<SimulatedFetcher> fetcher(
933 new SimulatedFetcher(event_loop, this));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700934 fetchers_.push_back(fetcher.get());
935 return ::std::move(fetcher);
936}
937
milind1f1dca32021-07-03 13:50:07 -0700938std::optional<uint32_t> SimulatedChannel::Send(
939 std::shared_ptr<SimulatedMessage> message) {
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700940 const auto now = scheduler_->monotonic_now();
941 // Remove times that are greater than or equal to a channel_storage_duration_
942 // ago
943 while (!last_times_.empty() &&
944 (now - last_times_.front() >= channel_storage_duration_)) {
945 last_times_.pop();
946 }
947
948 // Check that we are not sending messages too fast
949 if (static_cast<int>(last_times_.size()) >= queue_size()) {
950 return std::nullopt;
951 }
952
953 const std::optional<uint32_t> queue_index = {next_queue_index_.index()};
954 last_times_.push(now);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700955
milind1f1dca32021-07-03 13:50:07 -0700956 message->context.queue_index = *queue_index;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700957 // Points to the actual data depending on the size set in context. Data may
958 // allocate more than the actual size of the message, so offset from the back
959 // of that to get the actual start of the data.
960 message->context.data =
961 message->data->data() + message->data->size() - message->context.size;
Austin Schuha9df9ad2021-06-16 14:49:39 -0700962
963 DCHECK(channel()->has_schema())
964 << ": Missing schema for channel "
965 << configuration::StrippedChannelToString(channel());
966 DCHECK(flatbuffers::Verify(
967 *channel()->schema(), *channel()->schema()->root_table(),
968 static_cast<const uint8_t *>(message->context.data),
969 message->context.size))
970 << ": Corrupted flatbuffer on " << channel()->name()->c_str() << " "
971 << channel()->type()->c_str();
972
Alex Perrycb7da4b2019-08-28 19:35:56 -0700973 next_queue_index_ = next_queue_index_.Increment();
974
Austin Schuhe6f4c8d2021-12-11 12:36:06 -0800975 latest_message_ = std::move(message);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800976 for (SimulatedWatcher *watcher : watchers_) {
977 if (watcher->has_run()) {
Austin Schuhe6f4c8d2021-12-11 12:36:06 -0800978 watcher->Schedule(latest_message_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700979 }
980 }
981 for (auto &fetcher : fetchers_) {
Austin Schuhe6f4c8d2021-12-11 12:36:06 -0800982 fetcher->Enqueue(latest_message_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700983 }
Austin Schuhad154822019-12-27 15:45:13 -0800984 return queue_index;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700985}
986
987void SimulatedChannel::UnregisterFetcher(SimulatedFetcher *fetcher) {
988 fetchers_.erase(::std::find(fetchers_.begin(), fetchers_.end(), fetcher));
989}
990
Austin Schuh8fb315a2020-11-19 22:33:58 -0800991SimulatedSender::SimulatedSender(SimulatedChannel *simulated_channel,
992 SimulatedEventLoop *event_loop)
993 : RawSender(event_loop, simulated_channel->channel()),
994 simulated_channel_(simulated_channel),
Austin Schuh58646e22021-08-23 23:51:46 -0700995 simulated_event_loop_(event_loop) {
Austin Schuh8fb315a2020-11-19 22:33:58 -0800996 simulated_channel_->CountSenderCreated();
997}
998
999SimulatedSender::~SimulatedSender() {
1000 simulated_channel_->CountSenderDestroyed();
1001}
1002
milind1f1dca32021-07-03 13:50:07 -07001003RawSender::Error SimulatedSender::DoSend(
1004 size_t length, monotonic_clock::time_point monotonic_remote_time,
1005 realtime_clock::time_point realtime_remote_time,
1006 uint32_t remote_queue_index, const UUID &source_boot_uuid) {
Austin Schuh58646e22021-08-23 23:51:46 -07001007 VLOG(1) << simulated_event_loop_->distributed_now() << " "
1008 << NodeName(simulated_event_loop_->node())
1009 << simulated_event_loop_->monotonic_now() << " "
1010 << simulated_event_loop_->name() << " Send "
1011 << configuration::StrippedChannelToString(channel());
1012
Austin Schuh8fb315a2020-11-19 22:33:58 -08001013 // The allocations in here are due to infrastructure and don't count in the
1014 // no mallocs in RT code.
1015 ScopedNotRealtime nrt;
1016 CHECK_LE(length, size()) << ": Attempting to send too big a message.";
Austin Schuh58646e22021-08-23 23:51:46 -07001017 message_->context.monotonic_event_time =
1018 simulated_event_loop_->monotonic_now();
Austin Schuh8fb315a2020-11-19 22:33:58 -08001019 message_->context.monotonic_remote_time = monotonic_remote_time;
1020 message_->context.remote_queue_index = remote_queue_index;
Austin Schuh58646e22021-08-23 23:51:46 -07001021 message_->context.realtime_event_time = simulated_event_loop_->realtime_now();
Austin Schuh8fb315a2020-11-19 22:33:58 -08001022 message_->context.realtime_remote_time = realtime_remote_time;
Austin Schuha9012be2021-07-21 15:19:11 -07001023 message_->context.source_boot_uuid = source_boot_uuid;
Austin Schuh8fb315a2020-11-19 22:33:58 -08001024 CHECK_LE(length, message_->context.size);
1025 message_->context.size = length;
1026
milind1f1dca32021-07-03 13:50:07 -07001027 const std::optional<uint32_t> optional_queue_index =
1028 simulated_channel_->Send(message_);
1029
1030 // Check that we are not sending messages too fast
1031 if (!optional_queue_index) {
1032 VLOG(1) << simulated_event_loop_->distributed_now() << " "
1033 << NodeName(simulated_event_loop_->node())
1034 << simulated_event_loop_->monotonic_now() << " "
1035 << simulated_event_loop_->name()
1036 << "\nMessages were sent too fast:\n"
1037 << "For channel: "
1038 << configuration::CleanedChannelToString(
1039 simulated_channel_->channel())
1040 << '\n'
1041 << "Tried to send more than " << simulated_channel_->queue_size()
1042 << " (queue size) messages in the last "
1043 << std::chrono::duration<double>(
1044 simulated_channel_->channel_storage_duration())
1045 .count()
1046 << " seconds (channel storage duration)"
1047 << "\n\n";
1048 return Error::kMessagesSentTooFast;
1049 }
1050
1051 sent_queue_index_ = *optional_queue_index;
Austin Schuh58646e22021-08-23 23:51:46 -07001052 monotonic_sent_time_ = simulated_event_loop_->monotonic_now();
1053 realtime_sent_time_ = simulated_event_loop_->realtime_now();
Austin Schuh8fb315a2020-11-19 22:33:58 -08001054
1055 // Drop the reference to the message so that we allocate a new message for
1056 // next time. Otherwise we will continue to reuse the same memory for all
1057 // messages and corrupt it.
1058 message_.reset();
milind1f1dca32021-07-03 13:50:07 -07001059 return Error::kOk;
Austin Schuh8fb315a2020-11-19 22:33:58 -08001060}
1061
milind1f1dca32021-07-03 13:50:07 -07001062RawSender::Error SimulatedSender::DoSend(
1063 const void *msg, size_t size,
1064 monotonic_clock::time_point monotonic_remote_time,
1065 realtime_clock::time_point realtime_remote_time,
1066 uint32_t remote_queue_index, const UUID &source_boot_uuid) {
Austin Schuh102667e2020-12-11 20:13:28 -08001067 CHECK_LE(size, this->size())
1068 << ": Attempting to send too big a message on "
1069 << configuration::CleanedChannelToString(simulated_channel_->channel());
Austin Schuh8fb315a2020-11-19 22:33:58 -08001070
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001071 // Allocates an aligned buffer in which to copy unaligned msg.
1072 auto [span, mutable_span] = MakeSharedSpan(size);
1073 message_ = SimulatedMessage::Make(simulated_channel_, span);
Austin Schuh8fb315a2020-11-19 22:33:58 -08001074
1075 // Now fill in the message. size is already populated above, and
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001076 // queue_index will be populated in simulated_channel_.
1077 memcpy(mutable_span.data(), msg, size);
Austin Schuh8fb315a2020-11-19 22:33:58 -08001078
1079 return DoSend(size, monotonic_remote_time, realtime_remote_time,
Austin Schuha9012be2021-07-21 15:19:11 -07001080 remote_queue_index, source_boot_uuid);
Austin Schuh8fb315a2020-11-19 22:33:58 -08001081}
1082
milind1f1dca32021-07-03 13:50:07 -07001083RawSender::Error SimulatedSender::DoSend(
1084 const RawSender::SharedSpan data,
1085 monotonic_clock::time_point monotonic_remote_time,
1086 realtime_clock::time_point realtime_remote_time,
1087 uint32_t remote_queue_index, const UUID &source_boot_uuid) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001088 CHECK_LE(data->size(), this->size())
1089 << ": Attempting to send too big a message on "
1090 << configuration::CleanedChannelToString(simulated_channel_->channel());
1091
1092 // Constructs a message sharing the already allocated and aligned message
1093 // data.
1094 message_ = SimulatedMessage::Make(simulated_channel_, data);
1095
1096 return DoSend(data->size(), monotonic_remote_time, realtime_remote_time,
1097 remote_queue_index, source_boot_uuid);
1098}
1099
Austin Schuh39788ff2019-12-01 18:22:57 -08001100SimulatedTimerHandler::SimulatedTimerHandler(
Austin Schuh8bd96322020-02-13 21:18:22 -08001101 EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
1102 ::std::function<void()> fn)
Austin Schuh39788ff2019-12-01 18:22:57 -08001103 : TimerHandler(simulated_event_loop, std::move(fn)),
Austin Schuh7d87b672019-12-01 20:23:49 -08001104 simulated_event_loop_(simulated_event_loop),
1105 event_(this),
Austin Schuh39788ff2019-12-01 18:22:57 -08001106 scheduler_(scheduler),
1107 token_(scheduler_->InvalidToken()) {}
1108
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001109void SimulatedTimerHandler::Setup(monotonic_clock::time_point base,
1110 monotonic_clock::duration repeat_offset) {
Austin Schuh62288252020-11-18 23:26:04 -08001111 // The allocations in here are due to infrastructure and don't count in the no
1112 // mallocs in RT code.
1113 ScopedNotRealtime nrt;
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001114 Disable();
Austin Schuh58646e22021-08-23 23:51:46 -07001115 const monotonic_clock::time_point monotonic_now =
Austin Schuha5e14192020-01-06 18:02:41 -08001116 simulated_event_loop_->monotonic_now();
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001117 base_ = base;
1118 repeat_offset_ = repeat_offset;
Austin Schuhef8f1ae2021-12-11 12:35:05 -08001119 token_ = scheduler_->Schedule(std::max(base, monotonic_now), this);
Austin Schuh7d87b672019-12-01 20:23:49 -08001120 event_.set_event_time(base_);
1121 simulated_event_loop_->AddEvent(&event_);
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001122}
1123
Austin Schuhef8f1ae2021-12-11 12:35:05 -08001124void SimulatedTimerHandler::Handle() noexcept {
1125 DCHECK(token_ != scheduler_->InvalidToken());
1126 token_ = scheduler_->InvalidToken();
1127 simulated_event_loop_->HandleEvent();
1128}
1129
Austin Schuhf4b09c72021-12-08 12:04:37 -08001130void SimulatedTimerHandler::HandleEvent() noexcept {
Austin Schuh58646e22021-08-23 23:51:46 -07001131 const monotonic_clock::time_point monotonic_now =
Austin Schuha5e14192020-01-06 18:02:41 -08001132 simulated_event_loop_->monotonic_now();
Austin Schuh58646e22021-08-23 23:51:46 -07001133 VLOG(1) << simulated_event_loop_->distributed_now() << " "
1134 << NodeName(simulated_event_loop_->node()) << monotonic_now << " "
1135 << simulated_event_loop_->name() << " Timer '" << name() << "'";
Tyler Chatow67ddb032020-01-12 14:30:04 -08001136 logging::ScopedLogRestorer prev_logger;
Austin Schuha0c41ba2020-09-10 22:59:14 -07001137 if (simulated_event_loop_->log_impl_) {
1138 prev_logger.Swap(simulated_event_loop_->log_impl_);
Tyler Chatow67ddb032020-01-12 14:30:04 -08001139 }
Austin Schuheb4e4ce2020-09-10 23:04:18 -07001140 if (token_ != scheduler_->InvalidToken()) {
1141 scheduler_->Deschedule(token_);
1142 token_ = scheduler_->InvalidToken();
1143 }
Austin Schuh58646e22021-08-23 23:51:46 -07001144 if (repeat_offset_ != monotonic_clock::zero()) {
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001145 // Reschedule.
1146 while (base_ <= monotonic_now) base_ += repeat_offset_;
Austin Schuhef8f1ae2021-12-11 12:35:05 -08001147 token_ = scheduler_->Schedule(base_, this);
Austin Schuh7d87b672019-12-01 20:23:49 -08001148 event_.set_event_time(base_);
1149 simulated_event_loop_->AddEvent(&event_);
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001150 }
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001151
Austin Schuhcc6070c2020-10-10 20:25:56 -07001152 {
1153 ScopedMarkRealtimeRestorer rt(simulated_event_loop_->priority() > 0);
1154 Call([monotonic_now]() { return monotonic_now; }, monotonic_now);
1155 }
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001156}
1157
Austin Schuh7d87b672019-12-01 20:23:49 -08001158void SimulatedTimerHandler::Disable() {
1159 simulated_event_loop_->RemoveEvent(&event_);
1160 if (token_ != scheduler_->InvalidToken()) {
1161 scheduler_->Deschedule(token_);
1162 token_ = scheduler_->InvalidToken();
1163 }
1164}
1165
Austin Schuh39788ff2019-12-01 18:22:57 -08001166SimulatedPhasedLoopHandler::SimulatedPhasedLoopHandler(
Austin Schuh8bd96322020-02-13 21:18:22 -08001167 EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
1168 ::std::function<void(int)> fn, const monotonic_clock::duration interval,
Austin Schuh39788ff2019-12-01 18:22:57 -08001169 const monotonic_clock::duration offset)
1170 : PhasedLoopHandler(simulated_event_loop, std::move(fn), interval, offset),
1171 simulated_event_loop_(simulated_event_loop),
Austin Schuh7d87b672019-12-01 20:23:49 -08001172 event_(this),
Austin Schuh39788ff2019-12-01 18:22:57 -08001173 scheduler_(scheduler),
1174 token_(scheduler_->InvalidToken()) {}
1175
Austin Schuh7d87b672019-12-01 20:23:49 -08001176SimulatedPhasedLoopHandler::~SimulatedPhasedLoopHandler() {
1177 if (token_ != scheduler_->InvalidToken()) {
1178 scheduler_->Deschedule(token_);
1179 token_ = scheduler_->InvalidToken();
1180 }
1181 simulated_event_loop_->RemoveEvent(&event_);
1182}
1183
Austin Schuhf4b09c72021-12-08 12:04:37 -08001184void SimulatedPhasedLoopHandler::HandleEvent() noexcept {
Austin Schuh39788ff2019-12-01 18:22:57 -08001185 monotonic_clock::time_point monotonic_now =
1186 simulated_event_loop_->monotonic_now();
Austin Schuh057d29f2021-08-21 23:05:15 -07001187 VLOG(1) << monotonic_now << " Phased loop " << simulated_event_loop_->name()
1188 << ", " << name();
Tyler Chatow67ddb032020-01-12 14:30:04 -08001189 logging::ScopedLogRestorer prev_logger;
Austin Schuha0c41ba2020-09-10 22:59:14 -07001190 if (simulated_event_loop_->log_impl_) {
1191 prev_logger.Swap(simulated_event_loop_->log_impl_);
Tyler Chatow67ddb032020-01-12 14:30:04 -08001192 }
Austin Schuhcc6070c2020-10-10 20:25:56 -07001193
1194 {
1195 ScopedMarkRealtimeRestorer rt(simulated_event_loop_->priority() > 0);
1196 Call([monotonic_now]() { return monotonic_now; },
1197 [this](monotonic_clock::time_point sleep_time) {
1198 Schedule(sleep_time);
1199 });
1200 }
Austin Schuh39788ff2019-12-01 18:22:57 -08001201}
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001202
Austin Schuhef8f1ae2021-12-11 12:35:05 -08001203void SimulatedPhasedLoopHandler::Handle() noexcept {
1204 DCHECK(token_ != scheduler_->InvalidToken());
1205 token_ = scheduler_->InvalidToken();
1206 simulated_event_loop_->HandleEvent();
1207}
1208
Austin Schuh7d87b672019-12-01 20:23:49 -08001209void SimulatedPhasedLoopHandler::Schedule(
1210 monotonic_clock::time_point sleep_time) {
Austin Schuh62288252020-11-18 23:26:04 -08001211 // The allocations in here are due to infrastructure and don't count in the no
1212 // mallocs in RT code.
1213 ScopedNotRealtime nrt;
Austin Schuheb4e4ce2020-09-10 23:04:18 -07001214 if (token_ != scheduler_->InvalidToken()) {
1215 scheduler_->Deschedule(token_);
1216 token_ = scheduler_->InvalidToken();
1217 }
Austin Schuhef8f1ae2021-12-11 12:35:05 -08001218 token_ = scheduler_->Schedule(sleep_time, this);
Austin Schuh7d87b672019-12-01 20:23:49 -08001219 event_.set_event_time(sleep_time);
1220 simulated_event_loop_->AddEvent(&event_);
1221}
1222
Alex Perrycb7da4b2019-08-28 19:35:56 -07001223SimulatedEventLoopFactory::SimulatedEventLoopFactory(
1224 const Configuration *configuration)
Austin Schuh6f3babe2020-01-26 20:34:50 -08001225 : configuration_(CHECK_NOTNULL(configuration)),
1226 nodes_(configuration::GetNodes(configuration_)) {
Austin Schuh094d09b2020-11-20 23:26:52 -08001227 CHECK(IsInitialized()) << ": Need to initialize AOS first.";
Austin Schuhac0771c2020-01-07 18:36:30 -08001228 for (const Node *node : nodes_) {
Austin Schuh58646e22021-08-23 23:51:46 -07001229 node_factories_.emplace_back(
1230 new NodeEventLoopFactory(&scheduler_scheduler_, this, node));
Austin Schuh15649d62019-12-28 16:36:38 -08001231 }
Austin Schuh898f4972020-01-11 17:21:25 -08001232
1233 if (configuration::MultiNode(configuration)) {
1234 bridge_ = std::make_unique<message_bridge::SimulatedMessageBridge>(this);
1235 }
Austin Schuh15649d62019-12-28 16:36:38 -08001236}
1237
Alex Perrycb7da4b2019-08-28 19:35:56 -07001238SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
1239
Austin Schuhac0771c2020-01-07 18:36:30 -08001240NodeEventLoopFactory *SimulatedEventLoopFactory::GetNodeEventLoopFactory(
Austin Schuh057d29f2021-08-21 23:05:15 -07001241 std::string_view node) {
1242 return GetNodeEventLoopFactory(configuration::GetNode(configuration(), node));
1243}
1244
1245NodeEventLoopFactory *SimulatedEventLoopFactory::GetNodeEventLoopFactory(
Austin Schuhac0771c2020-01-07 18:36:30 -08001246 const Node *node) {
1247 auto result = std::find_if(
1248 node_factories_.begin(), node_factories_.end(),
1249 [node](const std::unique_ptr<NodeEventLoopFactory> &node_factory) {
1250 return node_factory->node() == node;
1251 });
1252
1253 CHECK(result != node_factories_.end())
1254 << ": Failed to find node " << FlatbufferToJson(node);
1255
1256 return result->get();
1257}
1258
Austin Schuh87dd3832021-01-01 23:07:31 -08001259void SimulatedEventLoopFactory::SetTimeConverter(
1260 TimeConverter *time_converter) {
1261 for (std::unique_ptr<NodeEventLoopFactory> &factory : node_factories_) {
1262 factory->SetTimeConverter(time_converter);
1263 }
Austin Schuh58646e22021-08-23 23:51:46 -07001264 scheduler_scheduler_.SetTimeConverter(time_converter);
Austin Schuh87dd3832021-01-01 23:07:31 -08001265}
1266
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08001267::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop(
Austin Schuhac0771c2020-01-07 18:36:30 -08001268 std::string_view name, const Node *node) {
1269 if (node == nullptr) {
1270 CHECK(!configuration::MultiNode(configuration()))
1271 << ": Can't make a single node event loop in a multi-node world.";
1272 } else {
1273 CHECK(configuration::MultiNode(configuration()))
1274 << ": Can't make a multi-node event loop in a single-node world.";
1275 }
1276 return GetNodeEventLoopFactory(node)->MakeEventLoop(name);
1277}
1278
Austin Schuh057d29f2021-08-21 23:05:15 -07001279NodeEventLoopFactory::NodeEventLoopFactory(
1280 EventSchedulerScheduler *scheduler_scheduler,
1281 SimulatedEventLoopFactory *factory, const Node *node)
Austin Schuh58646e22021-08-23 23:51:46 -07001282 : scheduler_(configuration::GetNodeIndex(factory->configuration(), node)),
1283 factory_(factory),
1284 node_(node) {
Austin Schuh057d29f2021-08-21 23:05:15 -07001285 scheduler_scheduler->AddEventScheduler(&scheduler_);
Austin Schuh58646e22021-08-23 23:51:46 -07001286 scheduler_.set_started([this]() {
1287 started_ = true;
1288 for (SimulatedEventLoop *event_loop : event_loops_) {
1289 event_loop->SetIsRunning(true);
1290 }
1291 });
Austin Schuhe33c08d2022-02-03 18:15:21 -08001292 scheduler_.set_stopped([this]() {
1293 for (SimulatedEventLoop *event_loop : event_loops_) {
1294 event_loop->SetIsRunning(false);
1295 }
1296 });
Austin Schuh58646e22021-08-23 23:51:46 -07001297 scheduler_.set_on_shutdown([this]() {
1298 VLOG(1) << scheduler_.distributed_now() << " " << NodeName(this->node())
1299 << monotonic_now() << " Shutting down node.";
1300 Shutdown();
1301 ScheduleStartup();
1302 });
1303 ScheduleStartup();
Austin Schuh057d29f2021-08-21 23:05:15 -07001304}
1305
1306NodeEventLoopFactory::~NodeEventLoopFactory() {
Austin Schuh58646e22021-08-23 23:51:46 -07001307 if (started_) {
1308 for (std::function<void()> &fn : on_shutdown_) {
1309 fn();
1310 }
1311
1312 VLOG(1) << scheduler_.distributed_now() << " " << NodeName(node())
1313 << monotonic_now() << " Shutting down applications.";
1314 applications_.clear();
1315 started_ = false;
1316 }
1317
1318 if (event_loops_.size() != 0u) {
1319 for (SimulatedEventLoop *event_loop : event_loops_) {
1320 LOG(ERROR) << scheduler_.distributed_now() << " " << NodeName(node())
1321 << monotonic_now() << " Event loop '" << event_loop->name()
1322 << "' failed to shut down";
1323 }
1324 }
Austin Schuh057d29f2021-08-21 23:05:15 -07001325 CHECK_EQ(event_loops_.size(), 0u) << "Event loop didn't exit";
1326}
1327
Austin Schuh58646e22021-08-23 23:51:46 -07001328void NodeEventLoopFactory::OnStartup(std::function<void()> &&fn) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001329 CHECK(!scheduler_.is_running())
Austin Schuh58646e22021-08-23 23:51:46 -07001330 << ": Can only register OnStartup handlers when not running.";
1331 on_startup_.emplace_back(std::move(fn));
1332 if (started_) {
1333 size_t on_startup_index = on_startup_.size() - 1;
1334 scheduler_.ScheduleOnStartup(
1335 [this, on_startup_index]() { on_startup_[on_startup_index](); });
1336 }
Alex Perrycb7da4b2019-08-28 19:35:56 -07001337}
1338
Austin Schuh58646e22021-08-23 23:51:46 -07001339void NodeEventLoopFactory::OnShutdown(std::function<void()> &&fn) {
1340 on_shutdown_.emplace_back(std::move(fn));
Austin Schuhc0b0f722020-12-12 18:36:06 -08001341}
Austin Schuh057d29f2021-08-21 23:05:15 -07001342
Austin Schuh58646e22021-08-23 23:51:46 -07001343void NodeEventLoopFactory::ScheduleStartup() {
1344 scheduler_.ScheduleOnStartup([this]() {
1345 UUID next_uuid = scheduler_.boot_uuid();
1346 if (boot_uuid_ != next_uuid) {
Austin Schuh188a2f62021-11-08 10:45:54 -08001347 CHECK_EQ(boot_uuid_, UUID::Zero())
1348 << ": Boot UUID changed without restarting. Did TimeConverter "
1349 "change the boot UUID without signaling a restart, or did you "
1350 "change TimeConverter?";
Austin Schuh58646e22021-08-23 23:51:46 -07001351 boot_uuid_ = next_uuid;
1352 }
1353 VLOG(1) << scheduler_.distributed_now() << " " << NodeName(this->node())
1354 << monotonic_now() << " Starting up node on boot " << boot_uuid_;
1355 Startup();
1356 });
1357}
1358
1359void NodeEventLoopFactory::Startup() {
1360 CHECK(!started_);
1361 for (size_t i = 0; i < on_startup_.size(); ++i) {
1362 on_startup_[i]();
1363 }
1364}
1365
1366void NodeEventLoopFactory::Shutdown() {
1367 for (SimulatedEventLoop *event_loop : event_loops_) {
Austin Schuhe33c08d2022-02-03 18:15:21 -08001368 CHECK(!event_loop->is_running());
Austin Schuh58646e22021-08-23 23:51:46 -07001369 }
1370
1371 CHECK(started_);
1372 started_ = false;
1373 for (std::function<void()> &fn : on_shutdown_) {
1374 fn();
1375 }
1376
1377 VLOG(1) << scheduler_.distributed_now() << " " << NodeName(node())
1378 << monotonic_now() << " Shutting down applications.";
1379 applications_.clear();
1380
1381 if (event_loops_.size() != 0u) {
1382 for (SimulatedEventLoop *event_loop : event_loops_) {
1383 LOG(ERROR) << scheduler_.distributed_now() << " " << NodeName(node())
1384 << monotonic_now() << " Event loop '" << event_loop->name()
1385 << "' failed to shut down";
1386 }
1387 }
1388 CHECK_EQ(event_loops_.size(), 0u) << "Not all event loops shut down";
1389 boot_uuid_ = UUID::Zero();
1390
1391 channels_.clear();
Austin Schuhc0b0f722020-12-12 18:36:06 -08001392}
1393
Alex Perrycb7da4b2019-08-28 19:35:56 -07001394void SimulatedEventLoopFactory::RunFor(monotonic_clock::duration duration) {
Austin Schuh58646e22021-08-23 23:51:46 -07001395 // This sets running to true too.
Austin Schuh8bd96322020-02-13 21:18:22 -08001396 scheduler_scheduler_.RunFor(duration);
Austin Schuh057d29f2021-08-21 23:05:15 -07001397 for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
1398 if (node) {
1399 for (SimulatedEventLoop *loop : node->event_loops_) {
Austin Schuhe33c08d2022-02-03 18:15:21 -08001400 CHECK(!loop->is_running());
Austin Schuh057d29f2021-08-21 23:05:15 -07001401 }
1402 }
Alex Perrycb7da4b2019-08-28 19:35:56 -07001403 }
1404}
1405
1406void SimulatedEventLoopFactory::Run() {
Austin Schuh58646e22021-08-23 23:51:46 -07001407 // This sets running to true too.
Austin Schuh8bd96322020-02-13 21:18:22 -08001408 scheduler_scheduler_.Run();
Austin Schuh057d29f2021-08-21 23:05:15 -07001409 for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
1410 if (node) {
1411 for (SimulatedEventLoop *loop : node->event_loops_) {
Austin Schuhe33c08d2022-02-03 18:15:21 -08001412 CHECK(!loop->is_running());
Austin Schuh057d29f2021-08-21 23:05:15 -07001413 }
1414 }
Alex Perrycb7da4b2019-08-28 19:35:56 -07001415 }
1416}
1417
Austin Schuh87dd3832021-01-01 23:07:31 -08001418void SimulatedEventLoopFactory::Exit() { scheduler_scheduler_.Exit(); }
Austin Schuh8fb315a2020-11-19 22:33:58 -08001419
Austin Schuh6f3babe2020-01-26 20:34:50 -08001420void SimulatedEventLoopFactory::DisableForwarding(const Channel *channel) {
Austin Schuh4c3b9702020-08-30 11:34:55 -07001421 CHECK(bridge_) << ": Can't disable forwarding without a message bridge.";
Austin Schuh6f3babe2020-01-26 20:34:50 -08001422 bridge_->DisableForwarding(channel);
1423}
1424
Austin Schuh4c3b9702020-08-30 11:34:55 -07001425void SimulatedEventLoopFactory::DisableStatistics() {
1426 CHECK(bridge_) << ": Can't disable statistics without a message bridge.";
1427 bridge_->DisableStatistics();
1428}
1429
Austin Schuh48205e62021-11-12 14:13:18 -08001430void SimulatedEventLoopFactory::EnableStatistics() {
1431 CHECK(bridge_) << ": Can't enable statistics without a message bridge.";
1432 bridge_->EnableStatistics();
1433}
1434
Austin Schuh2928ebe2021-02-07 22:10:27 -08001435void SimulatedEventLoopFactory::SkipTimingReport() {
1436 CHECK(bridge_) << ": Can't skip timing reports without a message bridge.";
Austin Schuh48205e62021-11-12 14:13:18 -08001437
1438 for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
1439 if (node) {
1440 node->SkipTimingReport();
1441 }
1442 }
1443}
1444
1445void NodeEventLoopFactory::SkipTimingReport() {
1446 for (SimulatedEventLoop *event_loop : event_loops_) {
1447 event_loop->SkipTimingReport();
1448 }
1449 skip_timing_report_ = true;
1450}
1451
1452void NodeEventLoopFactory::EnableStatistics() {
1453 CHECK(factory_->bridge_)
1454 << ": Can't enable statistics without a message bridge.";
1455 factory_->bridge_->EnableStatistics(node_);
1456}
1457
1458void NodeEventLoopFactory::DisableStatistics() {
1459 CHECK(factory_->bridge_)
1460 << ": Can't disable statistics without a message bridge.";
1461 factory_->bridge_->DisableStatistics(node_);
Austin Schuh2928ebe2021-02-07 22:10:27 -08001462}
1463
Austin Schuh58646e22021-08-23 23:51:46 -07001464::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
1465 std::string_view name) {
1466 CHECK(!scheduler_.is_running() || !started_)
1467 << ": Can't create an event loop while running";
1468
1469 pid_t tid = tid_;
1470 ++tid_;
1471 ::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
1472 &scheduler_, this, &channels_, factory_->configuration(), &event_loops_,
1473 node_, tid));
1474 result->set_name(name);
1475 result->set_send_delay(factory_->send_delay());
Austin Schuh48205e62021-11-12 14:13:18 -08001476 if (skip_timing_report_) {
1477 result->SkipTimingReport();
1478 }
Austin Schuh58646e22021-08-23 23:51:46 -07001479
1480 VLOG(1) << scheduler_.distributed_now() << " " << NodeName(node())
1481 << monotonic_now() << " MakeEventLoop(\"" << result->name() << "\")";
1482 return std::move(result);
1483}
1484
Austin Schuhe33c08d2022-02-03 18:15:21 -08001485void SimulatedEventLoopFactory::AllowApplicationCreationDuring(
1486 std::function<void()> fn) {
1487 scheduler_scheduler_.TemporarilyStopAndRun(std::move(fn));
1488}
1489
Austin Schuh58646e22021-08-23 23:51:46 -07001490void NodeEventLoopFactory::Disconnect(const Node *other) {
1491 factory_->bridge_->Disconnect(node_, other);
1492}
1493
1494void NodeEventLoopFactory::Connect(const Node *other) {
1495 factory_->bridge_->Connect(node_, other);
1496}
1497
Alex Perrycb7da4b2019-08-28 19:35:56 -07001498} // namespace aos