blob: 2a276e7b238221da9889bf0818e9a480379faa59 [file] [log] [blame]
Alex Perrycb7da4b2019-08-28 19:35:56 -07001#include "aos/events/simulated_event_loop.h"
2
3#include <algorithm>
4#include <deque>
milind1f1dca32021-07-03 13:50:07 -07005#include <optional>
6#include <queue>
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08007#include <string_view>
Brian Silverman661eb8d2020-08-12 19:41:01 -07008#include <vector>
Alex Perrycb7da4b2019-08-28 19:35:56 -07009
10#include "absl/container/btree_map.h"
Brian Silverman661eb8d2020-08-12 19:41:01 -070011#include "aos/events/aos_logging.h"
Austin Schuh898f4972020-01-11 17:21:25 -080012#include "aos/events/simulated_network_bridge.h"
Austin Schuh094d09b2020-11-20 23:26:52 -080013#include "aos/init.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070014#include "aos/json_to_flatbuffer.h"
Austin Schuhcc6070c2020-10-10 20:25:56 -070015#include "aos/realtime.h"
Alex Perrycb7da4b2019-08-28 19:35:56 -070016#include "aos/util/phased_loop.h"
17
18namespace aos {
19
// Forward declarations for classes which are defined further down in this
// file but are referenced before their definitions.
class SimulatedChannel;
class SimulatedFetcher;
class SimulatedEventLoop;
23
24namespace {
25
Austin Schuh057d29f2021-08-21 23:05:15 -070026std::string NodeName(const Node *node) {
27 if (node == nullptr) {
28 return "";
29 }
30
31 return absl::StrCat(node->name()->string_view(), " ");
32}
33
Austin Schuhcc6070c2020-10-10 20:25:56 -070034class ScopedMarkRealtimeRestorer {
35 public:
36 ScopedMarkRealtimeRestorer(bool rt) : rt_(rt), prior_(MarkRealtime(rt)) {}
37 ~ScopedMarkRealtimeRestorer() { CHECK_EQ(rt_, MarkRealtime(prior_)); }
38
39 private:
40 const bool rt_;
41 const bool prior_;
42};
43
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070044// Holds storage for a span object and the data referenced by that span for
45// compatibility with RawSender::SharedSpan users. If constructed with
46// MakeSharedSpan, span points to only the aligned segment of the entire data.
47struct AlignedOwningSpan {
48 AlignedOwningSpan(const AlignedOwningSpan &) = delete;
49 AlignedOwningSpan &operator=(const AlignedOwningSpan &) = delete;
50 absl::Span<const uint8_t> span;
51 char data[];
52};
53
54// Constructs a span which owns its data through a shared_ptr. The owning span
55// points to a const view of the data; also returns a temporary mutable span
56// which is only valid while the const shared span is kept alive.
57std::pair<RawSender::SharedSpan, absl::Span<uint8_t>> MakeSharedSpan(
58 size_t size) {
59 AlignedOwningSpan *const span = reinterpret_cast<AlignedOwningSpan *>(
60 malloc(sizeof(AlignedOwningSpan) + size + kChannelDataAlignment - 1));
61
62 absl::Span mutable_span(
63 reinterpret_cast<uint8_t *>(RoundChannelData(&span->data[0], size)),
64 size);
65 new (span) AlignedOwningSpan{.span = mutable_span};
66
67 return std::make_pair(
68 RawSender::SharedSpan(
69 std::shared_ptr<AlignedOwningSpan>(span,
70 [](AlignedOwningSpan *s) {
71 s->~AlignedOwningSpan();
72 free(s);
73 }),
74 &span->span),
75 mutable_span);
76}
77
Alex Perrycb7da4b2019-08-28 19:35:56 -070078// Container for both a message, and the context for it for simulation. This
79// makes tracking the timestamps associated with the data easy.
Brian Silverman661eb8d2020-08-12 19:41:01 -070080struct SimulatedMessage final {
81 SimulatedMessage(const SimulatedMessage &) = delete;
82 SimulatedMessage &operator=(const SimulatedMessage &) = delete;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070083 ~SimulatedMessage();
Brian Silverman661eb8d2020-08-12 19:41:01 -070084
85 // Creates a SimulatedMessage with size bytes of storage.
86 // This is a shared_ptr so we don't have to implement refcounting or copying.
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070087 static std::shared_ptr<SimulatedMessage> Make(
88 SimulatedChannel *channel, const RawSender::SharedSpan data);
Brian Silverman661eb8d2020-08-12 19:41:01 -070089
Alex Perrycb7da4b2019-08-28 19:35:56 -070090 // Context for the data.
91 Context context;
92
Brian Silverman661eb8d2020-08-12 19:41:01 -070093 SimulatedChannel *const channel = nullptr;
Brian Silverman661eb8d2020-08-12 19:41:01 -070094
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070095 // Owning span to this message's data. Depending on the sender may either
96 // represent the data of just the flatbuffer, or max channel size.
97 RawSender::SharedSpan data;
Alex Perrycb7da4b2019-08-28 19:35:56 -070098
Tyler Chatowb7c6eba2021-07-28 14:43:23 -070099 // Mutable view of above data. If empty, this message is not mutable.
100 absl::Span<uint8_t> mutable_data;
Brian Silverman661eb8d2020-08-12 19:41:01 -0700101
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700102 // Determines whether this message is mutable. Used for Send where the user
103 // fills out a message stored internally then gives us the size of data used.
104 bool is_mutable() const { return data->size() == mutable_data.size(); }
105
106 // Note: this should be private but make_shared requires it to be public. Use
107 // Make() above to construct.
Brian Silverman661eb8d2020-08-12 19:41:01 -0700108 SimulatedMessage(SimulatedChannel *channel_in);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700109};
110
Brian Silverman661eb8d2020-08-12 19:41:01 -0700111} // namespace
Austin Schuh39788ff2019-12-01 18:22:57 -0800112
Brian Silverman661eb8d2020-08-12 19:41:01 -0700113// TODO(Brian): This should be in the anonymous namespace, but that annoys GCC
114// for some reason...
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800115class SimulatedWatcher : public WatcherState, public EventScheduler::Event {
Austin Schuh39788ff2019-12-01 18:22:57 -0800116 public:
Austin Schuh7d87b672019-12-01 20:23:49 -0800117 SimulatedWatcher(
118 SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
119 const Channel *channel,
120 std::function<void(const Context &context, const void *message)> fn);
Austin Schuh39788ff2019-12-01 18:22:57 -0800121
Austin Schuh7d87b672019-12-01 20:23:49 -0800122 ~SimulatedWatcher() override;
Austin Schuh39788ff2019-12-01 18:22:57 -0800123
Austin Schuh8fb315a2020-11-19 22:33:58 -0800124 bool has_run() const;
125
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800126 void Handle() noexcept override;
127
Austin Schuh39788ff2019-12-01 18:22:57 -0800128 void Startup(EventLoop * /*event_loop*/) override {}
129
Austin Schuh7d87b672019-12-01 20:23:49 -0800130 void Schedule(std::shared_ptr<SimulatedMessage> message);
131
Austin Schuhf4b09c72021-12-08 12:04:37 -0800132 void HandleEvent() noexcept;
Austin Schuh39788ff2019-12-01 18:22:57 -0800133
134 void SetSimulatedChannel(SimulatedChannel *channel) {
135 simulated_channel_ = channel;
136 }
137
138 private:
Austin Schuh7d87b672019-12-01 20:23:49 -0800139 void DoSchedule(monotonic_clock::time_point event_time);
140
141 ::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
142
Brian Silverman4f4e0612020-08-12 19:54:41 -0700143 SimulatedEventLoop *const simulated_event_loop_;
144 const Channel *const channel_;
145 EventScheduler *const scheduler_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800146 EventHandler<SimulatedWatcher> event_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800147 EventScheduler::Token token_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800148 SimulatedChannel *simulated_channel_ = nullptr;
149};
Alex Perrycb7da4b2019-08-28 19:35:56 -0700150
151class SimulatedChannel {
152 public:
Austin Schuh8fb315a2020-11-19 22:33:58 -0800153 explicit SimulatedChannel(const Channel *channel,
Brian Silverman661eb8d2020-08-12 19:41:01 -0700154 std::chrono::nanoseconds channel_storage_duration)
Austin Schuh39788ff2019-12-01 18:22:57 -0800155 : channel_(channel),
Brian Silverman661eb8d2020-08-12 19:41:01 -0700156 channel_storage_duration_(channel_storage_duration),
157 next_queue_index_(ipc_lib::QueueIndex::Zero(number_buffers())) {
Brian Silvermanbc596c62021-10-15 14:04:54 -0700158 available_buffer_indices_.resize(number_buffers());
Brian Silverman661eb8d2020-08-12 19:41:01 -0700159 for (int i = 0; i < number_buffers(); ++i) {
Brian Silvermanbc596c62021-10-15 14:04:54 -0700160 available_buffer_indices_[i] = i;
Brian Silverman661eb8d2020-08-12 19:41:01 -0700161 }
162 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700163
Brian Silverman661eb8d2020-08-12 19:41:01 -0700164 ~SimulatedChannel() {
165 latest_message_.reset();
166 CHECK_EQ(static_cast<size_t>(number_buffers()),
167 available_buffer_indices_.size());
James Kuszmaul4f106fb2021-01-05 20:53:02 -0800168 CHECK_EQ(0u, fetchers_.size())
169 << configuration::StrippedChannelToString(channel());
170 CHECK_EQ(0u, watchers_.size())
171 << configuration::StrippedChannelToString(channel());
172 CHECK_EQ(0, sender_count_)
173 << configuration::StrippedChannelToString(channel());
Brian Silverman661eb8d2020-08-12 19:41:01 -0700174 }
175
176 // The number of messages we pretend to have in the queue.
177 int queue_size() const {
178 return channel()->frequency() *
179 std::chrono::duration_cast<std::chrono::duration<double>>(
180 channel_storage_duration_)
181 .count();
182 }
183
milind1f1dca32021-07-03 13:50:07 -0700184 std::chrono::nanoseconds channel_storage_duration() const {
185 return channel_storage_duration_;
186 }
187
Brian Silverman661eb8d2020-08-12 19:41:01 -0700188 // The number of extra buffers (beyond the queue) we pretend to have.
189 int number_scratch_buffers() const {
190 // We need to start creating messages before we know how many
191 // senders+readers we'll have, so we need to just pick something which is
192 // always big enough.
193 return 50;
194 }
195
196 int number_buffers() const { return queue_size() + number_scratch_buffers(); }
197
198 int GetBufferIndex() {
199 CHECK(!available_buffer_indices_.empty()) << ": This should be impossible";
200 const int result = available_buffer_indices_.back();
201 available_buffer_indices_.pop_back();
202 return result;
203 }
204
205 void FreeBufferIndex(int i) {
Austin Schuhc5047ea2021-03-20 22:00:21 -0700206 // This extra checking has a large performance hit with sanitizers that
207 // track memory accesses, so just skip it.
208#if !__has_feature(memory_sanitizer) && !__has_feature(address_sanitizer)
Brian Silverman661eb8d2020-08-12 19:41:01 -0700209 DCHECK(std::find(available_buffer_indices_.begin(),
210 available_buffer_indices_.end(),
211 i) == available_buffer_indices_.end())
212 << ": Buffer is not in use: " << i;
Brian Silvermanf3e6df22021-01-19 15:02:21 -0800213#endif
Brian Silverman661eb8d2020-08-12 19:41:01 -0700214 available_buffer_indices_.push_back(i);
215 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700216
217 // Makes a connected raw sender which calls Send below.
Austin Schuh8fb315a2020-11-19 22:33:58 -0800218 ::std::unique_ptr<RawSender> MakeRawSender(SimulatedEventLoop *event_loop);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700219
220 // Makes a connected raw fetcher.
Austin Schuh39788ff2019-12-01 18:22:57 -0800221 ::std::unique_ptr<RawFetcher> MakeRawFetcher(EventLoop *event_loop);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700222
223 // Registers a watcher for the queue.
Austin Schuh7d87b672019-12-01 20:23:49 -0800224 void MakeRawWatcher(SimulatedWatcher *watcher);
Austin Schuh39788ff2019-12-01 18:22:57 -0800225
Austin Schuh7d87b672019-12-01 20:23:49 -0800226 void RemoveWatcher(SimulatedWatcher *watcher) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800227 watchers_.erase(std::find(watchers_.begin(), watchers_.end(), watcher));
228 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700229
Austin Schuhad154822019-12-27 15:45:13 -0800230 // Sends the message to all the connected receivers and fetchers. Returns the
milind1f1dca32021-07-03 13:50:07 -0700231 // sent queue index, or std::nullopt if messages were sent too fast.
232 std::optional<uint32_t> Send(std::shared_ptr<SimulatedMessage> message);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700233
234 // Unregisters a fetcher.
235 void UnregisterFetcher(SimulatedFetcher *fetcher);
236
237 std::shared_ptr<SimulatedMessage> latest_message() { return latest_message_; }
238
Austin Schuh39788ff2019-12-01 18:22:57 -0800239 size_t max_size() const { return channel()->max_size(); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700240
Austin Schuh5f1cc5c2019-12-01 18:01:11 -0800241 const std::string_view name() const {
Austin Schuh39788ff2019-12-01 18:22:57 -0800242 return channel()->name()->string_view();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700243 }
244
Austin Schuh39788ff2019-12-01 18:22:57 -0800245 const Channel *channel() const { return channel_; }
246
Austin Schuhe516ab02020-05-06 21:37:04 -0700247 void CountSenderCreated() {
Brian Silverman661eb8d2020-08-12 19:41:01 -0700248 CheckBufferCount();
Austin Schuhe516ab02020-05-06 21:37:04 -0700249 if (sender_count_ >= channel()->num_senders()) {
250 LOG(FATAL) << "Failed to create sender on "
251 << configuration::CleanedChannelToString(channel())
252 << ", too many senders.";
253 }
254 ++sender_count_;
255 }
Brian Silverman77162972020-08-12 19:52:40 -0700256
Austin Schuhe516ab02020-05-06 21:37:04 -0700257 void CountSenderDestroyed() {
258 --sender_count_;
259 CHECK_GE(sender_count_, 0);
260 }
261
Alex Perrycb7da4b2019-08-28 19:35:56 -0700262 private:
Brian Silverman77162972020-08-12 19:52:40 -0700263 void CheckBufferCount() {
264 int reader_count = 0;
265 if (channel()->read_method() == ReadMethod::PIN) {
266 reader_count = watchers_.size() + fetchers_.size();
267 }
268 CHECK_LT(reader_count + sender_count_, number_scratch_buffers());
269 }
270
271 void CheckReaderCount() {
272 if (channel()->read_method() != ReadMethod::PIN) {
273 return;
274 }
275 CheckBufferCount();
276 const int reader_count = watchers_.size() + fetchers_.size();
277 if (reader_count >= channel()->num_readers()) {
278 LOG(FATAL) << "Failed to create reader on "
279 << configuration::CleanedChannelToString(channel())
280 << ", too many readers.";
281 }
282 }
Brian Silverman661eb8d2020-08-12 19:41:01 -0700283
284 const Channel *const channel_;
Brian Silverman661eb8d2020-08-12 19:41:01 -0700285 const std::chrono::nanoseconds channel_storage_duration_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700286
287 // List of all watchers.
Austin Schuh7d87b672019-12-01 20:23:49 -0800288 ::std::vector<SimulatedWatcher *> watchers_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700289
290 // List of all fetchers.
291 ::std::vector<SimulatedFetcher *> fetchers_;
292 std::shared_ptr<SimulatedMessage> latest_message_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700293
294 ipc_lib::QueueIndex next_queue_index_;
Austin Schuhe516ab02020-05-06 21:37:04 -0700295
296 int sender_count_ = 0;
Brian Silverman661eb8d2020-08-12 19:41:01 -0700297
298 std::vector<uint16_t> available_buffer_indices_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700299};
300
301namespace {
302
Brian Silverman661eb8d2020-08-12 19:41:01 -0700303std::shared_ptr<SimulatedMessage> SimulatedMessage::Make(
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700304 SimulatedChannel *channel, RawSender::SharedSpan data) {
Austin Schuh62288252020-11-18 23:26:04 -0800305 // The allocations in here are due to infrastructure and don't count in the no
306 // mallocs in RT code.
307 ScopedNotRealtime nrt;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700308
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700309 auto message = std::make_shared<SimulatedMessage>(channel);
310 message->context.size = data->size();
311 message->context.data = data->data();
312 message->data = std::move(data);
313
314 return message;
Brian Silverman661eb8d2020-08-12 19:41:01 -0700315}
316
317SimulatedMessage::SimulatedMessage(SimulatedChannel *channel_in)
318 : channel(channel_in) {
Brian Silverman4f4e0612020-08-12 19:54:41 -0700319 context.buffer_index = channel->GetBufferIndex();
Brian Silverman661eb8d2020-08-12 19:41:01 -0700320}
321
322SimulatedMessage::~SimulatedMessage() {
Brian Silverman4f4e0612020-08-12 19:54:41 -0700323 channel->FreeBufferIndex(context.buffer_index);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700324}
325
326class SimulatedSender : public RawSender {
327 public:
Austin Schuh8fb315a2020-11-19 22:33:58 -0800328 SimulatedSender(SimulatedChannel *simulated_channel,
329 SimulatedEventLoop *event_loop);
330 ~SimulatedSender() override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700331
332 void *data() override {
333 if (!message_) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700334 auto [span, mutable_span] =
335 MakeSharedSpan(simulated_channel_->max_size());
336 message_ = SimulatedMessage::Make(simulated_channel_, span);
337 message_->mutable_data = mutable_span;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700338 }
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700339 CHECK(message_->is_mutable());
340 return message_->mutable_data.data();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700341 }
342
343 size_t size() override { return simulated_channel_->max_size(); }
344
milind1f1dca32021-07-03 13:50:07 -0700345 Error DoSend(size_t length, monotonic_clock::time_point monotonic_remote_time,
346 realtime_clock::time_point realtime_remote_time,
347 uint32_t remote_queue_index,
348 const UUID &source_boot_uuid) override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700349
milind1f1dca32021-07-03 13:50:07 -0700350 Error DoSend(const void *msg, size_t size,
351 monotonic_clock::time_point monotonic_remote_time,
352 realtime_clock::time_point realtime_remote_time,
353 uint32_t remote_queue_index,
354 const UUID &source_boot_uuid) override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700355
milind1f1dca32021-07-03 13:50:07 -0700356 Error DoSend(const SharedSpan data,
357 aos::monotonic_clock::time_point monotonic_remote_time,
358 aos::realtime_clock::time_point realtime_remote_time,
359 uint32_t remote_queue_index,
360 const UUID &source_boot_uuid) override;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700361
Brian Silverman4f4e0612020-08-12 19:54:41 -0700362 int buffer_index() override {
363 // First, ensure message_ is allocated.
364 data();
365 return message_->context.buffer_index;
366 }
367
Alex Perrycb7da4b2019-08-28 19:35:56 -0700368 private:
369 SimulatedChannel *simulated_channel_;
Austin Schuh58646e22021-08-23 23:51:46 -0700370 SimulatedEventLoop *simulated_event_loop_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700371
372 std::shared_ptr<SimulatedMessage> message_;
373};
374} // namespace
375
376class SimulatedFetcher : public RawFetcher {
377 public:
Austin Schuhac0771c2020-01-07 18:36:30 -0800378 explicit SimulatedFetcher(EventLoop *event_loop,
379 SimulatedChannel *simulated_channel)
380 : RawFetcher(event_loop, simulated_channel->channel()),
381 simulated_channel_(simulated_channel) {}
382 ~SimulatedFetcher() { simulated_channel_->UnregisterFetcher(this); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700383
Austin Schuh39788ff2019-12-01 18:22:57 -0800384 std::pair<bool, monotonic_clock::time_point> DoFetchNext() override {
Austin Schuh62288252020-11-18 23:26:04 -0800385 // The allocations in here are due to infrastructure and don't count in the
386 // no mallocs in RT code.
387 ScopedNotRealtime nrt;
Austin Schuh39788ff2019-12-01 18:22:57 -0800388 if (msgs_.size() == 0) {
389 return std::make_pair(false, monotonic_clock::min_time);
390 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700391
James Kuszmaulbcd96fc2020-10-12 20:29:32 -0700392 CHECK(!fell_behind_) << ": Got behind on "
393 << configuration::StrippedChannelToString(
394 simulated_channel_->channel());
Brian Silverman661eb8d2020-08-12 19:41:01 -0700395
Alex Perrycb7da4b2019-08-28 19:35:56 -0700396 SetMsg(msgs_.front());
397 msgs_.pop_front();
Austin Schuha5e14192020-01-06 18:02:41 -0800398 return std::make_pair(true, event_loop()->monotonic_now());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700399 }
400
Austin Schuh39788ff2019-12-01 18:22:57 -0800401 std::pair<bool, monotonic_clock::time_point> DoFetch() override {
Austin Schuh62288252020-11-18 23:26:04 -0800402 // The allocations in here are due to infrastructure and don't count in the
403 // no mallocs in RT code.
404 ScopedNotRealtime nrt;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700405 if (msgs_.size() == 0) {
Austin Schuh7d87b672019-12-01 20:23:49 -0800406 // TODO(austin): Can we just do this logic unconditionally? It is a lot
407 // simpler. And call clear, obviously.
Austin Schuhac0771c2020-01-07 18:36:30 -0800408 if (!msg_ && simulated_channel_->latest_message()) {
409 SetMsg(simulated_channel_->latest_message());
Austin Schuha5e14192020-01-06 18:02:41 -0800410 return std::make_pair(true, event_loop()->monotonic_now());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700411 } else {
Austin Schuh39788ff2019-12-01 18:22:57 -0800412 return std::make_pair(false, monotonic_clock::min_time);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700413 }
414 }
415
416 // We've had a message enqueued, so we don't need to go looking for the
417 // latest message from before we started.
418 SetMsg(msgs_.back());
419 msgs_.clear();
Brian Silverman661eb8d2020-08-12 19:41:01 -0700420 fell_behind_ = false;
Austin Schuha5e14192020-01-06 18:02:41 -0800421 return std::make_pair(true, event_loop()->monotonic_now());
Alex Perrycb7da4b2019-08-28 19:35:56 -0700422 }
423
424 private:
425 friend class SimulatedChannel;
426
427 // Updates the state inside RawFetcher to point to the data in msg_.
428 void SetMsg(std::shared_ptr<SimulatedMessage> msg) {
Austin Schuhe6f4c8d2021-12-11 12:36:06 -0800429 msg_ = std::move(msg);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700430 context_ = msg_->context;
Brian Silverman4f4e0612020-08-12 19:54:41 -0700431 if (channel()->read_method() != ReadMethod::PIN) {
432 context_.buffer_index = -1;
433 }
Austin Schuhad154822019-12-27 15:45:13 -0800434 if (context_.remote_queue_index == 0xffffffffu) {
435 context_.remote_queue_index = context_.queue_index;
436 }
Austin Schuh58646e22021-08-23 23:51:46 -0700437 if (context_.monotonic_remote_time == monotonic_clock::min_time) {
Austin Schuhad154822019-12-27 15:45:13 -0800438 context_.monotonic_remote_time = context_.monotonic_event_time;
439 }
Austin Schuh58646e22021-08-23 23:51:46 -0700440 if (context_.realtime_remote_time == realtime_clock::min_time) {
Austin Schuhad154822019-12-27 15:45:13 -0800441 context_.realtime_remote_time = context_.realtime_event_time;
442 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700443 }
444
445 // Internal method for Simulation to add a message to the buffer.
446 void Enqueue(std::shared_ptr<SimulatedMessage> buffer) {
Austin Schuhe6f4c8d2021-12-11 12:36:06 -0800447 msgs_.emplace_back(std::move(buffer));
Brian Silverman661eb8d2020-08-12 19:41:01 -0700448 if (fell_behind_ ||
449 msgs_.size() > static_cast<size_t>(simulated_channel_->queue_size())) {
450 fell_behind_ = true;
451 // Might as well empty out all the intermediate messages now.
452 while (msgs_.size() > 1) {
453 msgs_.pop_front();
454 }
455 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700456 }
457
Austin Schuhac0771c2020-01-07 18:36:30 -0800458 SimulatedChannel *simulated_channel_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700459 std::shared_ptr<SimulatedMessage> msg_;
460
461 // Messages queued up but not in use.
462 ::std::deque<std::shared_ptr<SimulatedMessage>> msgs_;
Brian Silverman661eb8d2020-08-12 19:41:01 -0700463
464 // Whether we're currently "behind", which means a FetchNext call will fail.
465 bool fell_behind_ = false;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700466};
467
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800468class SimulatedTimerHandler : public TimerHandler,
469 public EventScheduler::Event {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700470 public:
471 explicit SimulatedTimerHandler(EventScheduler *scheduler,
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800472 SimulatedEventLoop *simulated_event_loop,
Austin Schuh39788ff2019-12-01 18:22:57 -0800473 ::std::function<void()> fn);
Austin Schuh7d87b672019-12-01 20:23:49 -0800474 ~SimulatedTimerHandler() { Disable(); }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700475
476 void Setup(monotonic_clock::time_point base,
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800477 monotonic_clock::duration repeat_offset) override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700478
Austin Schuhf4b09c72021-12-08 12:04:37 -0800479 void HandleEvent() noexcept;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700480
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800481 void Handle() noexcept override;
482
Austin Schuh7d87b672019-12-01 20:23:49 -0800483 void Disable() override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700484
Alex Perrycb7da4b2019-08-28 19:35:56 -0700485 private:
Austin Schuh7d87b672019-12-01 20:23:49 -0800486 SimulatedEventLoop *simulated_event_loop_;
487 EventHandler<SimulatedTimerHandler> event_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700488 EventScheduler *scheduler_;
489 EventScheduler::Token token_;
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800490
Alex Perrycb7da4b2019-08-28 19:35:56 -0700491 monotonic_clock::time_point base_;
492 monotonic_clock::duration repeat_offset_;
493};
494
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800495class SimulatedPhasedLoopHandler : public PhasedLoopHandler,
496 public EventScheduler::Event {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700497 public:
498 SimulatedPhasedLoopHandler(EventScheduler *scheduler,
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800499 SimulatedEventLoop *simulated_event_loop,
Alex Perrycb7da4b2019-08-28 19:35:56 -0700500 ::std::function<void(int)> fn,
501 const monotonic_clock::duration interval,
Austin Schuh39788ff2019-12-01 18:22:57 -0800502 const monotonic_clock::duration offset);
Austin Schuh7d87b672019-12-01 20:23:49 -0800503 ~SimulatedPhasedLoopHandler();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700504
Austin Schuhf4b09c72021-12-08 12:04:37 -0800505 void HandleEvent() noexcept;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700506
Austin Schuh7d87b672019-12-01 20:23:49 -0800507 void Schedule(monotonic_clock::time_point sleep_time) override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700508
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800509 void Handle() noexcept override;
510
Alex Perrycb7da4b2019-08-28 19:35:56 -0700511 private:
Austin Schuh39788ff2019-12-01 18:22:57 -0800512 SimulatedEventLoop *simulated_event_loop_;
Austin Schuh7d87b672019-12-01 20:23:49 -0800513 EventHandler<SimulatedPhasedLoopHandler> event_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700514
Austin Schuh39788ff2019-12-01 18:22:57 -0800515 EventScheduler *scheduler_;
516 EventScheduler::Token token_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700517};
518
519class SimulatedEventLoop : public EventLoop {
520 public:
521 explicit SimulatedEventLoop(
Brian Silverman661eb8d2020-08-12 19:41:01 -0700522 EventScheduler *scheduler, NodeEventLoopFactory *node_event_loop_factory,
Alex Perrycb7da4b2019-08-28 19:35:56 -0700523 absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>>
524 *channels,
525 const Configuration *configuration,
Austin Schuh057d29f2021-08-21 23:05:15 -0700526 std::vector<SimulatedEventLoop *> *event_loops_, const Node *node,
527 pid_t tid)
Austin Schuh83c7f702021-01-19 22:36:29 -0800528 : EventLoop(CHECK_NOTNULL(configuration)),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700529 scheduler_(scheduler),
Austin Schuhac0771c2020-01-07 18:36:30 -0800530 node_event_loop_factory_(node_event_loop_factory),
Alex Perrycb7da4b2019-08-28 19:35:56 -0700531 channels_(channels),
Austin Schuh057d29f2021-08-21 23:05:15 -0700532 event_loops_(event_loops_),
Austin Schuh217a9782019-12-21 23:02:50 -0800533 node_(node),
Austin Schuh58646e22021-08-23 23:51:46 -0700534 tid_(tid),
535 startup_tracker_(std::make_shared<StartupTracker>()) {
536 startup_tracker_->loop = this;
537 scheduler_->ScheduleOnStartup([startup_tracker = startup_tracker_]() {
538 if (startup_tracker->loop) {
539 startup_tracker->loop->Setup();
540 startup_tracker->has_setup = true;
541 }
Austin Schuh057d29f2021-08-21 23:05:15 -0700542 });
543
544 event_loops_->push_back(this);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700545 }
Austin Schuh58646e22021-08-23 23:51:46 -0700546
Alex Perrycb7da4b2019-08-28 19:35:56 -0700547 ~SimulatedEventLoop() override {
Austin Schuh39788ff2019-12-01 18:22:57 -0800548 // Trigger any remaining senders or fetchers to be cleared before destroying
549 // the event loop so the book keeping matches.
550 timing_report_sender_.reset();
551
552 // Force everything with a registered fd with epoll to be destroyed now.
553 timers_.clear();
554 phased_loops_.clear();
555 watchers_.clear();
556
Austin Schuh58646e22021-08-23 23:51:46 -0700557 for (auto it = event_loops_->begin(); it != event_loops_->end(); ++it) {
Austin Schuh057d29f2021-08-21 23:05:15 -0700558 if (*it == this) {
559 event_loops_->erase(it);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700560 break;
561 }
562 }
Austin Schuh58646e22021-08-23 23:51:46 -0700563 VLOG(1) << scheduler_->distributed_now() << " " << NodeName(node())
564 << monotonic_now() << " ~SimulatedEventLoop(\"" << name_ << "\")";
565 startup_tracker_->loop = nullptr;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700566 }
567
Austin Schuh057d29f2021-08-21 23:05:15 -0700568 void SetIsRunning(bool running) {
Austin Schuh58646e22021-08-23 23:51:46 -0700569 VLOG(1) << scheduler_->distributed_now() << " " << NodeName(node())
570 << monotonic_now() << " " << name_ << " set_is_running(" << running
571 << ")";
572 CHECK(startup_tracker_->has_setup);
Austin Schuh057d29f2021-08-21 23:05:15 -0700573
574 set_is_running(running);
Austin Schuh58646e22021-08-23 23:51:46 -0700575 if (running) {
576 has_run_ = true;
577 }
Austin Schuh057d29f2021-08-21 23:05:15 -0700578 }
579
Austin Schuh8fb315a2020-11-19 22:33:58 -0800580 bool has_run() const { return has_run_; }
581
Austin Schuh7d87b672019-12-01 20:23:49 -0800582 std::chrono::nanoseconds send_delay() const { return send_delay_; }
583 void set_send_delay(std::chrono::nanoseconds send_delay) {
584 send_delay_ = send_delay;
585 }
586
Austin Schuh58646e22021-08-23 23:51:46 -0700587 monotonic_clock::time_point monotonic_now() override {
Austin Schuhac0771c2020-01-07 18:36:30 -0800588 return node_event_loop_factory_->monotonic_now();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700589 }
590
Austin Schuh58646e22021-08-23 23:51:46 -0700591 realtime_clock::time_point realtime_now() override {
Austin Schuhac0771c2020-01-07 18:36:30 -0800592 return node_event_loop_factory_->realtime_now();
Alex Perrycb7da4b2019-08-28 19:35:56 -0700593 }
594
Austin Schuh58646e22021-08-23 23:51:46 -0700595 distributed_clock::time_point distributed_now() {
596 return scheduler_->distributed_now();
597 }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700598
Austin Schuh58646e22021-08-23 23:51:46 -0700599 std::unique_ptr<RawSender> MakeRawSender(const Channel *channel) override;
600
601 std::unique_ptr<RawFetcher> MakeRawFetcher(const Channel *channel) override;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700602
603 void MakeRawWatcher(
604 const Channel *channel,
605 ::std::function<void(const Context &context, const void *message)>
606 watcher) override;
607
608 TimerHandler *AddTimer(::std::function<void()> callback) override {
Austin Schuh39788ff2019-12-01 18:22:57 -0800609 CHECK(!is_running());
Austin Schuh8bd96322020-02-13 21:18:22 -0800610 return NewTimer(::std::unique_ptr<TimerHandler>(
611 new SimulatedTimerHandler(scheduler_, this, callback)));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700612 }
613
614 PhasedLoopHandler *AddPhasedLoop(::std::function<void(int)> callback,
615 const monotonic_clock::duration interval,
616 const monotonic_clock::duration offset =
617 ::std::chrono::seconds(0)) override {
Austin Schuh8bd96322020-02-13 21:18:22 -0800618 return NewPhasedLoop(
619 ::std::unique_ptr<PhasedLoopHandler>(new SimulatedPhasedLoopHandler(
620 scheduler_, this, callback, interval, offset)));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700621 }
622
623 void OnRun(::std::function<void()> on_run) override {
Austin Schuh8fb315a2020-11-19 22:33:58 -0800624 CHECK(!is_running()) << ": Cannot register OnRun callback while running.";
Austin Schuhcc6070c2020-10-10 20:25:56 -0700625 scheduler_->ScheduleOnRun([this, on_run = std::move(on_run)]() {
Austin Schuhad9e5eb2021-11-19 20:33:55 -0800626 logging::ScopedLogRestorer prev_logger;
627 if (log_impl_) {
628 prev_logger.Swap(log_impl_);
629 }
Austin Schuhcc6070c2020-10-10 20:25:56 -0700630 ScopedMarkRealtimeRestorer rt(priority() > 0);
Austin Schuha9012be2021-07-21 15:19:11 -0700631 SetTimerContext(monotonic_now());
Austin Schuhcc6070c2020-10-10 20:25:56 -0700632 on_run();
633 });
Alex Perrycb7da4b2019-08-28 19:35:56 -0700634 }
635
Austin Schuh217a9782019-12-21 23:02:50 -0800636 const Node *node() const override { return node_; }
637
James Kuszmaul3ae42262019-11-08 12:33:41 -0800638 void set_name(const std::string_view name) override {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700639 name_ = std::string(name);
640 }
James Kuszmaul3ae42262019-11-08 12:33:41 -0800641 const std::string_view name() const override { return name_; }
Alex Perrycb7da4b2019-08-28 19:35:56 -0700642
643 SimulatedChannel *GetSimulatedChannel(const Channel *channel);
644
Austin Schuh39788ff2019-12-01 18:22:57 -0800645 void SetRuntimeRealtimePriority(int priority) override {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700646 CHECK(!is_running()) << ": Cannot set realtime priority while running.";
Austin Schuh39788ff2019-12-01 18:22:57 -0800647 priority_ = priority;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700648 }
649
Austin Schuh39788ff2019-12-01 18:22:57 -0800650 int priority() const override { return priority_; }
651
Brian Silverman6a54ff32020-04-28 16:41:39 -0700652 void SetRuntimeAffinity(const cpu_set_t & /*cpuset*/) override {
653 CHECK(!is_running()) << ": Cannot set affinity while running.";
654 }
655
Tyler Chatow67ddb032020-01-12 14:30:04 -0800656 void Setup() {
657 MaybeScheduleTimingReports();
658 if (!skip_logger_) {
Austin Schuhad9e5eb2021-11-19 20:33:55 -0800659 log_sender_.Initialize(&name_,
660 MakeSender<logging::LogMessageFbs>("/aos"));
Austin Schuha0c41ba2020-09-10 22:59:14 -0700661 log_impl_ = log_sender_.implementation();
Tyler Chatow67ddb032020-01-12 14:30:04 -0800662 }
663 }
Austin Schuh39788ff2019-12-01 18:22:57 -0800664
Brian Silverman4f4e0612020-08-12 19:54:41 -0700665 int NumberBuffers(const Channel *channel) override;
666
Austin Schuh83c7f702021-01-19 22:36:29 -0800667 const UUID &boot_uuid() const override {
668 return node_event_loop_factory_->boot_uuid();
669 }
670
Alex Perrycb7da4b2019-08-28 19:35:56 -0700671 private:
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800672 friend class SimulatedTimerHandler;
Austin Schuh7d87b672019-12-01 20:23:49 -0800673 friend class SimulatedPhasedLoopHandler;
674 friend class SimulatedWatcher;
675
Austin Schuh58646e22021-08-23 23:51:46 -0700676 // We have a condition where we register a startup handler, but then get shut
677 // down before it runs. This results in a segfault if we are lucky, and
678 // corruption otherwise. To handle that, allocate a small object which points
679 // back to us and can be freed when the function is freed. That object can
680 // then be updated when we get destroyed so setup is not called.
681 struct StartupTracker {
682 SimulatedEventLoop *loop = nullptr;
683 bool has_setup = false;
684 };
685
Austin Schuh7d87b672019-12-01 20:23:49 -0800686 void HandleEvent() {
687 while (true) {
688 if (EventCount() == 0 || PeekEvent()->event_time() > monotonic_now()) {
689 break;
690 }
691
692 EventLoopEvent *event = PopEvent();
693 event->HandleEvent();
694 }
695 }
Austin Schuhde8a8ff2019-11-30 15:25:36 -0800696
Austin Schuh39788ff2019-12-01 18:22:57 -0800697 pid_t GetTid() override { return tid_; }
698
Alex Perrycb7da4b2019-08-28 19:35:56 -0700699 EventScheduler *scheduler_;
Austin Schuhac0771c2020-01-07 18:36:30 -0800700 NodeEventLoopFactory *node_event_loop_factory_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700701 absl::btree_map<SimpleChannel, std::unique_ptr<SimulatedChannel>> *channels_;
Austin Schuh057d29f2021-08-21 23:05:15 -0700702 std::vector<SimulatedEventLoop *> *event_loops_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700703
704 ::std::string name_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800705
706 int priority_ = 0;
707
Austin Schuh7d87b672019-12-01 20:23:49 -0800708 std::chrono::nanoseconds send_delay_;
709
Austin Schuh217a9782019-12-21 23:02:50 -0800710 const Node *const node_;
Austin Schuh39788ff2019-12-01 18:22:57 -0800711 const pid_t tid_;
Tyler Chatow67ddb032020-01-12 14:30:04 -0800712
713 AosLogToFbs log_sender_;
Austin Schuha0c41ba2020-09-10 22:59:14 -0700714 std::shared_ptr<logging::LogImplementation> log_impl_ = nullptr;
Austin Schuh8fb315a2020-11-19 22:33:58 -0800715
716 bool has_run_ = false;
Austin Schuh58646e22021-08-23 23:51:46 -0700717
718 std::shared_ptr<StartupTracker> startup_tracker_;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700719};
720
Austin Schuh7d87b672019-12-01 20:23:49 -0800721void SimulatedEventLoopFactory::set_send_delay(
722 std::chrono::nanoseconds send_delay) {
723 send_delay_ = send_delay;
Austin Schuh58646e22021-08-23 23:51:46 -0700724 for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
Austin Schuh057d29f2021-08-21 23:05:15 -0700725 if (node) {
726 for (SimulatedEventLoop *loop : node->event_loops_) {
727 loop->set_send_delay(send_delay_);
728 }
729 }
Austin Schuh7d87b672019-12-01 20:23:49 -0800730 }
731}
732
Alex Perrycb7da4b2019-08-28 19:35:56 -0700733void SimulatedEventLoop::MakeRawWatcher(
734 const Channel *channel,
735 std::function<void(const Context &channel, const void *message)> watcher) {
Brian Silverman0fc69932020-01-24 21:54:02 -0800736 TakeWatcher(channel);
Austin Schuh217a9782019-12-21 23:02:50 -0800737
Austin Schuh057d29f2021-08-21 23:05:15 -0700738 std::unique_ptr<SimulatedWatcher> shm_watcher =
739 std::make_unique<SimulatedWatcher>(this, scheduler_, channel,
740 std::move(watcher));
Austin Schuh39788ff2019-12-01 18:22:57 -0800741
742 GetSimulatedChannel(channel)->MakeRawWatcher(shm_watcher.get());
Austin Schuh057d29f2021-08-21 23:05:15 -0700743
Austin Schuh39788ff2019-12-01 18:22:57 -0800744 NewWatcher(std::move(shm_watcher));
Austin Schuh58646e22021-08-23 23:51:46 -0700745 VLOG(1) << distributed_now() << " " << NodeName(node()) << monotonic_now()
746 << " " << name() << " MakeRawWatcher(\""
747 << configuration::StrippedChannelToString(channel) << "\")";
Austin Schuh8fb315a2020-11-19 22:33:58 -0800748
749 // Order of operations gets kinda wonky if we let people make watchers after
750 // running once. If someone has a valid use case, we can reconsider.
751 CHECK(!has_run()) << ": Can't add a watcher after running.";
Alex Perrycb7da4b2019-08-28 19:35:56 -0700752}
753
754std::unique_ptr<RawSender> SimulatedEventLoop::MakeRawSender(
755 const Channel *channel) {
Brian Silverman0fc69932020-01-24 21:54:02 -0800756 TakeSender(channel);
757
Austin Schuh58646e22021-08-23 23:51:46 -0700758 VLOG(1) << distributed_now() << " " << NodeName(node()) << monotonic_now()
759 << " " << name() << " MakeRawSender(\""
760 << configuration::StrippedChannelToString(channel) << "\")";
Alex Perrycb7da4b2019-08-28 19:35:56 -0700761 return GetSimulatedChannel(channel)->MakeRawSender(this);
762}
763
764std::unique_ptr<RawFetcher> SimulatedEventLoop::MakeRawFetcher(
765 const Channel *channel) {
Austin Schuh39788ff2019-12-01 18:22:57 -0800766 ChannelIndex(channel);
Austin Schuh217a9782019-12-21 23:02:50 -0800767
Austin Schuhca4828c2019-12-28 14:21:35 -0800768 if (!configuration::ChannelIsReadableOnNode(channel, node())) {
769 LOG(FATAL) << "Channel { \"name\": \"" << channel->name()->string_view()
770 << "\", \"type\": \"" << channel->type()->string_view()
771 << "\" } is not able to be fetched on this node. Check your "
772 "configuration.";
Austin Schuh217a9782019-12-21 23:02:50 -0800773 }
774
Austin Schuh58646e22021-08-23 23:51:46 -0700775 VLOG(1) << distributed_now() << " " << NodeName(node()) << monotonic_now()
776 << " " << name() << " MakeRawFetcher(\""
777 << configuration::StrippedChannelToString(channel) << "\")";
Austin Schuh39788ff2019-12-01 18:22:57 -0800778 return GetSimulatedChannel(channel)->MakeRawFetcher(this);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700779}
780
781SimulatedChannel *SimulatedEventLoop::GetSimulatedChannel(
782 const Channel *channel) {
783 auto it = channels_->find(SimpleChannel(channel));
784 if (it == channels_->end()) {
Austin Schuh8fb315a2020-11-19 22:33:58 -0800785 it =
786 channels_
787 ->emplace(
788 SimpleChannel(channel),
789 std::unique_ptr<SimulatedChannel>(new SimulatedChannel(
790 channel, std::chrono::nanoseconds(
791 configuration()->channel_storage_duration()))))
792 .first;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700793 }
794 return it->second.get();
795}
796
Brian Silverman4f4e0612020-08-12 19:54:41 -0700797int SimulatedEventLoop::NumberBuffers(const Channel *channel) {
798 return GetSimulatedChannel(channel)->number_buffers();
799}
800
Austin Schuh7d87b672019-12-01 20:23:49 -0800801SimulatedWatcher::SimulatedWatcher(
802 SimulatedEventLoop *simulated_event_loop, EventScheduler *scheduler,
Austin Schuh8bd96322020-02-13 21:18:22 -0800803 const Channel *channel,
Austin Schuh7d87b672019-12-01 20:23:49 -0800804 std::function<void(const Context &context, const void *message)> fn)
805 : WatcherState(simulated_event_loop, channel, std::move(fn)),
806 simulated_event_loop_(simulated_event_loop),
Brian Silverman4f4e0612020-08-12 19:54:41 -0700807 channel_(channel),
Austin Schuh7d87b672019-12-01 20:23:49 -0800808 scheduler_(scheduler),
Brian Silverman4f4e0612020-08-12 19:54:41 -0700809 event_(this),
Austin Schuh58646e22021-08-23 23:51:46 -0700810 token_(scheduler_->InvalidToken()) {
811 VLOG(1) << simulated_event_loop_->distributed_now() << " "
812 << NodeName(simulated_event_loop_->node())
813 << simulated_event_loop_->monotonic_now() << " "
814 << simulated_event_loop_->name() << " Watching "
815 << configuration::StrippedChannelToString(channel_);
816}
Austin Schuh7d87b672019-12-01 20:23:49 -0800817
818SimulatedWatcher::~SimulatedWatcher() {
Austin Schuh58646e22021-08-23 23:51:46 -0700819 VLOG(1) << simulated_event_loop_->distributed_now() << " "
Austin Schuh057d29f2021-08-21 23:05:15 -0700820 << NodeName(simulated_event_loop_->node())
Austin Schuh58646e22021-08-23 23:51:46 -0700821 << simulated_event_loop_->monotonic_now() << " "
822 << simulated_event_loop_->name() << " ~Watching "
Austin Schuh057d29f2021-08-21 23:05:15 -0700823 << configuration::StrippedChannelToString(channel_);
Austin Schuh7d87b672019-12-01 20:23:49 -0800824 simulated_event_loop_->RemoveEvent(&event_);
825 if (token_ != scheduler_->InvalidToken()) {
826 scheduler_->Deschedule(token_);
827 }
Brian Silverman4f4e0612020-08-12 19:54:41 -0700828 CHECK_NOTNULL(simulated_channel_)->RemoveWatcher(this);
Austin Schuh7d87b672019-12-01 20:23:49 -0800829}
830
Austin Schuh8fb315a2020-11-19 22:33:58 -0800831bool SimulatedWatcher::has_run() const {
832 return simulated_event_loop_->has_run();
833}
834
Austin Schuh7d87b672019-12-01 20:23:49 -0800835void SimulatedWatcher::Schedule(std::shared_ptr<SimulatedMessage> message) {
Austin Schuha5e14192020-01-06 18:02:41 -0800836 monotonic_clock::time_point event_time =
837 simulated_event_loop_->monotonic_now();
Austin Schuh7d87b672019-12-01 20:23:49 -0800838
839 // Messages are queued in order. If we are the first, add ourselves.
840 // Otherwise, don't.
841 if (msgs_.size() == 0) {
Austin Schuhad154822019-12-27 15:45:13 -0800842 event_.set_event_time(message->context.monotonic_event_time);
Austin Schuh7d87b672019-12-01 20:23:49 -0800843 simulated_event_loop_->AddEvent(&event_);
844
845 DoSchedule(event_time);
846 }
847
Austin Schuhe6f4c8d2021-12-11 12:36:06 -0800848 msgs_.emplace_back(std::move(message));
Austin Schuh7d87b672019-12-01 20:23:49 -0800849}
850
Austin Schuhf4b09c72021-12-08 12:04:37 -0800851void SimulatedWatcher::HandleEvent() noexcept {
Austin Schuh7d87b672019-12-01 20:23:49 -0800852 const monotonic_clock::time_point monotonic_now =
853 simulated_event_loop_->monotonic_now();
Austin Schuh58646e22021-08-23 23:51:46 -0700854 VLOG(1) << simulated_event_loop_->distributed_now() << " "
855 << NodeName(simulated_event_loop_->node())
856 << simulated_event_loop_->monotonic_now() << " "
857 << simulated_event_loop_->name() << " Watcher "
Austin Schuh057d29f2021-08-21 23:05:15 -0700858 << configuration::StrippedChannelToString(channel_);
859 CHECK_NE(msgs_.size(), 0u) << ": No events to handle.";
860
Tyler Chatow67ddb032020-01-12 14:30:04 -0800861 logging::ScopedLogRestorer prev_logger;
Austin Schuha0c41ba2020-09-10 22:59:14 -0700862 if (simulated_event_loop_->log_impl_) {
863 prev_logger.Swap(simulated_event_loop_->log_impl_);
Tyler Chatow67ddb032020-01-12 14:30:04 -0800864 }
Austin Schuhad154822019-12-27 15:45:13 -0800865 Context context = msgs_.front()->context;
866
Brian Silverman4f4e0612020-08-12 19:54:41 -0700867 if (channel_->read_method() != ReadMethod::PIN) {
868 context.buffer_index = -1;
869 }
Austin Schuhad154822019-12-27 15:45:13 -0800870 if (context.remote_queue_index == 0xffffffffu) {
871 context.remote_queue_index = context.queue_index;
872 }
Austin Schuh58646e22021-08-23 23:51:46 -0700873 if (context.monotonic_remote_time == monotonic_clock::min_time) {
Austin Schuhad154822019-12-27 15:45:13 -0800874 context.monotonic_remote_time = context.monotonic_event_time;
875 }
Austin Schuh58646e22021-08-23 23:51:46 -0700876 if (context.realtime_remote_time == realtime_clock::min_time) {
Austin Schuhad154822019-12-27 15:45:13 -0800877 context.realtime_remote_time = context.realtime_event_time;
878 }
879
Austin Schuhcc6070c2020-10-10 20:25:56 -0700880 {
881 ScopedMarkRealtimeRestorer rt(simulated_event_loop_->priority() > 0);
882 DoCallCallback([monotonic_now]() { return monotonic_now; }, context);
883 }
Austin Schuh7d87b672019-12-01 20:23:49 -0800884
885 msgs_.pop_front();
Austin Schuheb4e4ce2020-09-10 23:04:18 -0700886 if (token_ != scheduler_->InvalidToken()) {
887 scheduler_->Deschedule(token_);
888 token_ = scheduler_->InvalidToken();
889 }
Austin Schuh7d87b672019-12-01 20:23:49 -0800890 if (msgs_.size() != 0) {
Austin Schuhad154822019-12-27 15:45:13 -0800891 event_.set_event_time(msgs_.front()->context.monotonic_event_time);
Austin Schuh7d87b672019-12-01 20:23:49 -0800892 simulated_event_loop_->AddEvent(&event_);
893
894 DoSchedule(event_.event_time());
Austin Schuh7d87b672019-12-01 20:23:49 -0800895 }
896}
897
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800898void SimulatedWatcher::Handle() noexcept {
899 DCHECK(token_ != scheduler_->InvalidToken());
900 token_ = scheduler_->InvalidToken();
901 simulated_event_loop_->HandleEvent();
902}
903
Austin Schuh7d87b672019-12-01 20:23:49 -0800904void SimulatedWatcher::DoSchedule(monotonic_clock::time_point event_time) {
Austin Schuheb4e4ce2020-09-10 23:04:18 -0700905 CHECK(token_ == scheduler_->InvalidToken())
906 << ": May not schedule multiple times";
907 token_ = scheduler_->Schedule(
Austin Schuhef8f1ae2021-12-11 12:35:05 -0800908 event_time + simulated_event_loop_->send_delay(), this);
Austin Schuh7d87b672019-12-01 20:23:49 -0800909}
910
911void SimulatedChannel::MakeRawWatcher(SimulatedWatcher *watcher) {
Brian Silverman77162972020-08-12 19:52:40 -0700912 CheckReaderCount();
Austin Schuh39788ff2019-12-01 18:22:57 -0800913 watcher->SetSimulatedChannel(this);
914 watchers_.emplace_back(watcher);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700915}
916
917::std::unique_ptr<RawSender> SimulatedChannel::MakeRawSender(
Austin Schuh8fb315a2020-11-19 22:33:58 -0800918 SimulatedEventLoop *event_loop) {
Alex Perrycb7da4b2019-08-28 19:35:56 -0700919 return ::std::unique_ptr<RawSender>(new SimulatedSender(this, event_loop));
920}
921
Austin Schuh39788ff2019-12-01 18:22:57 -0800922::std::unique_ptr<RawFetcher> SimulatedChannel::MakeRawFetcher(
923 EventLoop *event_loop) {
Brian Silverman77162972020-08-12 19:52:40 -0700924 CheckReaderCount();
Austin Schuh39788ff2019-12-01 18:22:57 -0800925 ::std::unique_ptr<SimulatedFetcher> fetcher(
926 new SimulatedFetcher(event_loop, this));
Alex Perrycb7da4b2019-08-28 19:35:56 -0700927 fetchers_.push_back(fetcher.get());
928 return ::std::move(fetcher);
929}
930
milind1f1dca32021-07-03 13:50:07 -0700931std::optional<uint32_t> SimulatedChannel::Send(
932 std::shared_ptr<SimulatedMessage> message) {
933 std::optional<uint32_t> queue_index = {next_queue_index_.index()};
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700934
milind1f1dca32021-07-03 13:50:07 -0700935 message->context.queue_index = *queue_index;
Tyler Chatowb7c6eba2021-07-28 14:43:23 -0700936 // Points to the actual data depending on the size set in context. Data may
937 // allocate more than the actual size of the message, so offset from the back
938 // of that to get the actual start of the data.
939 message->context.data =
940 message->data->data() + message->data->size() - message->context.size;
Austin Schuha9df9ad2021-06-16 14:49:39 -0700941
942 DCHECK(channel()->has_schema())
943 << ": Missing schema for channel "
944 << configuration::StrippedChannelToString(channel());
945 DCHECK(flatbuffers::Verify(
946 *channel()->schema(), *channel()->schema()->root_table(),
947 static_cast<const uint8_t *>(message->context.data),
948 message->context.size))
949 << ": Corrupted flatbuffer on " << channel()->name()->c_str() << " "
950 << channel()->type()->c_str();
951
Alex Perrycb7da4b2019-08-28 19:35:56 -0700952 next_queue_index_ = next_queue_index_.Increment();
953
Austin Schuhe6f4c8d2021-12-11 12:36:06 -0800954 latest_message_ = std::move(message);
Austin Schuh8fb315a2020-11-19 22:33:58 -0800955 for (SimulatedWatcher *watcher : watchers_) {
956 if (watcher->has_run()) {
Austin Schuhe6f4c8d2021-12-11 12:36:06 -0800957 watcher->Schedule(latest_message_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700958 }
959 }
960 for (auto &fetcher : fetchers_) {
Austin Schuhe6f4c8d2021-12-11 12:36:06 -0800961 fetcher->Enqueue(latest_message_);
Alex Perrycb7da4b2019-08-28 19:35:56 -0700962 }
Austin Schuhad154822019-12-27 15:45:13 -0800963 return queue_index;
Alex Perrycb7da4b2019-08-28 19:35:56 -0700964}
965
966void SimulatedChannel::UnregisterFetcher(SimulatedFetcher *fetcher) {
967 fetchers_.erase(::std::find(fetchers_.begin(), fetchers_.end(), fetcher));
968}
969
Austin Schuh8fb315a2020-11-19 22:33:58 -0800970SimulatedSender::SimulatedSender(SimulatedChannel *simulated_channel,
971 SimulatedEventLoop *event_loop)
972 : RawSender(event_loop, simulated_channel->channel()),
973 simulated_channel_(simulated_channel),
Austin Schuh58646e22021-08-23 23:51:46 -0700974 simulated_event_loop_(event_loop) {
Austin Schuh8fb315a2020-11-19 22:33:58 -0800975 simulated_channel_->CountSenderCreated();
976}
977
978SimulatedSender::~SimulatedSender() {
979 simulated_channel_->CountSenderDestroyed();
980}
981
milind1f1dca32021-07-03 13:50:07 -0700982RawSender::Error SimulatedSender::DoSend(
983 size_t length, monotonic_clock::time_point monotonic_remote_time,
984 realtime_clock::time_point realtime_remote_time,
985 uint32_t remote_queue_index, const UUID &source_boot_uuid) {
Austin Schuh58646e22021-08-23 23:51:46 -0700986 VLOG(1) << simulated_event_loop_->distributed_now() << " "
987 << NodeName(simulated_event_loop_->node())
988 << simulated_event_loop_->monotonic_now() << " "
989 << simulated_event_loop_->name() << " Send "
990 << configuration::StrippedChannelToString(channel());
991
Austin Schuh8fb315a2020-11-19 22:33:58 -0800992 // The allocations in here are due to infrastructure and don't count in the
993 // no mallocs in RT code.
994 ScopedNotRealtime nrt;
995 CHECK_LE(length, size()) << ": Attempting to send too big a message.";
Austin Schuh58646e22021-08-23 23:51:46 -0700996 message_->context.monotonic_event_time =
997 simulated_event_loop_->monotonic_now();
Austin Schuh8fb315a2020-11-19 22:33:58 -0800998 message_->context.monotonic_remote_time = monotonic_remote_time;
999 message_->context.remote_queue_index = remote_queue_index;
Austin Schuh58646e22021-08-23 23:51:46 -07001000 message_->context.realtime_event_time = simulated_event_loop_->realtime_now();
Austin Schuh8fb315a2020-11-19 22:33:58 -08001001 message_->context.realtime_remote_time = realtime_remote_time;
Austin Schuha9012be2021-07-21 15:19:11 -07001002 message_->context.source_boot_uuid = source_boot_uuid;
Austin Schuh8fb315a2020-11-19 22:33:58 -08001003 CHECK_LE(length, message_->context.size);
1004 message_->context.size = length;
1005
milind1f1dca32021-07-03 13:50:07 -07001006 const std::optional<uint32_t> optional_queue_index =
1007 simulated_channel_->Send(message_);
1008
1009 // Check that we are not sending messages too fast
1010 if (!optional_queue_index) {
1011 VLOG(1) << simulated_event_loop_->distributed_now() << " "
1012 << NodeName(simulated_event_loop_->node())
1013 << simulated_event_loop_->monotonic_now() << " "
1014 << simulated_event_loop_->name()
1015 << "\nMessages were sent too fast:\n"
1016 << "For channel: "
1017 << configuration::CleanedChannelToString(
1018 simulated_channel_->channel())
1019 << '\n'
1020 << "Tried to send more than " << simulated_channel_->queue_size()
1021 << " (queue size) messages in the last "
1022 << std::chrono::duration<double>(
1023 simulated_channel_->channel_storage_duration())
1024 .count()
1025 << " seconds (channel storage duration)"
1026 << "\n\n";
1027 return Error::kMessagesSentTooFast;
1028 }
1029
1030 sent_queue_index_ = *optional_queue_index;
Austin Schuh58646e22021-08-23 23:51:46 -07001031 monotonic_sent_time_ = simulated_event_loop_->monotonic_now();
1032 realtime_sent_time_ = simulated_event_loop_->realtime_now();
Austin Schuh8fb315a2020-11-19 22:33:58 -08001033
1034 // Drop the reference to the message so that we allocate a new message for
1035 // next time. Otherwise we will continue to reuse the same memory for all
1036 // messages and corrupt it.
1037 message_.reset();
milind1f1dca32021-07-03 13:50:07 -07001038 return Error::kOk;
Austin Schuh8fb315a2020-11-19 22:33:58 -08001039}
1040
milind1f1dca32021-07-03 13:50:07 -07001041RawSender::Error SimulatedSender::DoSend(
1042 const void *msg, size_t size,
1043 monotonic_clock::time_point monotonic_remote_time,
1044 realtime_clock::time_point realtime_remote_time,
1045 uint32_t remote_queue_index, const UUID &source_boot_uuid) {
Austin Schuh102667e2020-12-11 20:13:28 -08001046 CHECK_LE(size, this->size())
1047 << ": Attempting to send too big a message on "
1048 << configuration::CleanedChannelToString(simulated_channel_->channel());
Austin Schuh8fb315a2020-11-19 22:33:58 -08001049
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001050 // Allocates an aligned buffer in which to copy unaligned msg.
1051 auto [span, mutable_span] = MakeSharedSpan(size);
1052 message_ = SimulatedMessage::Make(simulated_channel_, span);
Austin Schuh8fb315a2020-11-19 22:33:58 -08001053
1054 // Now fill in the message. size is already populated above, and
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001055 // queue_index will be populated in simulated_channel_.
1056 memcpy(mutable_span.data(), msg, size);
Austin Schuh8fb315a2020-11-19 22:33:58 -08001057
1058 return DoSend(size, monotonic_remote_time, realtime_remote_time,
Austin Schuha9012be2021-07-21 15:19:11 -07001059 remote_queue_index, source_boot_uuid);
Austin Schuh8fb315a2020-11-19 22:33:58 -08001060}
1061
milind1f1dca32021-07-03 13:50:07 -07001062RawSender::Error SimulatedSender::DoSend(
1063 const RawSender::SharedSpan data,
1064 monotonic_clock::time_point monotonic_remote_time,
1065 realtime_clock::time_point realtime_remote_time,
1066 uint32_t remote_queue_index, const UUID &source_boot_uuid) {
Tyler Chatowb7c6eba2021-07-28 14:43:23 -07001067 CHECK_LE(data->size(), this->size())
1068 << ": Attempting to send too big a message on "
1069 << configuration::CleanedChannelToString(simulated_channel_->channel());
1070
1071 // Constructs a message sharing the already allocated and aligned message
1072 // data.
1073 message_ = SimulatedMessage::Make(simulated_channel_, data);
1074
1075 return DoSend(data->size(), monotonic_remote_time, realtime_remote_time,
1076 remote_queue_index, source_boot_uuid);
1077}
1078
Austin Schuh39788ff2019-12-01 18:22:57 -08001079SimulatedTimerHandler::SimulatedTimerHandler(
Austin Schuh8bd96322020-02-13 21:18:22 -08001080 EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
1081 ::std::function<void()> fn)
Austin Schuh39788ff2019-12-01 18:22:57 -08001082 : TimerHandler(simulated_event_loop, std::move(fn)),
Austin Schuh7d87b672019-12-01 20:23:49 -08001083 simulated_event_loop_(simulated_event_loop),
1084 event_(this),
Austin Schuh39788ff2019-12-01 18:22:57 -08001085 scheduler_(scheduler),
1086 token_(scheduler_->InvalidToken()) {}
1087
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001088void SimulatedTimerHandler::Setup(monotonic_clock::time_point base,
1089 monotonic_clock::duration repeat_offset) {
Austin Schuh62288252020-11-18 23:26:04 -08001090 // The allocations in here are due to infrastructure and don't count in the no
1091 // mallocs in RT code.
1092 ScopedNotRealtime nrt;
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001093 Disable();
Austin Schuh58646e22021-08-23 23:51:46 -07001094 const monotonic_clock::time_point monotonic_now =
Austin Schuha5e14192020-01-06 18:02:41 -08001095 simulated_event_loop_->monotonic_now();
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001096 base_ = base;
1097 repeat_offset_ = repeat_offset;
Austin Schuhef8f1ae2021-12-11 12:35:05 -08001098 token_ = scheduler_->Schedule(std::max(base, monotonic_now), this);
Austin Schuh7d87b672019-12-01 20:23:49 -08001099 event_.set_event_time(base_);
1100 simulated_event_loop_->AddEvent(&event_);
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001101}
1102
Austin Schuhef8f1ae2021-12-11 12:35:05 -08001103void SimulatedTimerHandler::Handle() noexcept {
1104 DCHECK(token_ != scheduler_->InvalidToken());
1105 token_ = scheduler_->InvalidToken();
1106 simulated_event_loop_->HandleEvent();
1107}
1108
Austin Schuhf4b09c72021-12-08 12:04:37 -08001109void SimulatedTimerHandler::HandleEvent() noexcept {
Austin Schuh58646e22021-08-23 23:51:46 -07001110 const monotonic_clock::time_point monotonic_now =
Austin Schuha5e14192020-01-06 18:02:41 -08001111 simulated_event_loop_->monotonic_now();
Austin Schuh58646e22021-08-23 23:51:46 -07001112 VLOG(1) << simulated_event_loop_->distributed_now() << " "
1113 << NodeName(simulated_event_loop_->node()) << monotonic_now << " "
1114 << simulated_event_loop_->name() << " Timer '" << name() << "'";
Tyler Chatow67ddb032020-01-12 14:30:04 -08001115 logging::ScopedLogRestorer prev_logger;
Austin Schuha0c41ba2020-09-10 22:59:14 -07001116 if (simulated_event_loop_->log_impl_) {
1117 prev_logger.Swap(simulated_event_loop_->log_impl_);
Tyler Chatow67ddb032020-01-12 14:30:04 -08001118 }
Austin Schuheb4e4ce2020-09-10 23:04:18 -07001119 if (token_ != scheduler_->InvalidToken()) {
1120 scheduler_->Deschedule(token_);
1121 token_ = scheduler_->InvalidToken();
1122 }
Austin Schuh58646e22021-08-23 23:51:46 -07001123 if (repeat_offset_ != monotonic_clock::zero()) {
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001124 // Reschedule.
1125 while (base_ <= monotonic_now) base_ += repeat_offset_;
Austin Schuhef8f1ae2021-12-11 12:35:05 -08001126 token_ = scheduler_->Schedule(base_, this);
Austin Schuh7d87b672019-12-01 20:23:49 -08001127 event_.set_event_time(base_);
1128 simulated_event_loop_->AddEvent(&event_);
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001129 }
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001130
Austin Schuhcc6070c2020-10-10 20:25:56 -07001131 {
1132 ScopedMarkRealtimeRestorer rt(simulated_event_loop_->priority() > 0);
1133 Call([monotonic_now]() { return monotonic_now; }, monotonic_now);
1134 }
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001135}
1136
Austin Schuh7d87b672019-12-01 20:23:49 -08001137void SimulatedTimerHandler::Disable() {
1138 simulated_event_loop_->RemoveEvent(&event_);
1139 if (token_ != scheduler_->InvalidToken()) {
1140 scheduler_->Deschedule(token_);
1141 token_ = scheduler_->InvalidToken();
1142 }
1143}
1144
Austin Schuh39788ff2019-12-01 18:22:57 -08001145SimulatedPhasedLoopHandler::SimulatedPhasedLoopHandler(
Austin Schuh8bd96322020-02-13 21:18:22 -08001146 EventScheduler *scheduler, SimulatedEventLoop *simulated_event_loop,
1147 ::std::function<void(int)> fn, const monotonic_clock::duration interval,
Austin Schuh39788ff2019-12-01 18:22:57 -08001148 const monotonic_clock::duration offset)
1149 : PhasedLoopHandler(simulated_event_loop, std::move(fn), interval, offset),
1150 simulated_event_loop_(simulated_event_loop),
Austin Schuh7d87b672019-12-01 20:23:49 -08001151 event_(this),
Austin Schuh39788ff2019-12-01 18:22:57 -08001152 scheduler_(scheduler),
1153 token_(scheduler_->InvalidToken()) {}
1154
Austin Schuh7d87b672019-12-01 20:23:49 -08001155SimulatedPhasedLoopHandler::~SimulatedPhasedLoopHandler() {
1156 if (token_ != scheduler_->InvalidToken()) {
1157 scheduler_->Deschedule(token_);
1158 token_ = scheduler_->InvalidToken();
1159 }
1160 simulated_event_loop_->RemoveEvent(&event_);
1161}
1162
Austin Schuhf4b09c72021-12-08 12:04:37 -08001163void SimulatedPhasedLoopHandler::HandleEvent() noexcept {
Austin Schuh39788ff2019-12-01 18:22:57 -08001164 monotonic_clock::time_point monotonic_now =
1165 simulated_event_loop_->monotonic_now();
Austin Schuh057d29f2021-08-21 23:05:15 -07001166 VLOG(1) << monotonic_now << " Phased loop " << simulated_event_loop_->name()
1167 << ", " << name();
Tyler Chatow67ddb032020-01-12 14:30:04 -08001168 logging::ScopedLogRestorer prev_logger;
Austin Schuha0c41ba2020-09-10 22:59:14 -07001169 if (simulated_event_loop_->log_impl_) {
1170 prev_logger.Swap(simulated_event_loop_->log_impl_);
Tyler Chatow67ddb032020-01-12 14:30:04 -08001171 }
Austin Schuhcc6070c2020-10-10 20:25:56 -07001172
1173 {
1174 ScopedMarkRealtimeRestorer rt(simulated_event_loop_->priority() > 0);
1175 Call([monotonic_now]() { return monotonic_now; },
1176 [this](monotonic_clock::time_point sleep_time) {
1177 Schedule(sleep_time);
1178 });
1179 }
Austin Schuh39788ff2019-12-01 18:22:57 -08001180}
Austin Schuhde8a8ff2019-11-30 15:25:36 -08001181
Austin Schuhef8f1ae2021-12-11 12:35:05 -08001182void SimulatedPhasedLoopHandler::Handle() noexcept {
1183 DCHECK(token_ != scheduler_->InvalidToken());
1184 token_ = scheduler_->InvalidToken();
1185 simulated_event_loop_->HandleEvent();
1186}
1187
Austin Schuh7d87b672019-12-01 20:23:49 -08001188void SimulatedPhasedLoopHandler::Schedule(
1189 monotonic_clock::time_point sleep_time) {
Austin Schuh62288252020-11-18 23:26:04 -08001190 // The allocations in here are due to infrastructure and don't count in the no
1191 // mallocs in RT code.
1192 ScopedNotRealtime nrt;
Austin Schuheb4e4ce2020-09-10 23:04:18 -07001193 if (token_ != scheduler_->InvalidToken()) {
1194 scheduler_->Deschedule(token_);
1195 token_ = scheduler_->InvalidToken();
1196 }
Austin Schuhef8f1ae2021-12-11 12:35:05 -08001197 token_ = scheduler_->Schedule(sleep_time, this);
Austin Schuh7d87b672019-12-01 20:23:49 -08001198 event_.set_event_time(sleep_time);
1199 simulated_event_loop_->AddEvent(&event_);
1200}
1201
Alex Perrycb7da4b2019-08-28 19:35:56 -07001202SimulatedEventLoopFactory::SimulatedEventLoopFactory(
1203 const Configuration *configuration)
Austin Schuh6f3babe2020-01-26 20:34:50 -08001204 : configuration_(CHECK_NOTNULL(configuration)),
1205 nodes_(configuration::GetNodes(configuration_)) {
Austin Schuh094d09b2020-11-20 23:26:52 -08001206 CHECK(IsInitialized()) << ": Need to initialize AOS first.";
Austin Schuhac0771c2020-01-07 18:36:30 -08001207 for (const Node *node : nodes_) {
Austin Schuh58646e22021-08-23 23:51:46 -07001208 node_factories_.emplace_back(
1209 new NodeEventLoopFactory(&scheduler_scheduler_, this, node));
Austin Schuh15649d62019-12-28 16:36:38 -08001210 }
Austin Schuh898f4972020-01-11 17:21:25 -08001211
1212 if (configuration::MultiNode(configuration)) {
1213 bridge_ = std::make_unique<message_bridge::SimulatedMessageBridge>(this);
1214 }
Austin Schuh15649d62019-12-28 16:36:38 -08001215}
1216
Alex Perrycb7da4b2019-08-28 19:35:56 -07001217SimulatedEventLoopFactory::~SimulatedEventLoopFactory() {}
1218
Austin Schuhac0771c2020-01-07 18:36:30 -08001219NodeEventLoopFactory *SimulatedEventLoopFactory::GetNodeEventLoopFactory(
Austin Schuh057d29f2021-08-21 23:05:15 -07001220 std::string_view node) {
1221 return GetNodeEventLoopFactory(configuration::GetNode(configuration(), node));
1222}
1223
1224NodeEventLoopFactory *SimulatedEventLoopFactory::GetNodeEventLoopFactory(
Austin Schuhac0771c2020-01-07 18:36:30 -08001225 const Node *node) {
1226 auto result = std::find_if(
1227 node_factories_.begin(), node_factories_.end(),
1228 [node](const std::unique_ptr<NodeEventLoopFactory> &node_factory) {
1229 return node_factory->node() == node;
1230 });
1231
1232 CHECK(result != node_factories_.end())
1233 << ": Failed to find node " << FlatbufferToJson(node);
1234
1235 return result->get();
1236}
1237
Austin Schuh87dd3832021-01-01 23:07:31 -08001238void SimulatedEventLoopFactory::SetTimeConverter(
1239 TimeConverter *time_converter) {
1240 for (std::unique_ptr<NodeEventLoopFactory> &factory : node_factories_) {
1241 factory->SetTimeConverter(time_converter);
1242 }
Austin Schuh58646e22021-08-23 23:51:46 -07001243 scheduler_scheduler_.SetTimeConverter(time_converter);
Austin Schuh87dd3832021-01-01 23:07:31 -08001244}
1245
Austin Schuh5f1cc5c2019-12-01 18:01:11 -08001246::std::unique_ptr<EventLoop> SimulatedEventLoopFactory::MakeEventLoop(
Austin Schuhac0771c2020-01-07 18:36:30 -08001247 std::string_view name, const Node *node) {
1248 if (node == nullptr) {
1249 CHECK(!configuration::MultiNode(configuration()))
1250 << ": Can't make a single node event loop in a multi-node world.";
1251 } else {
1252 CHECK(configuration::MultiNode(configuration()))
1253 << ": Can't make a multi-node event loop in a single-node world.";
1254 }
1255 return GetNodeEventLoopFactory(node)->MakeEventLoop(name);
1256}
1257
Austin Schuh057d29f2021-08-21 23:05:15 -07001258NodeEventLoopFactory::NodeEventLoopFactory(
1259 EventSchedulerScheduler *scheduler_scheduler,
1260 SimulatedEventLoopFactory *factory, const Node *node)
Austin Schuh58646e22021-08-23 23:51:46 -07001261 : scheduler_(configuration::GetNodeIndex(factory->configuration(), node)),
1262 factory_(factory),
1263 node_(node) {
Austin Schuh057d29f2021-08-21 23:05:15 -07001264 scheduler_scheduler->AddEventScheduler(&scheduler_);
Austin Schuh58646e22021-08-23 23:51:46 -07001265 scheduler_.set_started([this]() {
1266 started_ = true;
1267 for (SimulatedEventLoop *event_loop : event_loops_) {
1268 event_loop->SetIsRunning(true);
1269 }
1270 });
1271 scheduler_.set_on_shutdown([this]() {
1272 VLOG(1) << scheduler_.distributed_now() << " " << NodeName(this->node())
1273 << monotonic_now() << " Shutting down node.";
1274 Shutdown();
1275 ScheduleStartup();
1276 });
1277 ScheduleStartup();
Austin Schuh057d29f2021-08-21 23:05:15 -07001278}
1279
1280NodeEventLoopFactory::~NodeEventLoopFactory() {
Austin Schuh58646e22021-08-23 23:51:46 -07001281 if (started_) {
1282 for (std::function<void()> &fn : on_shutdown_) {
1283 fn();
1284 }
1285
1286 VLOG(1) << scheduler_.distributed_now() << " " << NodeName(node())
1287 << monotonic_now() << " Shutting down applications.";
1288 applications_.clear();
1289 started_ = false;
1290 }
1291
1292 if (event_loops_.size() != 0u) {
1293 for (SimulatedEventLoop *event_loop : event_loops_) {
1294 LOG(ERROR) << scheduler_.distributed_now() << " " << NodeName(node())
1295 << monotonic_now() << " Event loop '" << event_loop->name()
1296 << "' failed to shut down";
1297 }
1298 }
Austin Schuh057d29f2021-08-21 23:05:15 -07001299 CHECK_EQ(event_loops_.size(), 0u) << "Event loop didn't exit";
1300}
1301
Austin Schuh58646e22021-08-23 23:51:46 -07001302void NodeEventLoopFactory::OnStartup(std::function<void()> &&fn) {
Austin Schuh8bd96322020-02-13 21:18:22 -08001303 CHECK(!scheduler_.is_running())
Austin Schuh58646e22021-08-23 23:51:46 -07001304 << ": Can only register OnStartup handlers when not running.";
1305 on_startup_.emplace_back(std::move(fn));
1306 if (started_) {
1307 size_t on_startup_index = on_startup_.size() - 1;
1308 scheduler_.ScheduleOnStartup(
1309 [this, on_startup_index]() { on_startup_[on_startup_index](); });
1310 }
Alex Perrycb7da4b2019-08-28 19:35:56 -07001311}
1312
Austin Schuh58646e22021-08-23 23:51:46 -07001313void NodeEventLoopFactory::OnShutdown(std::function<void()> &&fn) {
1314 on_shutdown_.emplace_back(std::move(fn));
Austin Schuhc0b0f722020-12-12 18:36:06 -08001315}
Austin Schuh057d29f2021-08-21 23:05:15 -07001316
Austin Schuh58646e22021-08-23 23:51:46 -07001317void NodeEventLoopFactory::ScheduleStartup() {
1318 scheduler_.ScheduleOnStartup([this]() {
1319 UUID next_uuid = scheduler_.boot_uuid();
1320 if (boot_uuid_ != next_uuid) {
Austin Schuh188a2f62021-11-08 10:45:54 -08001321 CHECK_EQ(boot_uuid_, UUID::Zero())
1322 << ": Boot UUID changed without restarting. Did TimeConverter "
1323 "change the boot UUID without signaling a restart, or did you "
1324 "change TimeConverter?";
Austin Schuh58646e22021-08-23 23:51:46 -07001325 boot_uuid_ = next_uuid;
1326 }
1327 VLOG(1) << scheduler_.distributed_now() << " " << NodeName(this->node())
1328 << monotonic_now() << " Starting up node on boot " << boot_uuid_;
1329 Startup();
1330 });
1331}
1332
1333void NodeEventLoopFactory::Startup() {
1334 CHECK(!started_);
1335 for (size_t i = 0; i < on_startup_.size(); ++i) {
1336 on_startup_[i]();
1337 }
1338}
1339
1340void NodeEventLoopFactory::Shutdown() {
1341 for (SimulatedEventLoop *event_loop : event_loops_) {
1342 event_loop->SetIsRunning(false);
1343 }
1344
1345 CHECK(started_);
1346 started_ = false;
1347 for (std::function<void()> &fn : on_shutdown_) {
1348 fn();
1349 }
1350
1351 VLOG(1) << scheduler_.distributed_now() << " " << NodeName(node())
1352 << monotonic_now() << " Shutting down applications.";
1353 applications_.clear();
1354
1355 if (event_loops_.size() != 0u) {
1356 for (SimulatedEventLoop *event_loop : event_loops_) {
1357 LOG(ERROR) << scheduler_.distributed_now() << " " << NodeName(node())
1358 << monotonic_now() << " Event loop '" << event_loop->name()
1359 << "' failed to shut down";
1360 }
1361 }
1362 CHECK_EQ(event_loops_.size(), 0u) << "Not all event loops shut down";
1363 boot_uuid_ = UUID::Zero();
1364
1365 channels_.clear();
Austin Schuhc0b0f722020-12-12 18:36:06 -08001366}
1367
Alex Perrycb7da4b2019-08-28 19:35:56 -07001368void SimulatedEventLoopFactory::RunFor(monotonic_clock::duration duration) {
Austin Schuh58646e22021-08-23 23:51:46 -07001369 // This sets running to true too.
Austin Schuh057d29f2021-08-21 23:05:15 -07001370 scheduler_scheduler_.RunOnStartup();
Austin Schuh8bd96322020-02-13 21:18:22 -08001371 scheduler_scheduler_.RunFor(duration);
Austin Schuh057d29f2021-08-21 23:05:15 -07001372 for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
1373 if (node) {
1374 for (SimulatedEventLoop *loop : node->event_loops_) {
1375 loop->SetIsRunning(false);
1376 }
1377 }
Alex Perrycb7da4b2019-08-28 19:35:56 -07001378 }
1379}
1380
1381void SimulatedEventLoopFactory::Run() {
Austin Schuh58646e22021-08-23 23:51:46 -07001382 // This sets running to true too.
Austin Schuh057d29f2021-08-21 23:05:15 -07001383 scheduler_scheduler_.RunOnStartup();
Austin Schuh8bd96322020-02-13 21:18:22 -08001384 scheduler_scheduler_.Run();
Austin Schuh057d29f2021-08-21 23:05:15 -07001385 for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
1386 if (node) {
1387 for (SimulatedEventLoop *loop : node->event_loops_) {
1388 loop->SetIsRunning(false);
1389 }
1390 }
Alex Perrycb7da4b2019-08-28 19:35:56 -07001391 }
1392}
1393
Austin Schuh87dd3832021-01-01 23:07:31 -08001394void SimulatedEventLoopFactory::Exit() { scheduler_scheduler_.Exit(); }
Austin Schuh8fb315a2020-11-19 22:33:58 -08001395
Austin Schuh6f3babe2020-01-26 20:34:50 -08001396void SimulatedEventLoopFactory::DisableForwarding(const Channel *channel) {
Austin Schuh4c3b9702020-08-30 11:34:55 -07001397 CHECK(bridge_) << ": Can't disable forwarding without a message bridge.";
Austin Schuh6f3babe2020-01-26 20:34:50 -08001398 bridge_->DisableForwarding(channel);
1399}
1400
Austin Schuh4c3b9702020-08-30 11:34:55 -07001401void SimulatedEventLoopFactory::DisableStatistics() {
1402 CHECK(bridge_) << ": Can't disable statistics without a message bridge.";
1403 bridge_->DisableStatistics();
1404}
1405
Austin Schuh48205e62021-11-12 14:13:18 -08001406void SimulatedEventLoopFactory::EnableStatistics() {
1407 CHECK(bridge_) << ": Can't enable statistics without a message bridge.";
1408 bridge_->EnableStatistics();
1409}
1410
Austin Schuh2928ebe2021-02-07 22:10:27 -08001411void SimulatedEventLoopFactory::SkipTimingReport() {
1412 CHECK(bridge_) << ": Can't skip timing reports without a message bridge.";
Austin Schuh48205e62021-11-12 14:13:18 -08001413
1414 for (std::unique_ptr<NodeEventLoopFactory> &node : node_factories_) {
1415 if (node) {
1416 node->SkipTimingReport();
1417 }
1418 }
1419}
1420
1421void NodeEventLoopFactory::SkipTimingReport() {
1422 for (SimulatedEventLoop *event_loop : event_loops_) {
1423 event_loop->SkipTimingReport();
1424 }
1425 skip_timing_report_ = true;
1426}
1427
1428void NodeEventLoopFactory::EnableStatistics() {
1429 CHECK(factory_->bridge_)
1430 << ": Can't enable statistics without a message bridge.";
1431 factory_->bridge_->EnableStatistics(node_);
1432}
1433
1434void NodeEventLoopFactory::DisableStatistics() {
1435 CHECK(factory_->bridge_)
1436 << ": Can't disable statistics without a message bridge.";
1437 factory_->bridge_->DisableStatistics(node_);
Austin Schuh2928ebe2021-02-07 22:10:27 -08001438}
1439
Austin Schuh58646e22021-08-23 23:51:46 -07001440::std::unique_ptr<EventLoop> NodeEventLoopFactory::MakeEventLoop(
1441 std::string_view name) {
1442 CHECK(!scheduler_.is_running() || !started_)
1443 << ": Can't create an event loop while running";
1444
1445 pid_t tid = tid_;
1446 ++tid_;
1447 ::std::unique_ptr<SimulatedEventLoop> result(new SimulatedEventLoop(
1448 &scheduler_, this, &channels_, factory_->configuration(), &event_loops_,
1449 node_, tid));
1450 result->set_name(name);
1451 result->set_send_delay(factory_->send_delay());
Austin Schuh48205e62021-11-12 14:13:18 -08001452 if (skip_timing_report_) {
1453 result->SkipTimingReport();
1454 }
Austin Schuh58646e22021-08-23 23:51:46 -07001455
1456 VLOG(1) << scheduler_.distributed_now() << " " << NodeName(node())
1457 << monotonic_now() << " MakeEventLoop(\"" << result->name() << "\")";
1458 return std::move(result);
1459}
1460
1461void NodeEventLoopFactory::Disconnect(const Node *other) {
1462 factory_->bridge_->Disconnect(node_, other);
1463}
1464
1465void NodeEventLoopFactory::Connect(const Node *other) {
1466 factory_->bridge_->Connect(node_, other);
1467}
1468
Alex Perrycb7da4b2019-08-28 19:35:56 -07001469} // namespace aos