blob: b4d262a5c59456f7a0e07c1eee6b5204c5ca3960 [file] [log] [blame]
James Kuszmaul0de4feb2022-04-15 12:16:59 -07001#include "aos/util/foxglove_websocket_lib.h"
2
3#include "aos/util/mcap_logger.h"
4#include "gflags/gflags.h"
5
6DEFINE_uint32(sorting_buffer_ms, 100,
7 "Amount of time to buffer messages to sort them before sending "
8 "them to foxglove.");
9DEFINE_bool(fetch_pinned_channels, false,
10 "Set this to allow foxglove_websocket to make fetchers on channels "
11 "with a read_method of PIN (see aos/configuration.fbs; PIN is an "
12 "enum value). By default, we don't make fetchers for "
13 "these channels since using up a fetcher slot on PIN'd channels "
14 "can have side-effects.");
15
namespace {
// How often the fetchers for all of the channels get polled.
constexpr auto kPollPeriod = std::chrono::milliseconds(50);
}  // namespace
20
21namespace aos {
22FoxgloveWebsocketServer::FoxgloveWebsocketServer(aos::EventLoop *event_loop,
23 uint32_t port)
24 : event_loop_(event_loop), server_(port, "aos_foxglove") {
25 for (const aos::Channel *channel :
26 *event_loop_->configuration()->channels()) {
27 const bool is_pinned = (channel->read_method() == ReadMethod::PIN);
28 if (aos::configuration::ChannelIsReadableOnNode(channel,
29 event_loop_->node()) &&
30 (!is_pinned || FLAGS_fetch_pinned_channels)) {
31 const ChannelId id =
32 server_.addChannel(foxglove::websocket::ChannelWithoutId{
33 .topic = channel->name()->str() + " " + channel->type()->str(),
34 .encoding = "json",
35 .schemaName = channel->type()->str(),
36 .schema = JsonSchemaForFlatbuffer({channel->schema()}).dump()});
37 CHECK(fetchers_.count(id) == 0);
38 fetchers_[id] =
39 FetcherState{.fetcher = event_loop_->MakeRawFetcher(channel)};
40 }
41 }
42
43 server_.setSubscribeHandler([this](ChannelId channel) {
44 if (fetchers_.count(channel) == 0) {
45 return;
46 }
47 if (active_channels_.count(channel) == 0) {
48 // Catch up to the latest message on the requested channel, then subscribe
49 // to it.
50 fetchers_[channel].fetcher->Fetch();
51 active_channels_.insert(channel);
52 }
53 });
54 server_.setUnsubscribeHandler(
55 [this](ChannelId channel) { active_channels_.erase(channel); });
56 aos::TimerHandler *timer = event_loop_->AddTimer([this]() {
57 // In order to run the websocket server, we just let it spin every cycle for
58 // a bit. This isn't great for integration, but lets us stay in control and
59 // until we either have (a) a chance to locate a file descriptor to hand
60 // epoll; or (b) rewrite the foxglove websocket server to use seasocks
61 // (which we know how to integrate), we'll just function with this.
62 // TODO(james): Tighter integration into our event loop structure.
63 server_.run_for(kPollPeriod / 2);
64
65 // Unfortunately, we can't just push out all the messages as they come in.
66 // Foxglove expects that the timestamps associated with each message to be
67 // monotonic, and if you send things out of order then it will clear the
68 // state of the visualization entirely, which makes viewing plots
69 // impossible. If the user only accesses a single channel, that is fine, but
70 // as soon as they try to use multiple channels, you encounter interleaving.
71 // To resolve this, we specify a buffer (--sorting_buffer_ms), and only send
72 // out messages older than that time, sorting everything before we send it
73 // out.
74 const aos::monotonic_clock::time_point sort_until =
75 event_loop_->monotonic_now() -
76 std::chrono::milliseconds(FLAGS_sorting_buffer_ms);
77
78 // Pair of <send_time, channel id>.
79 absl::btree_set<std::pair<aos::monotonic_clock::time_point, ChannelId>>
80 fetcher_times;
81
82 // Go through and seed fetcher_times with the first message on each channel.
83 for (const ChannelId channel : active_channels_) {
84 FetcherState *fetcher = &fetchers_[channel];
85 if (fetcher->sent_current_message) {
86 if (fetcher->fetcher->FetchNext()) {
87 fetcher->sent_current_message = false;
88 }
89 }
90 if (!fetcher->sent_current_message) {
91 const aos::monotonic_clock::time_point send_time =
92 fetcher->fetcher->context().monotonic_event_time;
93 if (send_time <= sort_until) {
94 fetcher_times.insert(std::make_pair(send_time, channel));
95 }
96 }
97 }
98
99 // Send the oldest message continually until we run out of messages to send.
100 while (!fetcher_times.empty()) {
101 const ChannelId channel = fetcher_times.begin()->second;
102 FetcherState *fetcher = &fetchers_[channel];
103 server_.sendMessage(
104 channel, fetcher_times.begin()->first.time_since_epoch().count(),
105 aos::FlatbufferToJson(
106 fetcher->fetcher->channel()->schema(),
107 static_cast<const uint8_t *>(fetcher->fetcher->context().data)));
108 fetcher_times.erase(fetcher_times.begin());
109 fetcher->sent_current_message = true;
110 if (fetcher->fetcher->FetchNext()) {
111 fetcher->sent_current_message = false;
112 const aos::monotonic_clock::time_point send_time =
113 fetcher->fetcher->context().monotonic_event_time;
114 if (send_time <= sort_until) {
115 fetcher_times.insert(std::make_pair(send_time, channel));
116 }
117 }
118 }
119 });
120
121 event_loop_->OnRun([timer, this]() {
122 timer->Setup(event_loop_->monotonic_now(), kPollPeriod);
123 });
124}
125FoxgloveWebsocketServer::~FoxgloveWebsocketServer() { server_.stop(); }
126} // namespace aos