blob: 1cc3a8adae2f7914897b27e32002176abefba77b [file] [log] [blame]
James Kuszmaul0de4feb2022-04-15 12:16:59 -07001#include "aos/util/foxglove_websocket_lib.h"
2
3#include "aos/util/mcap_logger.h"
James Kuszmaulf1dbaff2023-02-08 21:17:32 -08004#include "aos/flatbuffer_merge.h"
5#include "absl/strings/escaping.h"
James Kuszmaul0de4feb2022-04-15 12:16:59 -07006#include "gflags/gflags.h"
7
8DEFINE_uint32(sorting_buffer_ms, 100,
9 "Amount of time to buffer messages to sort them before sending "
10 "them to foxglove.");
James Kuszmaul0de4feb2022-04-15 12:16:59 -070011
12namespace {
13// Period at which to poll the fetchers for all the channels.
14constexpr std::chrono::milliseconds kPollPeriod{50};
15} // namespace
16
17namespace aos {
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080018FoxgloveWebsocketServer::FoxgloveWebsocketServer(
19 aos::EventLoop *event_loop, uint32_t port, Serialization serialization,
20 FetchPinnedChannels fetch_pinned_channels)
21 : event_loop_(event_loop),
22 serialization_(serialization),
23 fetch_pinned_channels_(fetch_pinned_channels),
24 server_(port, "aos_foxglove") {
James Kuszmaul0de4feb2022-04-15 12:16:59 -070025 for (const aos::Channel *channel :
26 *event_loop_->configuration()->channels()) {
27 const bool is_pinned = (channel->read_method() == ReadMethod::PIN);
28 if (aos::configuration::ChannelIsReadableOnNode(channel,
29 event_loop_->node()) &&
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080030 (!is_pinned || fetch_pinned_channels_ == FetchPinnedChannels::kYes)) {
31 const FlatbufferDetachedBuffer<reflection::Schema> schema =
32 RecursiveCopyFlatBuffer(channel->schema());
James Kuszmaul0de4feb2022-04-15 12:16:59 -070033 const ChannelId id =
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080034 (serialization_ == Serialization::kJson)
35 ? server_.addChannel(foxglove::websocket::ChannelWithoutId{
36 .topic =
37 channel->name()->str() + " " + channel->type()->str(),
38 .encoding = "json",
39 .schemaName = channel->type()->str(),
40 .schema =
41 JsonSchemaForFlatbuffer({channel->schema()}).dump()})
42 : server_.addChannel(foxglove::websocket::ChannelWithoutId{
43 .topic =
44 channel->name()->str() + " " + channel->type()->str(),
45 .encoding = "flatbuffer",
46 .schemaName = channel->type()->str(),
47 .schema = absl::Base64Escape(
48 {reinterpret_cast<const char *>(schema.span().data()),
49 schema.span().size()})});
James Kuszmaul0de4feb2022-04-15 12:16:59 -070050 CHECK(fetchers_.count(id) == 0);
51 fetchers_[id] =
52 FetcherState{.fetcher = event_loop_->MakeRawFetcher(channel)};
53 }
54 }
55
56 server_.setSubscribeHandler([this](ChannelId channel) {
57 if (fetchers_.count(channel) == 0) {
58 return;
59 }
60 if (active_channels_.count(channel) == 0) {
61 // Catch up to the latest message on the requested channel, then subscribe
62 // to it.
63 fetchers_[channel].fetcher->Fetch();
64 active_channels_.insert(channel);
65 }
66 });
67 server_.setUnsubscribeHandler(
68 [this](ChannelId channel) { active_channels_.erase(channel); });
69 aos::TimerHandler *timer = event_loop_->AddTimer([this]() {
70 // In order to run the websocket server, we just let it spin every cycle for
71 // a bit. This isn't great for integration, but lets us stay in control and
72 // until we either have (a) a chance to locate a file descriptor to hand
73 // epoll; or (b) rewrite the foxglove websocket server to use seasocks
74 // (which we know how to integrate), we'll just function with this.
75 // TODO(james): Tighter integration into our event loop structure.
76 server_.run_for(kPollPeriod / 2);
77
78 // Unfortunately, we can't just push out all the messages as they come in.
79 // Foxglove expects that the timestamps associated with each message to be
80 // monotonic, and if you send things out of order then it will clear the
81 // state of the visualization entirely, which makes viewing plots
82 // impossible. If the user only accesses a single channel, that is fine, but
83 // as soon as they try to use multiple channels, you encounter interleaving.
84 // To resolve this, we specify a buffer (--sorting_buffer_ms), and only send
85 // out messages older than that time, sorting everything before we send it
86 // out.
87 const aos::monotonic_clock::time_point sort_until =
88 event_loop_->monotonic_now() -
89 std::chrono::milliseconds(FLAGS_sorting_buffer_ms);
90
91 // Pair of <send_time, channel id>.
92 absl::btree_set<std::pair<aos::monotonic_clock::time_point, ChannelId>>
93 fetcher_times;
94
95 // Go through and seed fetcher_times with the first message on each channel.
96 for (const ChannelId channel : active_channels_) {
97 FetcherState *fetcher = &fetchers_[channel];
98 if (fetcher->sent_current_message) {
99 if (fetcher->fetcher->FetchNext()) {
100 fetcher->sent_current_message = false;
101 }
102 }
103 if (!fetcher->sent_current_message) {
104 const aos::monotonic_clock::time_point send_time =
105 fetcher->fetcher->context().monotonic_event_time;
106 if (send_time <= sort_until) {
107 fetcher_times.insert(std::make_pair(send_time, channel));
108 }
109 }
110 }
111
112 // Send the oldest message continually until we run out of messages to send.
113 while (!fetcher_times.empty()) {
114 const ChannelId channel = fetcher_times.begin()->second;
115 FetcherState *fetcher = &fetchers_[channel];
James Kuszmaulf1dbaff2023-02-08 21:17:32 -0800116 switch (serialization_) {
117 case Serialization::kJson:
118 server_.sendMessage(
119 channel, fetcher_times.begin()->first.time_since_epoch().count(),
120 aos::FlatbufferToJson(fetcher->fetcher->channel()->schema(),
121 static_cast<const uint8_t *>(
122 fetcher->fetcher->context().data)));
123 break;
124 case Serialization::kFlatbuffer:
125 server_.sendMessage(
126 channel, fetcher_times.begin()->first.time_since_epoch().count(),
127 {static_cast<const char *>(fetcher->fetcher->context().data),
128 fetcher->fetcher->context().size});
129 }
James Kuszmaul0de4feb2022-04-15 12:16:59 -0700130 fetcher_times.erase(fetcher_times.begin());
131 fetcher->sent_current_message = true;
132 if (fetcher->fetcher->FetchNext()) {
133 fetcher->sent_current_message = false;
134 const aos::monotonic_clock::time_point send_time =
135 fetcher->fetcher->context().monotonic_event_time;
136 if (send_time <= sort_until) {
137 fetcher_times.insert(std::make_pair(send_time, channel));
138 }
139 }
140 }
141 });
142
143 event_loop_->OnRun([timer, this]() {
144 timer->Setup(event_loop_->monotonic_now(), kPollPeriod);
145 });
146}
147FoxgloveWebsocketServer::~FoxgloveWebsocketServer() { server_.stop(); }
148} // namespace aos