blob: fd2cfd4d00bc37a48438308f33939252f00f8b68 [file] [log] [blame]
James Kuszmaul0de4feb2022-04-15 12:16:59 -07001#include "aos/util/foxglove_websocket_lib.h"
2
James Kuszmaulf1dbaff2023-02-08 21:17:32 -08003#include "absl/strings/escaping.h"
Philipp Schrader790cb542023-07-05 21:06:52 -07004#include "gflags/gflags.h"
5
James Kuszmaul1e418f62023-02-26 14:40:20 -08006#include "aos/flatbuffer_merge.h"
7#include "aos/util/mcap_logger.h"
James Kuszmaul0de4feb2022-04-15 12:16:59 -07008
9DEFINE_uint32(sorting_buffer_ms, 100,
10 "Amount of time to buffer messages to sort them before sending "
11 "them to foxglove.");
James Kuszmaul0de4feb2022-04-15 12:16:59 -070012
13namespace {
14// Period at which to poll the fetchers for all the channels.
15constexpr std::chrono::milliseconds kPollPeriod{50};
16} // namespace
17
18namespace aos {
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080019FoxgloveWebsocketServer::FoxgloveWebsocketServer(
20 aos::EventLoop *event_loop, uint32_t port, Serialization serialization,
James Kuszmaul1e418f62023-02-26 14:40:20 -080021 FetchPinnedChannels fetch_pinned_channels,
22 CanonicalChannelNames canonical_channels)
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080023 : event_loop_(event_loop),
24 serialization_(serialization),
25 fetch_pinned_channels_(fetch_pinned_channels),
James Kuszmaul1e418f62023-02-26 14:40:20 -080026 canonical_channels_(canonical_channels),
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080027 server_(port, "aos_foxglove") {
James Kuszmaul0de4feb2022-04-15 12:16:59 -070028 for (const aos::Channel *channel :
29 *event_loop_->configuration()->channels()) {
30 const bool is_pinned = (channel->read_method() == ReadMethod::PIN);
31 if (aos::configuration::ChannelIsReadableOnNode(channel,
32 event_loop_->node()) &&
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080033 (!is_pinned || fetch_pinned_channels_ == FetchPinnedChannels::kYes)) {
34 const FlatbufferDetachedBuffer<reflection::Schema> schema =
35 RecursiveCopyFlatBuffer(channel->schema());
James Kuszmaul1e418f62023-02-26 14:40:20 -080036 const std::string shortest_name =
37 ShortenedChannelName(event_loop_->configuration(), channel,
38 event_loop_->name(), event_loop_->node());
39 std::string name_to_send;
40 switch (canonical_channels_) {
41 case CanonicalChannelNames::kCanonical:
42 name_to_send = channel->name()->string_view();
43 break;
44 case CanonicalChannelNames::kShortened:
45 name_to_send = shortest_name;
46 break;
47 }
James Kuszmaul0de4feb2022-04-15 12:16:59 -070048 const ChannelId id =
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080049 (serialization_ == Serialization::kJson)
50 ? server_.addChannel(foxglove::websocket::ChannelWithoutId{
James Kuszmaul1e418f62023-02-26 14:40:20 -080051 .topic = name_to_send + " " + channel->type()->str(),
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080052 .encoding = "json",
53 .schemaName = channel->type()->str(),
54 .schema =
55 JsonSchemaForFlatbuffer({channel->schema()}).dump()})
56 : server_.addChannel(foxglove::websocket::ChannelWithoutId{
James Kuszmaul1e418f62023-02-26 14:40:20 -080057 .topic = name_to_send + " " + channel->type()->str(),
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080058 .encoding = "flatbuffer",
59 .schemaName = channel->type()->str(),
60 .schema = absl::Base64Escape(
61 {reinterpret_cast<const char *>(schema.span().data()),
62 schema.span().size()})});
James Kuszmaul0de4feb2022-04-15 12:16:59 -070063 CHECK(fetchers_.count(id) == 0);
64 fetchers_[id] =
65 FetcherState{.fetcher = event_loop_->MakeRawFetcher(channel)};
66 }
67 }
68
69 server_.setSubscribeHandler([this](ChannelId channel) {
70 if (fetchers_.count(channel) == 0) {
71 return;
72 }
73 if (active_channels_.count(channel) == 0) {
74 // Catch up to the latest message on the requested channel, then subscribe
75 // to it.
76 fetchers_[channel].fetcher->Fetch();
77 active_channels_.insert(channel);
78 }
79 });
80 server_.setUnsubscribeHandler(
81 [this](ChannelId channel) { active_channels_.erase(channel); });
82 aos::TimerHandler *timer = event_loop_->AddTimer([this]() {
83 // In order to run the websocket server, we just let it spin every cycle for
84 // a bit. This isn't great for integration, but lets us stay in control and
85 // until we either have (a) a chance to locate a file descriptor to hand
86 // epoll; or (b) rewrite the foxglove websocket server to use seasocks
87 // (which we know how to integrate), we'll just function with this.
88 // TODO(james): Tighter integration into our event loop structure.
89 server_.run_for(kPollPeriod / 2);
90
91 // Unfortunately, we can't just push out all the messages as they come in.
92 // Foxglove expects that the timestamps associated with each message to be
93 // monotonic, and if you send things out of order then it will clear the
94 // state of the visualization entirely, which makes viewing plots
95 // impossible. If the user only accesses a single channel, that is fine, but
96 // as soon as they try to use multiple channels, you encounter interleaving.
97 // To resolve this, we specify a buffer (--sorting_buffer_ms), and only send
98 // out messages older than that time, sorting everything before we send it
99 // out.
100 const aos::monotonic_clock::time_point sort_until =
101 event_loop_->monotonic_now() -
102 std::chrono::milliseconds(FLAGS_sorting_buffer_ms);
103
104 // Pair of <send_time, channel id>.
105 absl::btree_set<std::pair<aos::monotonic_clock::time_point, ChannelId>>
106 fetcher_times;
107
108 // Go through and seed fetcher_times with the first message on each channel.
109 for (const ChannelId channel : active_channels_) {
110 FetcherState *fetcher = &fetchers_[channel];
111 if (fetcher->sent_current_message) {
112 if (fetcher->fetcher->FetchNext()) {
113 fetcher->sent_current_message = false;
114 }
115 }
116 if (!fetcher->sent_current_message) {
117 const aos::monotonic_clock::time_point send_time =
118 fetcher->fetcher->context().monotonic_event_time;
119 if (send_time <= sort_until) {
120 fetcher_times.insert(std::make_pair(send_time, channel));
121 }
122 }
123 }
124
125 // Send the oldest message continually until we run out of messages to send.
126 while (!fetcher_times.empty()) {
127 const ChannelId channel = fetcher_times.begin()->second;
128 FetcherState *fetcher = &fetchers_[channel];
James Kuszmaulf1dbaff2023-02-08 21:17:32 -0800129 switch (serialization_) {
130 case Serialization::kJson:
131 server_.sendMessage(
132 channel, fetcher_times.begin()->first.time_since_epoch().count(),
133 aos::FlatbufferToJson(fetcher->fetcher->channel()->schema(),
134 static_cast<const uint8_t *>(
135 fetcher->fetcher->context().data)));
136 break;
137 case Serialization::kFlatbuffer:
138 server_.sendMessage(
139 channel, fetcher_times.begin()->first.time_since_epoch().count(),
140 {static_cast<const char *>(fetcher->fetcher->context().data),
141 fetcher->fetcher->context().size});
142 }
James Kuszmaul0de4feb2022-04-15 12:16:59 -0700143 fetcher_times.erase(fetcher_times.begin());
144 fetcher->sent_current_message = true;
145 if (fetcher->fetcher->FetchNext()) {
146 fetcher->sent_current_message = false;
147 const aos::monotonic_clock::time_point send_time =
148 fetcher->fetcher->context().monotonic_event_time;
149 if (send_time <= sort_until) {
150 fetcher_times.insert(std::make_pair(send_time, channel));
151 }
152 }
153 }
154 });
155
156 event_loop_->OnRun([timer, this]() {
Philipp Schradera6712522023-07-05 20:25:11 -0700157 timer->Schedule(event_loop_->monotonic_now(), kPollPeriod);
James Kuszmaul0de4feb2022-04-15 12:16:59 -0700158 });
159}
160FoxgloveWebsocketServer::~FoxgloveWebsocketServer() { server_.stop(); }
161} // namespace aos