blob: 9ee7926f4fb8e3d5671227a7ecf51d707187f0e0 [file] [log] [blame]
#include <atomic>
#include <functional>
#include <memory>
#include <string>
#include <unordered_set>
#include <vector>

#include "aos/common/condition.h"
#include "aos/common/mutex.h"
#include "aos/events/event-loop.h"
6
7namespace aos {
// Forward declaration only: ShmEventLoop and its nested ThreadState name
// internal::WatcherThreadState as a friend, which does not require the full
// class definition in this header.
namespace internal { class WatcherThreadState; }  // namespace internal
13
14// Specialization of EventLoop that is build from queues running out of shared
15// memory. See more details at aos/common/queue.h
16class ShmEventLoop : public EventLoop {
17 public:
18 ShmEventLoop();
19 ~ShmEventLoop() override;
20
21 ::aos::monotonic_clock::time_point monotonic_now() override {
22 return ::aos::monotonic_clock::now();
23 }
24
25 std::unique_ptr<RawSender> MakeRawSender(const std::string &path,
26 const QueueTypeInfo &type) override;
27 std::unique_ptr<RawFetcher> MakeRawFetcher(
28 const std::string &path, const QueueTypeInfo &type) override;
29
30 void MakeRawWatcher(
31 const std::string &path, const QueueTypeInfo &type,
32 std::function<void(const aos::Message *message)> watcher) override;
33
34 void OnRun(std::function<void()> on_run) override;
35 void Run() override;
36 void Exit() override;
37
38 private:
39 friend class internal::WatcherThreadState;
40 // This ThreadState ensures that two watchers in the same loop cannot be
41 // triggered concurrently. Because watchers block threads indefinitely, this
42 // has to be shared_ptr in case the EventLoop is destroyed before the thread
43 // receives any new events.
44 class ThreadState {
45 public:
46 void WaitForStart();
47
48 bool is_running() { return loop_running_; }
49
50 void Run();
51
52 void Exit();
53
54 private:
55 friend class internal::WatcherThreadState;
56 friend class ShmEventLoop;
57
58 // This mutex ensures that only one watch event happens at a time.
59 aos::Mutex mutex_;
60 // Block on this until the loop starts.
61 aos::Condition loop_running_cond_{&mutex_};
62 // Used to notify watchers that the loop is done.
63 std::atomic<bool> loop_running_{false};
64 bool loop_finished_ = false;
65 };
66
67 // Exclude multiple of the same type for path.
68 void Take(const std::string &path);
69
70 std::vector<std::function<void()>> on_run_;
71 std::shared_ptr<ThreadState> thread_state_;
72
73 std::unordered_set<std::string> taken_;
74};
75
76} // namespace aos