blob: 4671f128f5b0f225b13754126aae9c6fd821cd00 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
3#include <algorithm>
4#include <deque>
milind1f1dca32021-07-03 13:50:07 -07005#include <optional>
6#include <queue>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08007#include <string_view>
Brian Silverman661eb8d2020-08-12 19:41:01 -07008#include <vector>
Alex Perrycb7da4b2019-08-28 19:35:56 -07009
10#include "absl/container/btree_map.h"
Brian Silverman661eb8d2020-08-12 19:41:01 -070011#include "aos/events/aos_logging.h"
Austin Schuh898f4972020-01-11 17:21:25 -080012#include "aos/events/simulated_network_bridge.h"
Austin Schuh094d09b2020-11-20 23:26:52 -080013#include "aos/init.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070014#include "aos/json_to_flatbuffer.h"
Austin Schuhcc6070c2020-10-10 20:25:56 -070015#include "aos/realtime.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070016#include "aos/util/phased_loop.h"
17
18namespace aos {
19
Brian Silverman661eb8d2020-08-12 19:41:01 -070020class SimulatedEventLoop;
21class SimulatedFetcher;
22class SimulatedChannel;
23
James Kuszmaul890c2492022-04-06 14:59:31 -070024using CheckSentTooFast = NodeEventLoopFactory::CheckSentTooFast;
25using ExclusiveSenders = NodeEventLoopFactory::ExclusiveSenders;
26using EventLoopOptions = NodeEventLoopFactory::EventLoopOptions;
27
Brian Silverman661eb8d2020-08-12 19:41:01 -070028namespace {
29
Austin Schuh057d29f2021-08-21 23:05:15 -070030std::string NodeName(const Node *node) {
31 if (node == nullptr) {
32 return "";
33 }
34
35 return absl::StrCat(node->name()->string_view(), " ");
36}
37
Austin Schuhcc6070c2020-10-10 20:25:56 -070038class ScopedMarkRealtimeRestorer {
39 public:
40 ScopedMarkRealtimeRestorer(bool rt) : rt_(rt), prior_(MarkRealtime(rt)) {}
41 ~ScopedMarkRealtimeRestorer() { CHECK_EQ(rt_, MarkRealtime(prior_)); }
42
43 private:
44 const bool rt_;
45 const bool prior_;
46};
47
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070048// Holds storage for a span object and the data referenced by that span for
49// compatibility with RawSender::SharedSpan users. If constructed with
50// MakeSharedSpan, span points to only the aligned segment of the entire data.
51struct AlignedOwningSpan {
52 AlignedOwningSpan(const AlignedOwningSpan &) = delete;
53 AlignedOwningSpan &operator=(const AlignedOwningSpan &) = delete;
54 absl::Span<const uint8_t> span;
55 char data[];
56};
57
58// Constructs a span which owns its data through a shared_ptr. The owning span
59// points to a const view of the data; also returns a temporary mutable span
60// which is only valid while the const shared span is kept alive.
61std::pair<RawSender::SharedSpan, absl::Span<uint8_t>> MakeSharedSpan(
62 size_t size) {
63 AlignedOwningSpan *const span = reinterpret_cast<AlignedOwningSpan *>(
64 malloc(sizeof(AlignedOwningSpan) + size + kChannelDataAlignment - 1));
65
66 absl::Span mutable_span(
67 reinterpret_cast<uint8_t *>(RoundChannelData(&span->data[0], size)),
68 size);
69 new (span) AlignedOwningSpan{.span = mutable_span};
70
71 return std::make_pair(
72 RawSender::SharedSpan(
73 std::shared_ptr<AlignedOwningSpan>(span,
74 [](AlignedOwningSpan *s) {
75 s->~AlignedOwningSpan();
76 free(s);
77 }),
78 &span->span),
79 mutable_span);
80}
81
Alex Perrycb7da4b2019-08-28 19:35:56 -070082// Container for both a message, and the context for it for simulation. This
83// makes tracking the timestamps associated with the data easy.
Brian Silverman661eb8d2020-08-12 19:41:01 -070084struct SimulatedMessage final {
85 SimulatedMessage(const SimulatedMessage &) = delete;
86 SimulatedMessage &operator=(const SimulatedMessage &) = delete;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070087 ~SimulatedMessage();
Brian Silverman661eb8d2020-08-12 19:41:01 -070088
89 // Creates a SimulatedMessage with size bytes of storage.
90 // This is a shared_ptr so we don't have to implement refcounting or copying.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070091 static std::shared_ptr<SimulatedMessage> Make(
92 SimulatedChannel *channel, const RawSender::SharedSpan data);
Brian Silverman661eb8d2020-08-12 19:41:01 -070093
Alex Perrycb7da4b2019-08-28 19:35:56 -070094 // Context for the data.
95 Context context;
96
Brian Silverman661eb8d2020-08-12 19:41:01 -070097 SimulatedChannel *const channel = nullptr;
Brian Silverman661eb8d2020-08-12 19:41:01 -070098
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070099 // Owning span to this message's data. Depending on the sender may either
100 // represent the data of just the flatbuffer, or max channel size.
101 RawSender::SharedSpan data;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700102
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700103 // Mutable view of above data. If empty, this message is not mutable.
104 absl::Span<uint8_t> mutable_data;
Brian Silverman661eb8d2020-08-12 19:41:01 -0700105
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700106 // Determines whether this message is mutable. Used for Send where the user
107 // fills out a message stored internally then gives us the size of data used.
108 bool is_mutable() const { return data->size() == mutable_data.size(); }
109
110 // Note: this should be private but make_shared requires it to be public. Use
111 // Make() above to construct.
Brian Silverman661eb8d2020-08-12 19:41:01 -0700112 SimulatedMessage(SimulatedChannel *channel_in);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700113};
114
Brian Silverman661eb8d2020-08-12 19:41:01 -0700115} // namespace
Austin Schuh39788ff2019-12-01 18:22:57 -0800116
Brian Silverman661eb8d2020-08-12 19:41:01 -0700117// TODO(Brian): This should be in the anonymous namespace, but that annoys GCC
118// for some reason...
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800119class SimulatedWatcher : public WatcherState, public EventScheduler::Event {
Austin Schuh39788ff2019-12-01 18:22:57 -0800120 public:
Austin Schuh7d87b672019-12-01 20:23:49 -0800121 SimulatedWatcher(
122 SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
123 const Channel *channel,
124 std::function<void(const Context &context, const void *message)> fn);
Austin Schuh39788ff2019-12-01 18:22:57 -0800125
Austin Schuh7d87b672019-12-01 20:23:49 -0800126 ~SimulatedWatcher() override;
Austin Schuh39788ff2019-12-01 18:22:57 -0800127
Austin Schuh8fb315a2020-11-19 22:33:58 -0800128 bool has_run() const;
129
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800130 void Handle() noexcept override;
131
Austin Schuh39788ff2019-12-01 18:22:57 -0800132 void Startup(EventLoop * /*event_loop*/) override {}
133
Austin Schuh7d87b672019-12-01 20:23:49 -0800134 void Schedule(std::shared_ptr<SimulatedMessage> message);
135
Austin Schuhf4b09c72021-12-08 12:04:37 -0800136 void HandleEvent() noexcept;
Austin Schuh39788ff2019-12-01 18:22:57 -0800137
138 void SetSimulatedChannel(SimulatedChannel *channel) {
139 simulated_channel_ = channel;
140 }
141
142 private:
Austin Schuh7d87b672019-12-01 20:23:49 -0800143 void DoSchedule(monotonic_clock::time_point event_time);
144
145 ::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
146
Brian Silverman4f4e0612020-08-12 19:54:41 -0700147 SimulatedEventLoop *const simulated_event_loop_;
148 const Channel *const channel_;
149 EventScheduler *const scheduler_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800150 EventHandler<SimulatedWatcher> event_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800151 EventScheduler::Token token_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800152 SimulatedChannel *simulated_channel_ = nullptr;
153};
Alex Perrycb7da4b2019-08-28 19:35:56 -0700154
155class SimulatedChannel {
156 public:
Austin Schuh8fb315a2020-11-19 22:33:58 -0800157 explicit SimulatedChannel(const Channel *channel,
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700158 std::chrono::nanoseconds channel_storage_duration,
159 const EventScheduler *scheduler)
Austin Schuh39788ff2019-12-01 18:22:57 -0800160 : channel_(channel),
Brian Silverman661eb8d2020-08-12 19:41:01 -0700161 channel_storage_duration_(channel_storage_duration),
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700162 next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())),
163 scheduler_(scheduler) {
Brian Silvermanbc596c62021-10-15 14:04:54 -0700164 available_buffer_indices_.resize(number_buffers());
Brian Silverman661eb8d2020-08-12 19:41:01 -0700165 for (int i = 0; i < number_buffers(); ++i) {
Brian Silvermanbc596c62021-10-15 14:04:54 -0700166 available_buffer_indices_[i] = i;
Brian Silverman661eb8d2020-08-12 19:41:01 -0700167 }
168 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700169
Brian Silverman661eb8d2020-08-12 19:41:01 -0700170 ~SimulatedChannel() {
171 latest_message_.reset();
172 CHECK_EQ(static_cast<size_t>(number_buffers()),
173 available_buffer_indices_.size());
James Kuszmaul4f106fb2021-01-05 20:53:02 -0800174 CHECK_EQ(0u, fetchers_.size())
175 << configuration::StrippedChannelToString(channel());
176 CHECK_EQ(0u, watchers_.size())
177 << configuration::StrippedChannelToString(channel());
178 CHECK_EQ(0, sender_count_)
179 << configuration::StrippedChannelToString(channel());
Brian Silverman661eb8d2020-08-12 19:41:01 -0700180 }
181
182 // The number of messages we pretend to have in the queue.
183 int queue_size() const {
184 return channel()->frequency() *
185 std::chrono::duration_cast<std::chrono::duration<double>>(
186 channel_storage_duration_)
187 .count();
188 }
189
milind1f1dca32021-07-03 13:50:07 -0700190 std::chrono::nanoseconds channel_storage_duration() const {
191 return channel_storage_duration_;
192 }
193
Brian Silverman661eb8d2020-08-12 19:41:01 -0700194 // The number of extra buffers (beyond the queue) we pretend to have.
195 int number_scratch_buffers() const {
196 // We need to start creating messages before we know how many
197 // senders+readers we'll have, so we need to just pick something which is
198 // always big enough.
199 return 50;
200 }
201
202 int number_buffers() const { return queue_size() + number_scratch_buffers(); }
203
204 int GetBufferIndex() {
205 CHECK(!available_buffer_indices_.empty()) << ": This should be impossible";
206 const int result = available_buffer_indices_.back();
207 available_buffer_indices_.pop_back();
208 return result;
209 }
210
211 void FreeBufferIndex(int i) {
Austin Schuhc5047ea2021-03-20 22:00:21 -0700212 // This extra checking has a large performance hit with sanitizers that
213 // track memory accesses, so just skip it.
214#if !__has_feature(memory_sanitizer) && !__has_feature(address_sanitizer)
Brian Silverman661eb8d2020-08-12 19:41:01 -0700215 DCHECK(std::find(available_buffer_indices_.begin(),
216 available_buffer_indices_.end(),
217 i) == available_buffer_indices_.end())
218 << ": Buffer is not in use: " << i;
Brian Silvermanf3e6df22021-01-19 15:02:21 -0800219#endif
Brian Silverman661eb8d2020-08-12 19:41:01 -0700220 available_buffer_indices_.push_back(i);
221 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700222
223 // Makes a connected raw sender which calls Send below.
Austin Schuh8fb315a2020-11-19 22:33:58 -0800224 ::std::unique_ptr<RawSender> MakeRawSender(SimulatedEventLoop *event_loop);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700225
226 // Makes a connected raw fetcher.
Austin Schuh39788ff2019-12-01 18:22:57 -0800227 ::std::unique_ptr<RawFetcher> MakeRawFetcher(EventLoop *event_loop);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700228
229 // Registers a watcher for the queue.
Austin Schuh7d87b672019-12-01 20:23:49 -0800230 void MakeRawWatcher(SimulatedWatcher *watcher);
Austin Schuh39788ff2019-12-01 18:22:57 -0800231
Austin Schuh7d87b672019-12-01 20:23:49 -0800232 void RemoveWatcher(SimulatedWatcher *watcher) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800233 watchers_.erase(std::find(watchers_.begin(), watchers_.end(), watcher));
234 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700235
Austin Schuhad154822019-12-27 15:45:13 -0800236 // Sends the message to all the connected receivers and fetchers. Returns the
milind1f1dca32021-07-03 13:50:07 -0700237 // sent queue index, or std::nullopt if messages were sent too fast.
James Kuszmaul890c2492022-04-06 14:59:31 -0700238 std::optional<uint32_t> Send(std::shared_ptr<SimulatedMessage> message,
239 CheckSentTooFast check_sent_too_fast);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700240
241 // Unregisters a fetcher.
242 void UnregisterFetcher(SimulatedFetcher *fetcher);
243
244 std::shared_ptr<SimulatedMessage> latest_message() { return latest_message_; }
245
Austin Schuh39788ff2019-12-01 18:22:57 -0800246 size_t max_size() const { return channel()->max_size(); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700247
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800248 const std::string_view name() const {
Austin Schuh39788ff2019-12-01 18:22:57 -0800249 return channel()->name()->string_view();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700250 }
251
Austin Schuh39788ff2019-12-01 18:22:57 -0800252 const Channel *channel() const { return channel_; }
253
Austin Schuhe516ab02020-05-06 21:37:04 -0700254 void CountSenderCreated() {
Brian Silverman661eb8d2020-08-12 19:41:01 -0700255 CheckBufferCount();
Austin Schuhe516ab02020-05-06 21:37:04 -0700256 if (sender_count_ >= channel()->num_senders()) {
257 LOG(FATAL) << "Failed to create sender on "
258 << configuration::CleanedChannelToString(channel())
259 << ", too many senders.";
260 }
261 ++sender_count_;
262 }
Brian Silverman77162972020-08-12 19:52:40 -0700263
Austin Schuhe516ab02020-05-06 21:37:04 -0700264 void CountSenderDestroyed() {
265 --sender_count_;
266 CHECK_GE(sender_count_, 0);
James Kuszmaul890c2492022-04-06 14:59:31 -0700267 if (sender_count_ == 0) {
268 allow_new_senders_ = true;
269 }
Austin Schuhe516ab02020-05-06 21:37:04 -0700270 }
271
Alex Perrycb7da4b2019-08-28 19:35:56 -0700272 private:
Brian Silverman77162972020-08-12 19:52:40 -0700273 void CheckBufferCount() {
274 int reader_count = 0;
275 if (channel()->read_method() == ReadMethod::PIN) {
276 reader_count = watchers_.size() + fetchers_.size();
277 }
278 CHECK_LT(reader_count + sender_count_, number_scratch_buffers());
279 }
280
281 void CheckReaderCount() {
282 if (channel()->read_method() != ReadMethod::PIN) {
283 return;
284 }
285 CheckBufferCount();
286 const int reader_count = watchers_.size() + fetchers_.size();
287 if (reader_count >= channel()->num_readers()) {
288 LOG(FATAL) << "Failed to create reader on "
289 << configuration::CleanedChannelToString(channel())
290 << ", too many readers.";
291 }
292 }
Brian Silverman661eb8d2020-08-12 19:41:01 -0700293
294 const Channel *const channel_;
Brian Silverman661eb8d2020-08-12 19:41:01 -0700295 const std::chrono::nanoseconds channel_storage_duration_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700296
297 // List of all watchers.
Austin Schuh7d87b672019-12-01 20:23:49 -0800298 ::std::vector<SimulatedWatcher *> watchers_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700299
300 // List of all fetchers.
301 ::std::vector<SimulatedFetcher *> fetchers_;
302 std::shared_ptr<SimulatedMessage> latest_message_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700303
304 ipc_lib::QueueIndex next_queue_index_;
Austin Schuhe516ab02020-05-06 21:37:04 -0700305
306 int sender_count_ = 0;
James Kuszmaul890c2492022-04-06 14:59:31 -0700307 // Used to track when an exclusive sender has been created (e.g., for log
308 // replay) and we want to prevent new senders from being accidentally created.
309 bool allow_new_senders_ = true;
Brian Silverman661eb8d2020-08-12 19:41:01 -0700310
311 std::vector<uint16_t> available_buffer_indices_;
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700312
313 const EventScheduler *scheduler_;
314
315 // Queue of all the message send times in the last channel_storage_duration_
316 std::queue<monotonic_clock::time_point> last_times_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700317};
318
319namespace {
320
Brian Silverman661eb8d2020-08-12 19:41:01 -0700321std::shared_ptr<SimulatedMessage> SimulatedMessage::Make(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700322 SimulatedChannel *channel, RawSender::SharedSpan data) {
Austin Schuh62288252020-11-18 23:26:04 -0800323 // The allocations in here are due to infrastructure and don't count in the no
324 // mallocs in RT code.
325 ScopedNotRealtime nrt;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700326
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700327 auto message = std::make_shared<SimulatedMessage>(channel);
328 message->context.size = data->size();
329 message->context.data = data->data();
330 message->data = std::move(data);
331
332 return message;
Brian Silverman661eb8d2020-08-12 19:41:01 -0700333}
334
335SimulatedMessage::SimulatedMessage(SimulatedChannel *channel_in)
336 : channel(channel_in) {
Brian Silverman4f4e0612020-08-12 19:54:41 -0700337 context.buffer_index = channel->GetBufferIndex();
Brian Silverman661eb8d2020-08-12 19:41:01 -0700338}
339
340SimulatedMessage::~SimulatedMessage() {
Brian Silverman4f4e0612020-08-12 19:54:41 -0700341 channel->FreeBufferIndex(context.buffer_index);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700342}
343
344class SimulatedSender : public RawSender {
345 public:
Austin Schuh8fb315a2020-11-19 22:33:58 -0800346 SimulatedSender(SimulatedChannel *simulated_channel,
347 SimulatedEventLoop *event_loop);
348 ~SimulatedSender() override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700349
350 void *data() override {
351 if (!message_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700352 auto [span, mutable_span] =
353 MakeSharedSpan(simulated_channel_->max_size());
354 message_ = SimulatedMessage::Make(simulated_channel_, span);
355 message_->mutable_data = mutable_span;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700356 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700357 CHECK(message_->is_mutable());
358 return message_->mutable_data.data();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700359 }
360
361 size_t size() override { return simulated_channel_->max_size(); }
362
milind1f1dca32021-07-03 13:50:07 -0700363 Error DoSend(size_t length, monotonic_clock::time_point monotonic_remote_time,
364 realtime_clock::time_point realtime_remote_time,
365 uint32_t remote_queue_index,
366 const UUID &source_boot_uuid) override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700367
milind1f1dca32021-07-03 13:50:07 -0700368 Error DoSend(const void *msg, size_t size,
369 monotonic_clock::time_point monotonic_remote_time,
370 realtime_clock::time_point realtime_remote_time,
371 uint32_t remote_queue_index,
372 const UUID &source_boot_uuid) override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700373
milind1f1dca32021-07-03 13:50:07 -0700374 Error DoSend(const SharedSpan data,
375 aos::monotonic_clock::time_point monotonic_remote_time,
376 aos::realtime_clock::time_point realtime_remote_time,
377 uint32_t remote_queue_index,
378 const UUID &source_boot_uuid) override;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700379
Brian Silverman4f4e0612020-08-12 19:54:41 -0700380 int buffer_index() override {
381 // First, ensure message_ is allocated.
382 data();
383 return message_->context.buffer_index;
384 }
385
Alex Perrycb7da4b2019-08-28 19:35:56 -0700386 private:
387 SimulatedChannel *simulated_channel_;
Austin Schuh58646e22021-08-23 23:51:46 -0700388 SimulatedEventLoop *simulated_event_loop_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700389
390 std::shared_ptr<SimulatedMessage> message_;
391};
392} // namespace
393
394class SimulatedFetcher : public RawFetcher {
395 public:
Austin Schuhac0771c2020-01-07 18:36:30 -0800396 explicit SimulatedFetcher(EventLoop *event_loop,
397 SimulatedChannel *simulated_channel)
398 : RawFetcher(event_loop, simulated_channel->channel()),
399 simulated_channel_(simulated_channel) {}
400 ~SimulatedFetcher() { simulated_channel_->UnregisterFetcher(this); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700401
Austin Schuh39788ff2019-12-01 18:22:57 -0800402 std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
Austin Schuh62288252020-11-18 23:26:04 -0800403 // The allocations in here are due to infrastructure and don't count in the
404 // no mallocs in RT code.
405 ScopedNotRealtime nrt;
Austin Schuh39788ff2019-12-01 18:22:57 -0800406 if (msgs_.size() == 0) {
407 return std::make_pair(false, monotonic_clock::min_time);
408 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700409
James Kuszmaulbcd96fc2020-10-12 20:29:32 -0700410 CHECK(!fell_behind_) << ": Got behind on "
411 << configuration::StrippedChannelToString(
412 simulated_channel_->channel());
Brian Silverman661eb8d2020-08-12 19:41:01 -0700413
Alex Perrycb7da4b2019-08-28 19:35:56 -0700414 SetMsg(msgs_.front());
415 msgs_.pop_front();
Austin Schuha5e14192020-01-06 18:02:41 -0800416 return std::make_pair(true, event_loop()->monotonic_now());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700417 }
418
Austin Schuh39788ff2019-12-01 18:22:57 -0800419 std::pair<bool, monotonic_clock::time_point> DoFetch() override {
Austin Schuh62288252020-11-18 23:26:04 -0800420 // The allocations in here are due to infrastructure and don't count in the
421 // no mallocs in RT code.
422 ScopedNotRealtime nrt;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700423 if (msgs_.size() == 0) {
Austin Schuh7d87b672019-12-01 20:23:49 -0800424 // TODO(austin): Can we just do this logic unconditionally? It is a lot
425 // simpler. And call clear, obviously.
Austin Schuhac0771c2020-01-07 18:36:30 -0800426 if (!msg_ && simulated_channel_->latest_message()) {
427 SetMsg(simulated_channel_->latest_message());
Austin Schuha5e14192020-01-06 18:02:41 -0800428 return std::make_pair(true, event_loop()->monotonic_now());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700429 } else {
Austin Schuh39788ff2019-12-01 18:22:57 -0800430 return std::make_pair(false, monotonic_clock::min_time);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700431 }
432 }
433
434 // We've had a message enqueued, so we don't need to go looking for the
435 // latest message from before we started.
436 SetMsg(msgs_.back());
437 msgs_.clear();
Brian Silverman661eb8d2020-08-12 19:41:01 -0700438 fell_behind_ = false;
Austin Schuha5e14192020-01-06 18:02:41 -0800439 return std::make_pair(true, event_loop()->monotonic_now());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700440 }
441
442 private:
443 friend class SimulatedChannel;
444
445 // Updates the state inside RawFetcher to point to the data in msg_.
446 void SetMsg(std::shared_ptr<SimulatedMessage> msg) {
Austin Schuhe6f4c8d2021-12-11 12:36:06 -0800447 msg_ = std::move(msg);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700448 context_ = msg_->context;
Brian Silverman4f4e0612020-08-12 19:54:41 -0700449 if (channel()->read_method() != ReadMethod::PIN) {
450 context_.buffer_index = -1;
451 }
Austin Schuhad154822019-12-27 15:45:13 -0800452 if (context_.remote_queue_index == 0xffffffffu) {
453 context_.remote_queue_index = context_.queue_index;
454 }
Austin Schuh58646e22021-08-23 23:51:46 -0700455 if (context_.monotonic_remote_time == monotonic_clock::min_time) {
Austin Schuhad154822019-12-27 15:45:13 -0800456 context_.monotonic_remote_time = context_.monotonic_event_time;
457 }
Austin Schuh58646e22021-08-23 23:51:46 -0700458 if (context_.realtime_remote_time == realtime_clock::min_time) {
Austin Schuhad154822019-12-27 15:45:13 -0800459 context_.realtime_remote_time = context_.realtime_event_time;
460 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700461 }
462
463 // Internal method for Simulation to add a message to the buffer.
464 void Enqueue(std::shared_ptr<SimulatedMessage> buffer) {
Austin Schuhe6f4c8d2021-12-11 12:36:06 -0800465 msgs_.emplace_back(std::move(buffer));
Brian Silverman661eb8d2020-08-12 19:41:01 -0700466 if (fell_behind_ ||
467 msgs_.size() > static_cast<size_t>(simulated_channel_->queue_size())) {
468 fell_behind_ = true;
469 // Might as well empty out all the intermediate messages now.
470 while (msgs_.size() > 1) {
471 msgs_.pop_front();
472 }
473 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700474 }
475
Austin Schuhac0771c2020-01-07 18:36:30 -0800476 SimulatedChannel *simulated_channel_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700477 std::shared_ptr<SimulatedMessage> msg_;
478
479 // Messages queued up but not in use.
480 ::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
Brian Silverman661eb8d2020-08-12 19:41:01 -0700481
482 // Whether we're currently "behind", which means a FetchNext call will fail.
483 bool fell_behind_ = false;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700484};
485
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800486class SimulatedTimerHandler : public TimerHandler,
487 public EventScheduler::Event {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700488 public:
489 explicit SimulatedTimerHandler(EventScheduler *scheduler,
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800490 SimulatedEventLoop *simulated_event_loop,
Austin Schuh39788ff2019-12-01 18:22:57 -0800491 ::std::function<void()> fn);
Austin Schuh7d87b672019-12-01 20:23:49 -0800492 ~SimulatedTimerHandler() { Disable(); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700493
494 void Setup(monotonic_clock::time_point base,
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800495 monotonic_clock::duration repeat_offset) override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700496
Austin Schuhf4b09c72021-12-08 12:04:37 -0800497 void HandleEvent() noexcept;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700498
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800499 void Handle() noexcept override;
500
Austin Schuh7d87b672019-12-01 20:23:49 -0800501 void Disable() override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700502
Alex Perrycb7da4b2019-08-28 19:35:56 -0700503 private:
Austin Schuh7d87b672019-12-01 20:23:49 -0800504 SimulatedEventLoop *simulated_event_loop_;
505 EventHandler<SimulatedTimerHandler> event_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700506 EventScheduler *scheduler_;
507 EventScheduler::Token token_;
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800508
Alex Perrycb7da4b2019-08-28 19:35:56 -0700509 monotonic_clock::time_point base_;
510 monotonic_clock::duration repeat_offset_;
511};
512
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800513class SimulatedPhasedLoopHandler : public PhasedLoopHandler,
514 public EventScheduler::Event {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700515 public:
516 SimulatedPhasedLoopHandler(EventScheduler *scheduler,
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800517 SimulatedEventLoop *simulated_event_loop,
Alex Perrycb7da4b2019-08-28 19:35:56 -0700518 ::std::function<void(int)> fn,
519 const monotonic_clock::duration interval,
Austin Schuh39788ff2019-12-01 18:22:57 -0800520 const monotonic_clock::duration offset);
Austin Schuh7d87b672019-12-01 20:23:49 -0800521 ~SimulatedPhasedLoopHandler();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700522
Austin Schuhf4b09c72021-12-08 12:04:37 -0800523 void HandleEvent() noexcept;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700524
Austin Schuh7d87b672019-12-01 20:23:49 -0800525 void Schedule(monotonic_clock::time_point sleep_time) override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700526
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800527 void Handle() noexcept override;
528
Alex Perrycb7da4b2019-08-28 19:35:56 -0700529 private:
Austin Schuh39788ff2019-12-01 18:22:57 -0800530 SimulatedEventLoop *simulated_event_loop_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800531 EventHandler<SimulatedPhasedLoopHandler> event_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700532
Austin Schuh39788ff2019-12-01 18:22:57 -0800533 EventScheduler *scheduler_;
534 EventScheduler::Token token_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700535};
536
537class SimulatedEventLoop : public EventLoop {
538 public:
539 explicit SimulatedEventLoop(
Brian Silverman661eb8d2020-08-12 19:41:01 -0700540 EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
Alex Perrycb7da4b2019-08-28 19:35:56 -0700541 absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
542 *channels,
543 const Configuration *configuration,
Austin Schuh057d29f2021-08-21 23:05:15 -0700544 std::vector<SimulatedEventLoop *> *event_loops_, const Node *node,
James Kuszmaul890c2492022-04-06 14:59:31 -0700545 pid_t tid, EventLoopOptions options)
Austin Schuh83c7f702021-01-19 22:36:29 -0800546 : EventLoop(CHECK_NOTNULL(configuration)),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700547 scheduler_(scheduler),
Austin Schuhac0771c2020-01-07 18:36:30 -0800548 node_event_loop_factory_(node_event_loop_factory),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700549 channels_(channels),
Austin Schuh057d29f2021-08-21 23:05:15 -0700550 event_loops_(event_loops_),
Austin Schuh217a9782019-12-21 23:02:50 -0800551 node_(node),
Austin Schuh58646e22021-08-23 23:51:46 -0700552 tid_(tid),
James Kuszmaul890c2492022-04-06 14:59:31 -0700553 startup_tracker_(std::make_shared<StartupTracker>()),
554 options_(options) {
Austin Schuh58646e22021-08-23 23:51:46 -0700555 startup_tracker_->loop = this;
556 scheduler_->ScheduleOnStartup([startup_tracker = startup_tracker_]() {
557 if (startup_tracker->loop) {
558 startup_tracker->loop->Setup();
559 startup_tracker->has_setup = true;
560 }
Austin Schuh057d29f2021-08-21 23:05:15 -0700561 });
562
563 event_loops_->push_back(this);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700564 }
Austin Schuh58646e22021-08-23 23:51:46 -0700565
Alex Perrycb7da4b2019-08-28 19:35:56 -0700566 ~SimulatedEventLoop() override {
Austin Schuh39788ff2019-12-01 18:22:57 -0800567 // Trigger any remaining senders or fetchers to be cleared before destroying
568 // the event loop so the book keeping matches.
569 timing_report_sender_.reset();
570
571 // Force everything with a registered fd with epoll to be destroyed now.
572 timers_.clear();
573 phased_loops_.clear();
574 watchers_.clear();
575
Austin Schuh58646e22021-08-23 23:51:46 -0700576 for (auto it = event_loops_->begin(); it != event_loops_->end(); ++it) {
Austin Schuh057d29f2021-08-21 23:05:15 -0700577 if (*it == this) {
578 event_loops_->erase(it);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700579 break;
580 }
581 }
Austin Schuh58646e22021-08-23 23:51:46 -0700582 VLOG(1) << scheduler_->distributed_now() << " " << NodeName(node())
583 << monotonic_now() << " ~SimulatedEventLoop(\"" << name_ << "\")";
584 startup_tracker_->loop = nullptr;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700585 }
586
Austin Schuh057d29f2021-08-21 23:05:15 -0700587 void SetIsRunning(bool running) {
Austin Schuh58646e22021-08-23 23:51:46 -0700588 VLOG(1) << scheduler_->distributed_now() << " " << NodeName(node())
589 << monotonic_now() << " " << name_ << " set_is_running(" << running
590 << ")";
591 CHECK(startup_tracker_->has_setup);
Austin Schuh057d29f2021-08-21 23:05:15 -0700592
593 set_is_running(running);
Austin Schuh58646e22021-08-23 23:51:46 -0700594 if (running) {
595 has_run_ = true;
596 }
Austin Schuh057d29f2021-08-21 23:05:15 -0700597 }
598
Austin Schuh8fb315a2020-11-19 22:33:58 -0800599 bool has_run() const { return has_run_; }
600
Austin Schuh7d87b672019-12-01 20:23:49 -0800601 std::chrono::nanoseconds send_delay() const { return send_delay_; }
602 void set_send_delay(std::chrono::nanoseconds send_delay) {
603 send_delay_ = send_delay;
604 }
605
Stephan Pleines559fa6c2022-01-06 17:23:51 -0800606 monotonic_clock::time_point monotonic_now() const override {
Austin Schuhac0771c2020-01-07 18:36:30 -0800607 return node_event_loop_factory_->monotonic_now();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700608 }
609
Stephan Pleines559fa6c2022-01-06 17:23:51 -0800610 realtime_clock::time_point realtime_now() const override {
Austin Schuhac0771c2020-01-07 18:36:30 -0800611 return node_event_loop_factory_->realtime_now();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700612 }
613
Austin Schuh58646e22021-08-23 23:51:46 -0700614 distributed_clock::time_point distributed_now() {
615 return scheduler_->distributed_now();
616 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700617
Austin Schuh58646e22021-08-23 23:51:46 -0700618 std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) override;
619
620 std::unique_ptr<RawFetcher> MakeRawFetcher(const Channel *channel) override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700621
622 void MakeRawWatcher(
623 const Channel *channel,
624 ::std::function<void(const Context &context, const void *message)>
625 watcher) override;
626
627 TimerHandler *AddTimer(::std::function<void()> callback) override {
Austin Schuh39788ff2019-12-01 18:22:57 -0800628 CHECK(!is_running());
Austin Schuh8bd96322020-02-13 21:18:22 -0800629 return NewTimer(::std::unique_ptr<TimerHandler>(
630 new SimulatedTimerHandler(scheduler_, this, callback)));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700631 }
632
633 PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
634 const monotonic_clock::duration interval,
635 const monotonic_clock::duration offset =
636 ::std::chrono::seconds(0)) override {
Austin Schuh8bd96322020-02-13 21:18:22 -0800637 return NewPhasedLoop(
638 ::std::unique_ptr<PhasedLoopHandler>(new SimulatedPhasedLoopHandler(
639 scheduler_, this, callback, interval, offset)));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700640 }
641
642 void OnRun(::std::function<void()> on_run) override {
Austin Schuh8fb315a2020-11-19 22:33:58 -0800643 CHECK(!is_running()) << ": Cannot register OnRun callback while running.";
Austin Schuhcc6070c2020-10-10 20:25:56 -0700644 scheduler_->ScheduleOnRun([this, on_run = std::move(on_run)]() {
Austin Schuhad9e5eb2021-11-19 20:33:55 -0800645 logging::ScopedLogRestorer prev_logger;
646 if (log_impl_) {
647 prev_logger.Swap(log_impl_);
648 }
Austin Schuhcc6070c2020-10-10 20:25:56 -0700649 ScopedMarkRealtimeRestorer rt(priority() > 0);
Austin Schuha9012be2021-07-21 15:19:11 -0700650 SetTimerContext(monotonic_now());
Austin Schuhcc6070c2020-10-10 20:25:56 -0700651 on_run();
652 });
Alex Perrycb7da4b2019-08-28 19:35:56 -0700653 }
654
Austin Schuh217a9782019-12-21 23:02:50 -0800655 const Node *node() const override { return node_; }
656
James Kuszmaul3ae42262019-11-08 12:33:41 -0800657 void set_name(const std::string_view name) override {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700658 name_ = std::string(name);
659 }
James Kuszmaul3ae42262019-11-08 12:33:41 -0800660 const std::string_view name() const override { return name_; }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700661
662 SimulatedChannel *GetSimulatedChannel(const Channel *channel);
663
Austin Schuh39788ff2019-12-01 18:22:57 -0800664 void SetRuntimeRealtimePriority(int priority) override {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700665 CHECK(!is_running()) << ": Cannot set realtime priority while running.";
Austin Schuh39788ff2019-12-01 18:22:57 -0800666 priority_ = priority;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700667 }
668
Austin Schuh39788ff2019-12-01 18:22:57 -0800669 int priority() const override { return priority_; }
670
Brian Silverman6a54ff32020-04-28 16:41:39 -0700671 void SetRuntimeAffinity(const cpu_set_t & /*cpuset*/) override {
672 CHECK(!is_running()) << ": Cannot set affinity while running.";
673 }
674
Tyler Chatow67ddb032020-01-12 14:30:04 -0800675 void Setup() {
676 MaybeScheduleTimingReports();
677 if (!skip_logger_) {
Austin Schuhad9e5eb2021-11-19 20:33:55 -0800678 log_sender_.Initialize(&name_,
679 MakeSender<logging::LogMessageFbs>("/aos"));
Austin Schuha0c41ba2020-09-10 22:59:14 -0700680 log_impl_ = log_sender_.implementation();
Tyler Chatow67ddb032020-01-12 14:30:04 -0800681 }
682 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800683
Brian Silverman4f4e0612020-08-12 19:54:41 -0700684 int NumberBuffers(const Channel *channel) override;
685
Austin Schuh83c7f702021-01-19 22:36:29 -0800686 const UUID &boot_uuid() const override {
687 return node_event_loop_factory_->boot_uuid();
688 }
689
James Kuszmaul890c2492022-04-06 14:59:31 -0700690 const EventLoopOptions &options() const { return options_; }
691
Alex Perrycb7da4b2019-08-28 19:35:56 -0700692 private:
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800693 friend class SimulatedTimerHandler;
Austin Schuh7d87b672019-12-01 20:23:49 -0800694 friend class SimulatedPhasedLoopHandler;
695 friend class SimulatedWatcher;
696
Austin Schuh58646e22021-08-23 23:51:46 -0700697 // We have a condition where we register a startup handler, but then get shut
698 // down before it runs. This results in a segfault if we are lucky, and
699 // corruption otherwise. To handle that, allocate a small object which points
700 // back to us and can be freed when the function is freed. That object can
701 // then be updated when we get destroyed so setup is not called.
702 struct StartupTracker {
703 SimulatedEventLoop *loop = nullptr;
704 bool has_setup = false;
705 };
706
Austin Schuh7d87b672019-12-01 20:23:49 -0800707 void HandleEvent() {
708 while (true) {
709 if (EventCount() == 0 || PeekEvent()->event_time() > monotonic_now()) {
710 break;
711 }
712
713 EventLoopEvent *event = PopEvent();
714 event->HandleEvent();
715 }
716 }
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800717
Austin Schuh39788ff2019-12-01 18:22:57 -0800718 pid_t GetTid() override { return tid_; }
719
Alex Perrycb7da4b2019-08-28 19:35:56 -0700720 EventScheduler *scheduler_;
Austin Schuhac0771c2020-01-07 18:36:30 -0800721 NodeEventLoopFactory *node_event_loop_factory_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700722 absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> *channels_;
Austin Schuh057d29f2021-08-21 23:05:15 -0700723 std::vector<SimulatedEventLoop *> *event_loops_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700724
725 ::std::string name_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800726
727 int priority_ = 0;
728
Austin Schuh7d87b672019-12-01 20:23:49 -0800729 std::chrono::nanoseconds send_delay_;
730
Austin Schuh217a9782019-12-21 23:02:50 -0800731 const Node *const node_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800732 const pid_t tid_;
Tyler Chatow67ddb032020-01-12 14:30:04 -0800733
734 AosLogToFbs log_sender_;
Austin Schuha0c41ba2020-09-10 22:59:14 -0700735 std::shared_ptr<logging::LogImplementation> log_impl_ = nullptr;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800736
737 bool has_run_ = false;
Austin Schuh58646e22021-08-23 23:51:46 -0700738
739 std::shared_ptr<StartupTracker> startup_tracker_;
James Kuszmaul890c2492022-04-06 14:59:31 -0700740
741 EventLoopOptions options_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700742};
743
Austin Schuh7d87b672019-12-01 20:23:49 -0800744void SimulatedEventLoopFactory::set_send_delay(
745 std::chrono::nanoseconds send_delay) {
746 send_delay_ = send_delay;
Austin Schuh58646e22021-08-23 23:51:46 -0700747 for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
Austin Schuh057d29f2021-08-21 23:05:15 -0700748 if (node) {
749 for (SimulatedEventLoop *loop : node->event_loops_) {
750 loop->set_send_delay(send_delay_);
751 }
752 }
Austin Schuh7d87b672019-12-01 20:23:49 -0800753 }
754}
755
Alex Perrycb7da4b2019-08-28 19:35:56 -0700756void SimulatedEventLoop::MakeRawWatcher(
757 const Channel *channel,
758 std::function<void(const Context &channel, const void *message)> watcher) {
Brian Silverman0fc69932020-01-24 21:54:02 -0800759 TakeWatcher(channel);
Austin Schuh217a9782019-12-21 23:02:50 -0800760
Austin Schuh057d29f2021-08-21 23:05:15 -0700761 std::unique_ptr<SimulatedWatcher> shm_watcher =
762 std::make_unique<SimulatedWatcher>(this, scheduler_, channel,
763 std::move(watcher));
Austin Schuh39788ff2019-12-01 18:22:57 -0800764
765 GetSimulatedChannel(channel)->MakeRawWatcher(shm_watcher.get());
Austin Schuh057d29f2021-08-21 23:05:15 -0700766
Austin Schuh39788ff2019-12-01 18:22:57 -0800767 NewWatcher(std::move(shm_watcher));
Austin Schuh58646e22021-08-23 23:51:46 -0700768 VLOG(1) << distributed_now() << " " << NodeName(node()) << monotonic_now()
769 << " " << name() << " MakeRawWatcher(\""
770 << configuration::StrippedChannelToString(channel) << "\")";
Austin Schuh8fb315a2020-11-19 22:33:58 -0800771
772 // Order of operations gets kinda wonky if we let people make watchers after
773 // running once. If someone has a valid use case, we can reconsider.
774 CHECK(!has_run()) << ": Can't add a watcher after running.";
Alex Perrycb7da4b2019-08-28 19:35:56 -0700775}
776
777std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
778 const Channel *channel) {
Brian Silverman0fc69932020-01-24 21:54:02 -0800779 TakeSender(channel);
780
Austin Schuh58646e22021-08-23 23:51:46 -0700781 VLOG(1) << distributed_now() << " " << NodeName(node()) << monotonic_now()
782 << " " << name() << " MakeRawSender(\""
783 << configuration::StrippedChannelToString(channel) << "\")";
Alex Perrycb7da4b2019-08-28 19:35:56 -0700784 return GetSimulatedChannel(channel)->MakeRawSender(this);
785}
786
787std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
788 const Channel *channel) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800789 ChannelIndex(channel);
Austin Schuh217a9782019-12-21 23:02:50 -0800790
Austin Schuhca4828c2019-12-28 14:21:35 -0800791 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
792 LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
793 << "\", \"type\": \"" << channel->type()->string_view()
794 << "\" } is not able to be fetched on this node. Check your "
795 "configuration.";
Austin Schuh217a9782019-12-21 23:02:50 -0800796 }
797
Austin Schuh58646e22021-08-23 23:51:46 -0700798 VLOG(1) << distributed_now() << " " << NodeName(node()) << monotonic_now()
799 << " " << name() << " MakeRawFetcher(\""
800 << configuration::StrippedChannelToString(channel) << "\")";
Austin Schuh39788ff2019-12-01 18:22:57 -0800801 return GetSimulatedChannel(channel)->MakeRawFetcher(this);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700802}
803
804SimulatedChannel *SimulatedEventLoop::GetSimulatedChannel(
805 const Channel *channel) {
806 auto it = channels_->find(SimpleChannel(channel));
807 if (it == channels_->end()) {
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700808 it = channels_
809 ->emplace(SimpleChannel(channel),
810 std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
811 channel,
812 std::chrono::nanoseconds(
813 configuration()->channel_storage_duration()),
814 scheduler_)))
815 .first;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700816 }
817 return it->second.get();
818}
819
Brian Silverman4f4e0612020-08-12 19:54:41 -0700820int SimulatedEventLoop::NumberBuffers(const Channel *channel) {
821 return GetSimulatedChannel(channel)->number_buffers();
822}
823
Austin Schuh7d87b672019-12-01 20:23:49 -0800824SimulatedWatcher::SimulatedWatcher(
825 SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
Austin Schuh8bd96322020-02-13 21:18:22 -0800826 const Channel *channel,
Austin Schuh7d87b672019-12-01 20:23:49 -0800827 std::function<void(const Context &context, const void *message)> fn)
828 : WatcherState(simulated_event_loop, channel, std::move(fn)),
829 simulated_event_loop_(simulated_event_loop),
Brian Silverman4f4e0612020-08-12 19:54:41 -0700830 channel_(channel),
Austin Schuh7d87b672019-12-01 20:23:49 -0800831 scheduler_(scheduler),
Brian Silverman4f4e0612020-08-12 19:54:41 -0700832 event_(this),
Austin Schuh58646e22021-08-23 23:51:46 -0700833 token_(scheduler_->InvalidToken()) {
834 VLOG(1) << simulated_event_loop_->distributed_now() << " "
835 << NodeName(simulated_event_loop_->node())
836 << simulated_event_loop_->monotonic_now() << " "
837 << simulated_event_loop_->name() << " Watching "
838 << configuration::StrippedChannelToString(channel_);
839}
Austin Schuh7d87b672019-12-01 20:23:49 -0800840
841SimulatedWatcher::~SimulatedWatcher() {
Austin Schuh58646e22021-08-23 23:51:46 -0700842 VLOG(1) << simulated_event_loop_->distributed_now() << " "
Austin Schuh057d29f2021-08-21 23:05:15 -0700843 << NodeName(simulated_event_loop_->node())
Austin Schuh58646e22021-08-23 23:51:46 -0700844 << simulated_event_loop_->monotonic_now() << " "
845 << simulated_event_loop_->name() << " ~Watching "
Austin Schuh057d29f2021-08-21 23:05:15 -0700846 << configuration::StrippedChannelToString(channel_);
Austin Schuh7d87b672019-12-01 20:23:49 -0800847 simulated_event_loop_->RemoveEvent(&event_);
848 if (token_ != scheduler_->InvalidToken()) {
849 scheduler_->Deschedule(token_);
850 }
Brian Silverman4f4e0612020-08-12 19:54:41 -0700851 CHECK_NOTNULL(simulated_channel_)->RemoveWatcher(this);
Austin Schuh7d87b672019-12-01 20:23:49 -0800852}
853
Austin Schuh8fb315a2020-11-19 22:33:58 -0800854bool SimulatedWatcher::has_run() const {
855 return simulated_event_loop_->has_run();
856}
857
Austin Schuh7d87b672019-12-01 20:23:49 -0800858void SimulatedWatcher::Schedule(std::shared_ptr<SimulatedMessage> message) {
Austin Schuha5e14192020-01-06 18:02:41 -0800859 monotonic_clock::time_point event_time =
860 simulated_event_loop_->monotonic_now();
Austin Schuh7d87b672019-12-01 20:23:49 -0800861
862 // Messages are queued in order. If we are the first, add ourselves.
863 // Otherwise, don't.
864 if (msgs_.size() == 0) {
Austin Schuhad154822019-12-27 15:45:13 -0800865 event_.set_event_time(message->context.monotonic_event_time);
Austin Schuh7d87b672019-12-01 20:23:49 -0800866 simulated_event_loop_->AddEvent(&event_);
867
868 DoSchedule(event_time);
869 }
870
Austin Schuhe6f4c8d2021-12-11 12:36:06 -0800871 msgs_.emplace_back(std::move(message));
Austin Schuh7d87b672019-12-01 20:23:49 -0800872}
873
Austin Schuhf4b09c72021-12-08 12:04:37 -0800874void SimulatedWatcher::HandleEvent() noexcept {
Austin Schuh7d87b672019-12-01 20:23:49 -0800875 const monotonic_clock::time_point monotonic_now =
876 simulated_event_loop_->monotonic_now();
Austin Schuh58646e22021-08-23 23:51:46 -0700877 VLOG(1) << simulated_event_loop_->distributed_now() << " "
878 << NodeName(simulated_event_loop_->node())
879 << simulated_event_loop_->monotonic_now() << " "
880 << simulated_event_loop_->name() << " Watcher "
Austin Schuh057d29f2021-08-21 23:05:15 -0700881 << configuration::StrippedChannelToString(channel_);
882 CHECK_NE(msgs_.size(), 0u) << ": No events to handle.";
883
Tyler Chatow67ddb032020-01-12 14:30:04 -0800884 logging::ScopedLogRestorer prev_logger;
Austin Schuha0c41ba2020-09-10 22:59:14 -0700885 if (simulated_event_loop_->log_impl_) {
886 prev_logger.Swap(simulated_event_loop_->log_impl_);
Tyler Chatow67ddb032020-01-12 14:30:04 -0800887 }
Austin Schuhad154822019-12-27 15:45:13 -0800888 Context context = msgs_.front()->context;
889
Brian Silverman4f4e0612020-08-12 19:54:41 -0700890 if (channel_->read_method() != ReadMethod::PIN) {
891 context.buffer_index = -1;
892 }
Austin Schuhad154822019-12-27 15:45:13 -0800893 if (context.remote_queue_index == 0xffffffffu) {
894 context.remote_queue_index = context.queue_index;
895 }
Austin Schuh58646e22021-08-23 23:51:46 -0700896 if (context.monotonic_remote_time == monotonic_clock::min_time) {
Austin Schuhad154822019-12-27 15:45:13 -0800897 context.monotonic_remote_time = context.monotonic_event_time;
898 }
Austin Schuh58646e22021-08-23 23:51:46 -0700899 if (context.realtime_remote_time == realtime_clock::min_time) {
Austin Schuhad154822019-12-27 15:45:13 -0800900 context.realtime_remote_time = context.realtime_event_time;
901 }
902
Austin Schuhcc6070c2020-10-10 20:25:56 -0700903 {
904 ScopedMarkRealtimeRestorer rt(simulated_event_loop_->priority() > 0);
905 DoCallCallback([monotonic_now]() { return monotonic_now; }, context);
906 }
Austin Schuh7d87b672019-12-01 20:23:49 -0800907
908 msgs_.pop_front();
Austin Schuheb4e4ce2020-09-10 23:04:18 -0700909 if (token_ != scheduler_->InvalidToken()) {
910 scheduler_->Deschedule(token_);
911 token_ = scheduler_->InvalidToken();
912 }
Austin Schuh7d87b672019-12-01 20:23:49 -0800913 if (msgs_.size() != 0) {
Austin Schuhad154822019-12-27 15:45:13 -0800914 event_.set_event_time(msgs_.front()->context.monotonic_event_time);
Austin Schuh7d87b672019-12-01 20:23:49 -0800915 simulated_event_loop_->AddEvent(&event_);
916
917 DoSchedule(event_.event_time());
Austin Schuh7d87b672019-12-01 20:23:49 -0800918 }
919}
920
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800921void SimulatedWatcher::Handle() noexcept {
922 DCHECK(token_ != scheduler_->InvalidToken());
923 token_ = scheduler_->InvalidToken();
924 simulated_event_loop_->HandleEvent();
925}
926
Austin Schuh7d87b672019-12-01 20:23:49 -0800927void SimulatedWatcher::DoSchedule(monotonic_clock::time_point event_time) {
Austin Schuheb4e4ce2020-09-10 23:04:18 -0700928 CHECK(token_ == scheduler_->InvalidToken())
929 << ": May not schedule multiple times";
930 token_ = scheduler_->Schedule(
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800931 event_time + simulated_event_loop_->send_delay(), this);
Austin Schuh7d87b672019-12-01 20:23:49 -0800932}
933
934void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
Brian Silverman77162972020-08-12 19:52:40 -0700935 CheckReaderCount();
Austin Schuh39788ff2019-12-01 18:22:57 -0800936 watcher->SetSimulatedChannel(this);
937 watchers_.emplace_back(watcher);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700938}
939
940::std::unique_ptr<RawSender> SimulatedChannel::MakeRawSender(
Austin Schuh8fb315a2020-11-19 22:33:58 -0800941 SimulatedEventLoop *event_loop) {
James Kuszmaul890c2492022-04-06 14:59:31 -0700942 CHECK(allow_new_senders_)
943 << ": Attempted to create a new sender on exclusive channel "
944 << configuration::StrippedChannelToString(channel_);
945 if (event_loop->options().exclusive_senders == ExclusiveSenders::kYes) {
946 CHECK_EQ(0, sender_count_)
947 << ": Attempted to add an exclusive sender on a channel with existing "
948 "senders: "
949 << configuration::StrippedChannelToString(channel_);
950 allow_new_senders_ = false;
951 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700952 return ::std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
953}
954
Austin Schuh39788ff2019-12-01 18:22:57 -0800955::std::unique_ptr<RawFetcher> SimulatedChannel::MakeRawFetcher(
956 EventLoop *event_loop) {
Brian Silverman77162972020-08-12 19:52:40 -0700957 CheckReaderCount();
Austin Schuh39788ff2019-12-01 18:22:57 -0800958 ::std::unique_ptr<SimulatedFetcher> fetcher(
959 new SimulatedFetcher(event_loop, this));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700960 fetchers_.push_back(fetcher.get());
961 return ::std::move(fetcher);
962}
963
milind1f1dca32021-07-03 13:50:07 -0700964std::optional<uint32_t> SimulatedChannel::Send(
James Kuszmaul890c2492022-04-06 14:59:31 -0700965 std::shared_ptr<SimulatedMessage> message, CheckSentTooFast check_sent_too_fast) {
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700966 const auto now = scheduler_->monotonic_now();
967 // Remove times that are greater than or equal to a channel_storage_duration_
968 // ago
969 while (!last_times_.empty() &&
970 (now - last_times_.front() >= channel_storage_duration_)) {
971 last_times_.pop();
972 }
973
974 // Check that we are not sending messages too fast
James Kuszmaul890c2492022-04-06 14:59:31 -0700975 if (check_sent_too_fast == CheckSentTooFast::kYes &&
976 static_cast<int>(last_times_.size()) >= queue_size()) {
Eric Schmiedebergef44b8a2022-02-28 17:30:38 -0700977 return std::nullopt;
978 }
979
980 const std::optional<uint32_t> queue_index = {next_queue_index_.index()};
981 last_times_.push(now);
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700982
milind1f1dca32021-07-03 13:50:07 -0700983 message->context.queue_index = *queue_index;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700984 // Points to the actual data depending on the size set in context. Data may
985 // allocate more than the actual size of the message, so offset from the back
986 // of that to get the actual start of the data.
987 message->context.data =
988 message->data->data() + message->data->size() - message->context.size;
Austin Schuha9df9ad2021-06-16 14:49:39 -0700989
990 DCHECK(channel()->has_schema())
991 << ": Missing schema for channel "
992 << configuration::StrippedChannelToString(channel());
993 DCHECK(flatbuffers::Verify(
994 *channel()->schema(), *channel()->schema()->root_table(),
995 static_cast<const uint8_t *>(message->context.data),
996 message->context.size))
997 << ": Corrupted flatbuffer on " << channel()->name()->c_str() << " "
998 << channel()->type()->c_str();
999
Alex Perrycb7da4b2019-08-28 19:35:56 -07001000 next_queue_index_ = next_queue_index_.Increment();
1001
Austin Schuhe6f4c8d2021-12-11 12:36:06 -08001002 latest_message_ = std::move(message);
Austin Schuh8fb315a2020-11-19 22:33:58 -08001003 for (SimulatedWatcher *watcher : watchers_) {
1004 if (watcher->has_run()) {
Austin Schuhe6f4c8d2021-12-11 12:36:06 -08001005 watcher->Schedule(latest_message_);
Alex Perrycb7da4b2019-08-28 19:35:56 -07001006 }
1007 }
1008 for (auto &fetcher : fetchers_) {
Austin Schuhe6f4c8d2021-12-11 12:36:06 -08001009 fetcher->Enqueue(latest_message_);
Alex Perrycb7da4b2019-08-28 19:35:56 -07001010 }
Austin Schuhad154822019-12-27 15:45:13 -08001011 return queue_index;
Alex Perrycb7da4b2019-08-28 19:35:56 -07001012}
1013
1014void SimulatedChannel::UnregisterFetcher(SimulatedFetcher *fetcher) {
1015 fetchers_.erase(::std::find(fetchers_.begin(), fetchers_.end(), fetcher));
1016}
1017
Austin Schuh8fb315a2020-11-19 22:33:58 -08001018SimulatedSender::SimulatedSender(SimulatedChannel *simulated_channel,
1019 SimulatedEventLoop *event_loop)
1020 : RawSender(event_loop, simulated_channel->channel()),
1021 simulated_channel_(simulated_channel),
Austin Schuh58646e22021-08-23 23:51:46 -07001022 simulated_event_loop_(event_loop) {
Austin Schuh8fb315a2020-11-19 22:33:58 -08001023 simulated_channel_->CountSenderCreated();
1024}
1025
1026SimulatedSender::~SimulatedSender() {
1027 simulated_channel_->CountSenderDestroyed();
1028}
1029
milind1f1dca32021-07-03 13:50:07 -07001030RawSender::Error SimulatedSender::DoSend(
1031 size_t length, monotonic_clock::time_point monotonic_remote_time,
1032 realtime_clock::time_point realtime_remote_time,
1033 uint32_t remote_queue_index, const UUID &source_boot_uuid) {
Austin Schuh58646e22021-08-23 23:51:46 -07001034 VLOG(1) << simulated_event_loop_->distributed_now() << " "
1035 << NodeName(simulated_event_loop_->node())
1036 << simulated_event_loop_->monotonic_now() << " "
1037 << simulated_event_loop_->name() << " Send "
1038 << configuration::StrippedChannelToString(channel());
1039
Austin Schuh8fb315a2020-11-19 22:33:58 -08001040 // The allocations in here are due to infrastructure and don't count in the
1041 // no mallocs in RT code.
1042 ScopedNotRealtime nrt;
1043 CHECK_LE(length, size()) << ": Attempting to send too big a message.";
Austin Schuh58646e22021-08-23 23:51:46 -07001044 message_->context.monotonic_event_time =
1045 simulated_event_loop_->monotonic_now();
Austin Schuh8fb315a2020-11-19 22:33:58 -08001046 message_->context.monotonic_remote_time = monotonic_remote_time;
1047 message_->context.remote_queue_index = remote_queue_index;
Austin Schuh58646e22021-08-23 23:51:46 -07001048 message_->context.realtime_event_time = simulated_event_loop_->realtime_now();
Austin Schuh8fb315a2020-11-19 22:33:58 -08001049 message_->context.realtime_remote_time = realtime_remote_time;
Austin Schuha9012be2021-07-21 15:19:11 -07001050 message_->context.source_boot_uuid = source_boot_uuid;
Austin Schuh8fb315a2020-11-19 22:33:58 -08001051 CHECK_LE(length, message_->context.size);
1052 message_->context.size = length;
1053
milind1f1dca32021-07-03 13:50:07 -07001054 const std::optional<uint32_t> optional_queue_index =
James Kuszmaul890c2492022-04-06 14:59:31 -07001055 simulated_channel_->Send(message_, simulated_event_loop_->options().check_sent_too_fast);
milind1f1dca32021-07-03 13:50:07 -07001056
1057 // Check that we are not sending messages too fast
1058 if (!optional_queue_index) {
1059 VLOG(1) << simulated_event_loop_->distributed_now() << " "
1060 << NodeName(simulated_event_loop_->node())
1061 << simulated_event_loop_->monotonic_now() << " "
1062 << simulated_event_loop_->name()
1063 << "\nMessages were sent too fast:\n"
1064 << "For channel: "
1065 << configuration::CleanedChannelToString(
1066 simulated_channel_->channel())
1067 << '\n'
1068 << "Tried to send more than " << simulated_channel_->queue_size()
1069 << " (queue size) messages in the last "
1070 << std::chrono::duration<double>(
1071 simulated_channel_->channel_storage_duration())
1072 .count()
1073 << " seconds (channel storage duration)"
1074 << "\n\n";
1075 return Error::kMessagesSentTooFast;
1076 }
1077
1078 sent_queue_index_ = *optional_queue_index;
Austin Schuh58646e22021-08-23 23:51:46 -07001079 monotonic_sent_time_ = simulated_event_loop_->monotonic_now();
1080 realtime_sent_time_ = simulated_event_loop_->realtime_now();
Austin Schuh8fb315a2020-11-19 22:33:58 -08001081
1082 // Drop the reference to the message so that we allocate a new message for
1083 // next time. Otherwise we will continue to reuse the same memory for all
1084 // messages and corrupt it.
1085 message_.reset();
milind1f1dca32021-07-03 13:50:07 -07001086 return Error::kOk;
Austin Schuh8fb315a2020-11-19 22:33:58 -08001087}
1088
milind1f1dca32021-07-03 13:50:07 -07001089RawSender::Error SimulatedSender::DoSend(
1090 const void *msg, size_t size,
1091 monotonic_clock::time_point monotonic_remote_time,
1092 realtime_clock::time_point realtime_remote_time,
1093 uint32_t remote_queue_index, const UUID &source_boot_uuid) {
Austin Schuh102667e2020-12-11 20:13:28 -08001094 CHECK_LE(size, this->size())
1095 << ": Attempting to send too big a message on "
1096 << configuration::CleanedChannelToString(simulated_channel_->channel());
Austin Schuh8fb315a2020-11-19 22:33:58 -08001097
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001098 // Allocates an aligned buffer in which to copy unaligned msg.
1099 auto [span, mutable_span] = MakeSharedSpan(size);
1100 message_ = SimulatedMessage::Make(simulated_channel_, span);
Austin Schuh8fb315a2020-11-19 22:33:58 -08001101
1102 // Now fill in the message. size is already populated above, and
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001103 // queue_index will be populated in simulated_channel_.
1104 memcpy(mutable_span.data(), msg, size);
Austin Schuh8fb315a2020-11-19 22:33:58 -08001105
1106 return DoSend(size, monotonic_remote_time, realtime_remote_time,
Austin Schuha9012be2021-07-21 15:19:11 -07001107 remote_queue_index, source_boot_uuid);
Austin Schuh8fb315a2020-11-19 22:33:58 -08001108}
1109
milind1f1dca32021-07-03 13:50:07 -07001110RawSender::Error SimulatedSender::DoSend(
1111 const RawSender::SharedSpan data,
1112 monotonic_clock::time_point monotonic_remote_time,
1113 realtime_clock::time_point realtime_remote_time,
1114 uint32_t remote_queue_index, const UUID &source_boot_uuid) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001115 CHECK_LE(data->size(), this->size())
1116 << ": Attempting to send too big a message on "
1117 << configuration::CleanedChannelToString(simulated_channel_->channel());
1118
1119 // Constructs a message sharing the already allocated and aligned message
1120 // data.
1121 message_ = SimulatedMessage::Make(simulated_channel_, data);
1122
1123 return DoSend(data->size(), monotonic_remote_time, realtime_remote_time,
1124 remote_queue_index, source_boot_uuid);
1125}
1126
Austin Schuh39788ff2019-12-01 18:22:57 -08001127SimulatedTimerHandler::SimulatedTimerHandler(
Austin Schuh8bd96322020-02-13 21:18:22 -08001128 EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
1129 ::std::function<void()> fn)
Austin Schuh39788ff2019-12-01 18:22:57 -08001130 : TimerHandler(simulated_event_loop, std::move(fn)),
Austin Schuh7d87b672019-12-01 20:23:49 -08001131 simulated_event_loop_(simulated_event_loop),
1132 event_(this),
Austin Schuh39788ff2019-12-01 18:22:57 -08001133 scheduler_(scheduler),
1134 token_(scheduler_->InvalidToken()) {}
1135
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001136void SimulatedTimerHandler::Setup(monotonic_clock::time_point base,
1137 monotonic_clock::duration repeat_offset) {
Austin Schuh62288252020-11-18 23:26:04 -08001138 // The allocations in here are due to infrastructure and don't count in the no
1139 // mallocs in RT code.
1140 ScopedNotRealtime nrt;
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001141 Disable();
Austin Schuh58646e22021-08-23 23:51:46 -07001142 const monotonic_clock::time_point monotonic_now =
Austin Schuha5e14192020-01-06 18:02:41 -08001143 simulated_event_loop_->monotonic_now();
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001144 base_ = base;
1145 repeat_offset_ = repeat_offset;
Austin Schuhef8f1ae2021-12-11 12:35:05 -08001146 token_ = scheduler_->Schedule(std::max(base, monotonic_now), this);
Austin Schuh7d87b672019-12-01 20:23:49 -08001147 event_.set_event_time(base_);
1148 simulated_event_loop_->AddEvent(&event_);
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001149}
1150
Austin Schuhef8f1ae2021-12-11 12:35:05 -08001151void SimulatedTimerHandler::Handle() noexcept {
1152 DCHECK(token_ != scheduler_->InvalidToken());
1153 token_ = scheduler_->InvalidToken();
1154 simulated_event_loop_->HandleEvent();
1155}
1156
Austin Schuhf4b09c72021-12-08 12:04:37 -08001157void SimulatedTimerHandler::HandleEvent() noexcept {
Austin Schuh58646e22021-08-23 23:51:46 -07001158 const monotonic_clock::time_point monotonic_now =
Austin Schuha5e14192020-01-06 18:02:41 -08001159 simulated_event_loop_->monotonic_now();
Austin Schuh58646e22021-08-23 23:51:46 -07001160 VLOG(1) << simulated_event_loop_->distributed_now() << " "
1161 << NodeName(simulated_event_loop_->node()) << monotonic_now << " "
1162 << simulated_event_loop_->name() << " Timer '" << name() << "'";
Tyler Chatow67ddb032020-01-12 14:30:04 -08001163 logging::ScopedLogRestorer prev_logger;
Austin Schuha0c41ba2020-09-10 22:59:14 -07001164 if (simulated_event_loop_->log_impl_) {
1165 prev_logger.Swap(simulated_event_loop_->log_impl_);
Tyler Chatow67ddb032020-01-12 14:30:04 -08001166 }
Austin Schuheb4e4ce2020-09-10 23:04:18 -07001167 if (token_ != scheduler_->InvalidToken()) {
1168 scheduler_->Deschedule(token_);
1169 token_ = scheduler_->InvalidToken();
1170 }
Austin Schuh58646e22021-08-23 23:51:46 -07001171 if (repeat_offset_ != monotonic_clock::zero()) {
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001172 // Reschedule.
1173 while (base_ <= monotonic_now) base_ += repeat_offset_;
Austin Schuhef8f1ae2021-12-11 12:35:05 -08001174 token_ = scheduler_->Schedule(base_, this);
Austin Schuh7d87b672019-12-01 20:23:49 -08001175 event_.set_event_time(base_);
1176 simulated_event_loop_->AddEvent(&event_);
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001177 }
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001178
Austin Schuhcc6070c2020-10-10 20:25:56 -07001179 {
1180 ScopedMarkRealtimeRestorer rt(simulated_event_loop_->priority() > 0);
1181 Call([monotonic_now]() { return monotonic_now; }, monotonic_now);
1182 }
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001183}
1184
Austin Schuh7d87b672019-12-01 20:23:49 -08001185void SimulatedTimerHandler::Disable() {
1186 simulated_event_loop_->RemoveEvent(&event_);
1187 if (token_ != scheduler_->InvalidToken()) {
1188 scheduler_->Deschedule(token_);
1189 token_ = scheduler_->InvalidToken();
1190 }
1191}
1192
Austin Schuh39788ff2019-12-01 18:22:57 -08001193SimulatedPhasedLoopHandler::SimulatedPhasedLoopHandler(
Austin Schuh8bd96322020-02-13 21:18:22 -08001194 EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
1195 ::std::function<void(int)> fn, const monotonic_clock::duration interval,
Austin Schuh39788ff2019-12-01 18:22:57 -08001196 const monotonic_clock::duration offset)
1197 : PhasedLoopHandler(simulated_event_loop, std::move(fn), interval, offset),
1198 simulated_event_loop_(simulated_event_loop),
Austin Schuh7d87b672019-12-01 20:23:49 -08001199 event_(this),
Austin Schuh39788ff2019-12-01 18:22:57 -08001200 scheduler_(scheduler),
1201 token_(scheduler_->InvalidToken()) {}
1202
Austin Schuh7d87b672019-12-01 20:23:49 -08001203SimulatedPhasedLoopHandler::~SimulatedPhasedLoopHandler() {
1204 if (token_ != scheduler_->InvalidToken()) {
1205 scheduler_->Deschedule(token_);
1206 token_ = scheduler_->InvalidToken();
1207 }
1208 simulated_event_loop_->RemoveEvent(&event_);
1209}
1210
Austin Schuhf4b09c72021-12-08 12:04:37 -08001211void SimulatedPhasedLoopHandler::HandleEvent() noexcept {
Austin Schuh39788ff2019-12-01 18:22:57 -08001212 monotonic_clock::time_point monotonic_now =
1213 simulated_event_loop_->monotonic_now();
Austin Schuh057d29f2021-08-21 23:05:15 -07001214 VLOG(1) << monotonic_now << " Phased loop " << simulated_event_loop_->name()
1215 << ", " << name();
Tyler Chatow67ddb032020-01-12 14:30:04 -08001216 logging::ScopedLogRestorer prev_logger;
Austin Schuha0c41ba2020-09-10 22:59:14 -07001217 if (simulated_event_loop_->log_impl_) {
1218 prev_logger.Swap(simulated_event_loop_->log_impl_);
Tyler Chatow67ddb032020-01-12 14:30:04 -08001219 }
Austin Schuhcc6070c2020-10-10 20:25:56 -07001220
1221 {
1222 ScopedMarkRealtimeRestorer rt(simulated_event_loop_->priority() > 0);
1223 Call([monotonic_now]() { return monotonic_now; },
1224 [this](monotonic_clock::time_point sleep_time) {
1225 Schedule(sleep_time);
1226 });
1227 }
Austin Schuh39788ff2019-12-01 18:22:57 -08001228}
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001229
Austin Schuhef8f1ae2021-12-11 12:35:05 -08001230void SimulatedPhasedLoopHandler::Handle() noexcept {
1231 DCHECK(token_ != scheduler_->InvalidToken());
1232 token_ = scheduler_->InvalidToken();
1233 simulated_event_loop_->HandleEvent();
1234}
1235
Austin Schuh7d87b672019-12-01 20:23:49 -08001236void SimulatedPhasedLoopHandler::Schedule(
1237 monotonic_clock::time_point sleep_time) {
Austin Schuh62288252020-11-18 23:26:04 -08001238 // The allocations in here are due to infrastructure and don't count in the no
1239 // mallocs in RT code.
1240 ScopedNotRealtime nrt;
Austin Schuheb4e4ce2020-09-10 23:04:18 -07001241 if (token_ != scheduler_->InvalidToken()) {
1242 scheduler_->Deschedule(token_);
1243 token_ = scheduler_->InvalidToken();
1244 }
Austin Schuhef8f1ae2021-12-11 12:35:05 -08001245 token_ = scheduler_->Schedule(sleep_time, this);
Austin Schuh7d87b672019-12-01 20:23:49 -08001246 event_.set_event_time(sleep_time);
1247 simulated_event_loop_->AddEvent(&event_);
1248}
1249
Alex Perrycb7da4b2019-08-28 19:35:56 -07001250SimulatedEventLoopFactory::SimulatedEventLoopFactory(
1251 const Configuration *configuration)
Austin Schuh6f3babe2020-01-26 20:34:50 -08001252 : configuration_(CHECK_NOTNULL(configuration)),
1253 nodes_(configuration::GetNodes(configuration_)) {
Austin Schuh094d09b2020-11-20 23:26:52 -08001254 CHECK(IsInitialized()) << ": Need to initialize AOS first.";
Austin Schuhac0771c2020-01-07 18:36:30 -08001255 for (const Node *node : nodes_) {
Austin Schuh58646e22021-08-23 23:51:46 -07001256 node_factories_.emplace_back(
1257 new NodeEventLoopFactory(&scheduler_scheduler_, this, node));
Austin Schuh15649d62019-12-28 16:36:38 -08001258 }
Austin Schuh898f4972020-01-11 17:21:25 -08001259
1260 if (configuration::MultiNode(configuration)) {
1261 bridge_ = std::make_unique<message_bridge::SimulatedMessageBridge>(this);
1262 }
Austin Schuh15649d62019-12-28 16:36:38 -08001263}
1264
Alex Perrycb7da4b2019-08-28 19:35:56 -07001265SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
1266
Austin Schuhac0771c2020-01-07 18:36:30 -08001267NodeEventLoopFactory *SimulatedEventLoopFactory::GetNodeEventLoopFactory(
Austin Schuh057d29f2021-08-21 23:05:15 -07001268 std::string_view node) {
1269 return GetNodeEventLoopFactory(configuration::GetNode(configuration(), node));
1270}
1271
1272NodeEventLoopFactory *SimulatedEventLoopFactory::GetNodeEventLoopFactory(
Austin Schuhac0771c2020-01-07 18:36:30 -08001273 const Node *node) {
1274 auto result = std::find_if(
1275 node_factories_.begin(), node_factories_.end(),
1276 [node](const std::unique_ptr<NodeEventLoopFactory> &node_factory) {
1277 return node_factory->node() == node;
1278 });
1279
1280 CHECK(result != node_factories_.end())
1281 << ": Failed to find node " << FlatbufferToJson(node);
1282
1283 return result->get();
1284}
1285
Austin Schuh87dd3832021-01-01 23:07:31 -08001286void SimulatedEventLoopFactory::SetTimeConverter(
1287 TimeConverter *time_converter) {
1288 for (std::unique_ptr<NodeEventLoopFactory> &factory : node_factories_) {
1289 factory->SetTimeConverter(time_converter);
1290 }
Austin Schuh58646e22021-08-23 23:51:46 -07001291 scheduler_scheduler_.SetTimeConverter(time_converter);
Austin Schuh87dd3832021-01-01 23:07:31 -08001292}
1293
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08001294::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop(
Austin Schuhac0771c2020-01-07 18:36:30 -08001295 std::string_view name, const Node *node) {
1296 if (node == nullptr) {
1297 CHECK(!configuration::MultiNode(configuration()))
1298 << ": Can't make a single node event loop in a multi-node world.";
1299 } else {
1300 CHECK(configuration::MultiNode(configuration()))
1301 << ": Can't make a multi-node event loop in a single-node world.";
1302 }
1303 return GetNodeEventLoopFactory(node)->MakeEventLoop(name);
1304}
1305
Austin Schuh057d29f2021-08-21 23:05:15 -07001306NodeEventLoopFactory::NodeEventLoopFactory(
1307 EventSchedulerScheduler *scheduler_scheduler,
1308 SimulatedEventLoopFactory *factory, const Node *node)
Austin Schuh58646e22021-08-23 23:51:46 -07001309 : scheduler_(configuration::GetNodeIndex(factory->configuration(), node)),
1310 factory_(factory),
1311 node_(node) {
Austin Schuh057d29f2021-08-21 23:05:15 -07001312 scheduler_scheduler->AddEventScheduler(&scheduler_);
Austin Schuh58646e22021-08-23 23:51:46 -07001313 scheduler_.set_started([this]() {
1314 started_ = true;
1315 for (SimulatedEventLoop *event_loop : event_loops_) {
1316 event_loop->SetIsRunning(true);
1317 }
1318 });
Austin Schuhe33c08d2022-02-03 18:15:21 -08001319 scheduler_.set_stopped([this]() {
1320 for (SimulatedEventLoop *event_loop : event_loops_) {
1321 event_loop->SetIsRunning(false);
1322 }
1323 });
Austin Schuh58646e22021-08-23 23:51:46 -07001324 scheduler_.set_on_shutdown([this]() {
1325 VLOG(1) << scheduler_.distributed_now() << " " << NodeName(this->node())
1326 << monotonic_now() << " Shutting down node.";
1327 Shutdown();
1328 ScheduleStartup();
1329 });
1330 ScheduleStartup();
Austin Schuh057d29f2021-08-21 23:05:15 -07001331}
1332
1333NodeEventLoopFactory::~NodeEventLoopFactory() {
Austin Schuh58646e22021-08-23 23:51:46 -07001334 if (started_) {
1335 for (std::function<void()> &fn : on_shutdown_) {
1336 fn();
1337 }
1338
1339 VLOG(1) << scheduler_.distributed_now() << " " << NodeName(node())
1340 << monotonic_now() << " Shutting down applications.";
1341 applications_.clear();
1342 started_ = false;
1343 }
1344
1345 if (event_loops_.size() != 0u) {
1346 for (SimulatedEventLoop *event_loop : event_loops_) {
1347 LOG(ERROR) << scheduler_.distributed_now() << " " << NodeName(node())
1348 << monotonic_now() << " Event loop '" << event_loop->name()
1349 << "' failed to shut down";
1350 }
1351 }
Austin Schuh057d29f2021-08-21 23:05:15 -07001352 CHECK_EQ(event_loops_.size(), 0u) << "Event loop didn't exit";
1353}
1354
Austin Schuh58646e22021-08-23 23:51:46 -07001355void NodeEventLoopFactory::OnStartup(std::function<void()> &&fn) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001356 CHECK(!scheduler_.is_running())
Austin Schuh58646e22021-08-23 23:51:46 -07001357 << ": Can only register OnStartup handlers when not running.";
1358 on_startup_.emplace_back(std::move(fn));
1359 if (started_) {
1360 size_t on_startup_index = on_startup_.size() - 1;
1361 scheduler_.ScheduleOnStartup(
1362 [this, on_startup_index]() { on_startup_[on_startup_index](); });
1363 }
Alex Perrycb7da4b2019-08-28 19:35:56 -07001364}
1365
Austin Schuh58646e22021-08-23 23:51:46 -07001366void NodeEventLoopFactory::OnShutdown(std::function<void()> &&fn) {
1367 on_shutdown_.emplace_back(std::move(fn));
Austin Schuhc0b0f722020-12-12 18:36:06 -08001368}
Austin Schuh057d29f2021-08-21 23:05:15 -07001369
Austin Schuh58646e22021-08-23 23:51:46 -07001370void NodeEventLoopFactory::ScheduleStartup() {
1371 scheduler_.ScheduleOnStartup([this]() {
1372 UUID next_uuid = scheduler_.boot_uuid();
1373 if (boot_uuid_ != next_uuid) {
Austin Schuh188a2f62021-11-08 10:45:54 -08001374 CHECK_EQ(boot_uuid_, UUID::Zero())
1375 << ": Boot UUID changed without restarting. Did TimeConverter "
1376 "change the boot UUID without signaling a restart, or did you "
1377 "change TimeConverter?";
Austin Schuh58646e22021-08-23 23:51:46 -07001378 boot_uuid_ = next_uuid;
1379 }
1380 VLOG(1) << scheduler_.distributed_now() << " " << NodeName(this->node())
1381 << monotonic_now() << " Starting up node on boot " << boot_uuid_;
1382 Startup();
1383 });
1384}
1385
1386void NodeEventLoopFactory::Startup() {
1387 CHECK(!started_);
1388 for (size_t i = 0; i < on_startup_.size(); ++i) {
1389 on_startup_[i]();
1390 }
1391}
1392
1393void NodeEventLoopFactory::Shutdown() {
1394 for (SimulatedEventLoop *event_loop : event_loops_) {
Austin Schuhe33c08d2022-02-03 18:15:21 -08001395 CHECK(!event_loop->is_running());
Austin Schuh58646e22021-08-23 23:51:46 -07001396 }
1397
1398 CHECK(started_);
1399 started_ = false;
1400 for (std::function<void()> &fn : on_shutdown_) {
1401 fn();
1402 }
1403
1404 VLOG(1) << scheduler_.distributed_now() << " " << NodeName(node())
1405 << monotonic_now() << " Shutting down applications.";
1406 applications_.clear();
1407
1408 if (event_loops_.size() != 0u) {
1409 for (SimulatedEventLoop *event_loop : event_loops_) {
1410 LOG(ERROR) << scheduler_.distributed_now() << " " << NodeName(node())
1411 << monotonic_now() << " Event loop '" << event_loop->name()
1412 << "' failed to shut down";
1413 }
1414 }
1415 CHECK_EQ(event_loops_.size(), 0u) << "Not all event loops shut down";
1416 boot_uuid_ = UUID::Zero();
1417
1418 channels_.clear();
Austin Schuhc0b0f722020-12-12 18:36:06 -08001419}
1420
Alex Perrycb7da4b2019-08-28 19:35:56 -07001421void SimulatedEventLoopFactory::RunFor(monotonic_clock::duration duration) {
Austin Schuh58646e22021-08-23 23:51:46 -07001422 // This sets running to true too.
Austin Schuh8bd96322020-02-13 21:18:22 -08001423 scheduler_scheduler_.RunFor(duration);
Austin Schuh057d29f2021-08-21 23:05:15 -07001424 for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
1425 if (node) {
1426 for (SimulatedEventLoop *loop : node->event_loops_) {
Austin Schuhe33c08d2022-02-03 18:15:21 -08001427 CHECK(!loop->is_running());
Austin Schuh057d29f2021-08-21 23:05:15 -07001428 }
1429 }
Alex Perrycb7da4b2019-08-28 19:35:56 -07001430 }
1431}
1432
1433void SimulatedEventLoopFactory::Run() {
Austin Schuh58646e22021-08-23 23:51:46 -07001434 // This sets running to true too.
Austin Schuh8bd96322020-02-13 21:18:22 -08001435 scheduler_scheduler_.Run();
Austin Schuh057d29f2021-08-21 23:05:15 -07001436 for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
1437 if (node) {
1438 for (SimulatedEventLoop *loop : node->event_loops_) {
Austin Schuhe33c08d2022-02-03 18:15:21 -08001439 CHECK(!loop->is_running());
Austin Schuh057d29f2021-08-21 23:05:15 -07001440 }
1441 }
Alex Perrycb7da4b2019-08-28 19:35:56 -07001442 }
1443}
1444
Austin Schuh87dd3832021-01-01 23:07:31 -08001445void SimulatedEventLoopFactory::Exit() { scheduler_scheduler_.Exit(); }
Austin Schuh8fb315a2020-11-19 22:33:58 -08001446
Austin Schuh6f3babe2020-01-26 20:34:50 -08001447void SimulatedEventLoopFactory::DisableForwarding(const Channel *channel) {
Austin Schuh4c3b9702020-08-30 11:34:55 -07001448 CHECK(bridge_) << ": Can't disable forwarding without a message bridge.";
Austin Schuh6f3babe2020-01-26 20:34:50 -08001449 bridge_->DisableForwarding(channel);
1450}
1451
Austin Schuh4c3b9702020-08-30 11:34:55 -07001452void SimulatedEventLoopFactory::DisableStatistics() {
1453 CHECK(bridge_) << ": Can't disable statistics without a message bridge.";
1454 bridge_->DisableStatistics();
1455}
1456
Austin Schuh48205e62021-11-12 14:13:18 -08001457void SimulatedEventLoopFactory::EnableStatistics() {
1458 CHECK(bridge_) << ": Can't enable statistics without a message bridge.";
1459 bridge_->EnableStatistics();
1460}
1461
Austin Schuh2928ebe2021-02-07 22:10:27 -08001462void SimulatedEventLoopFactory::SkipTimingReport() {
1463 CHECK(bridge_) << ": Can't skip timing reports without a message bridge.";
Austin Schuh48205e62021-11-12 14:13:18 -08001464
1465 for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
1466 if (node) {
1467 node->SkipTimingReport();
1468 }
1469 }
1470}
1471
1472void NodeEventLoopFactory::SkipTimingReport() {
1473 for (SimulatedEventLoop *event_loop : event_loops_) {
1474 event_loop->SkipTimingReport();
1475 }
1476 skip_timing_report_ = true;
1477}
1478
1479void NodeEventLoopFactory::EnableStatistics() {
1480 CHECK(factory_->bridge_)
1481 << ": Can't enable statistics without a message bridge.";
1482 factory_->bridge_->EnableStatistics(node_);
1483}
1484
1485void NodeEventLoopFactory::DisableStatistics() {
1486 CHECK(factory_->bridge_)
1487 << ": Can't disable statistics without a message bridge.";
1488 factory_->bridge_->DisableStatistics(node_);
Austin Schuh2928ebe2021-02-07 22:10:27 -08001489}
1490
Austin Schuh58646e22021-08-23 23:51:46 -07001491::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
James Kuszmaul890c2492022-04-06 14:59:31 -07001492 std::string_view name, EventLoopOptions options) {
Austin Schuh58646e22021-08-23 23:51:46 -07001493 CHECK(!scheduler_.is_running() || !started_)
1494 << ": Can't create an event loop while running";
1495
1496 pid_t tid = tid_;
1497 ++tid_;
1498 ::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
1499 &scheduler_, this, &channels_, factory_->configuration(), &event_loops_,
James Kuszmaul890c2492022-04-06 14:59:31 -07001500 node_, tid, options));
Austin Schuh58646e22021-08-23 23:51:46 -07001501 result->set_name(name);
1502 result->set_send_delay(factory_->send_delay());
Austin Schuh48205e62021-11-12 14:13:18 -08001503 if (skip_timing_report_) {
1504 result->SkipTimingReport();
1505 }
Austin Schuh58646e22021-08-23 23:51:46 -07001506
1507 VLOG(1) << scheduler_.distributed_now() << " " << NodeName(node())
1508 << monotonic_now() << " MakeEventLoop(\"" << result->name() << "\")";
1509 return std::move(result);
1510}
1511
Austin Schuhe33c08d2022-02-03 18:15:21 -08001512void SimulatedEventLoopFactory::AllowApplicationCreationDuring(
1513 std::function<void()> fn) {
1514 scheduler_scheduler_.TemporarilyStopAndRun(std::move(fn));
1515}
1516
Austin Schuh58646e22021-08-23 23:51:46 -07001517void NodeEventLoopFactory::Disconnect(const Node *other) {
1518 factory_->bridge_->Disconnect(node_, other);
1519}
1520
1521void NodeEventLoopFactory::Connect(const Node *other) {
1522 factory_->bridge_->Connect(node_, other);
1523}
1524
Alex Perrycb7da4b2019-08-28 19:35:56 -07001525} // namespace aos