blob: 7a825e47b67953bb3e6568d015a2934e9a5cde1a [file] [log] [blame]
James Kuszmaul0de4feb2022-04-15 12:16:59 -07001#include "aos/util/foxglove_websocket_lib.h"
2
Stephan Pleinesb1177672024-05-27 17:48:32 -07003#include <chrono>
4#include <compare>
5#include <string>
6#include <utility>
Philipp Schrader790cb542023-07-05 21:06:52 -07007
Stephan Pleinesb1177672024-05-27 17:48:32 -07008#include "absl/container/btree_set.h"
9#include "absl/strings/escaping.h"
10#include "absl/types/span.h"
11#include "flatbuffers/reflection_generated.h"
12#include "flatbuffers/string.h"
13#include "flatbuffers/vector.h"
14#include "gflags/gflags.h"
15#include "glog/logging.h"
16#include "nlohmann/json.hpp"
17#include <foxglove/websocket/server.hpp>
18
19#include "aos/configuration.h"
20#include "aos/events/context.h"
James Kuszmaul1e418f62023-02-26 14:40:20 -080021#include "aos/flatbuffer_merge.h"
Stephan Pleinesb1177672024-05-27 17:48:32 -070022#include "aos/flatbuffers.h"
23#include "aos/json_to_flatbuffer.h"
24#include "aos/time/time.h"
James Kuszmaul1e418f62023-02-26 14:40:20 -080025#include "aos/util/mcap_logger.h"
James Kuszmaul0de4feb2022-04-15 12:16:59 -070026
27DEFINE_uint32(sorting_buffer_ms, 100,
28 "Amount of time to buffer messages to sort them before sending "
29 "them to foxglove.");
James Kuszmaul0de4feb2022-04-15 12:16:59 -070030
31namespace {
32// Period at which to poll the fetchers for all the channels.
33constexpr std::chrono::milliseconds kPollPeriod{50};
34} // namespace
35
36namespace aos {
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080037FoxgloveWebsocketServer::FoxgloveWebsocketServer(
38 aos::EventLoop *event_loop, uint32_t port, Serialization serialization,
James Kuszmaul1e418f62023-02-26 14:40:20 -080039 FetchPinnedChannels fetch_pinned_channels,
40 CanonicalChannelNames canonical_channels)
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080041 : event_loop_(event_loop),
42 serialization_(serialization),
43 fetch_pinned_channels_(fetch_pinned_channels),
James Kuszmaul1e418f62023-02-26 14:40:20 -080044 canonical_channels_(canonical_channels),
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080045 server_(port, "aos_foxglove") {
James Kuszmaul0de4feb2022-04-15 12:16:59 -070046 for (const aos::Channel *channel :
47 *event_loop_->configuration()->channels()) {
48 const bool is_pinned = (channel->read_method() == ReadMethod::PIN);
49 if (aos::configuration::ChannelIsReadableOnNode(channel,
50 event_loop_->node()) &&
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080051 (!is_pinned || fetch_pinned_channels_ == FetchPinnedChannels::kYes)) {
52 const FlatbufferDetachedBuffer<reflection::Schema> schema =
53 RecursiveCopyFlatBuffer(channel->schema());
James Kuszmaul1e418f62023-02-26 14:40:20 -080054 const std::string shortest_name =
55 ShortenedChannelName(event_loop_->configuration(), channel,
56 event_loop_->name(), event_loop_->node());
57 std::string name_to_send;
58 switch (canonical_channels_) {
59 case CanonicalChannelNames::kCanonical:
60 name_to_send = channel->name()->string_view();
61 break;
62 case CanonicalChannelNames::kShortened:
63 name_to_send = shortest_name;
64 break;
65 }
James Kuszmaul0de4feb2022-04-15 12:16:59 -070066 const ChannelId id =
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080067 (serialization_ == Serialization::kJson)
68 ? server_.addChannel(foxglove::websocket::ChannelWithoutId{
James Kuszmaul1e418f62023-02-26 14:40:20 -080069 .topic = name_to_send + " " + channel->type()->str(),
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080070 .encoding = "json",
71 .schemaName = channel->type()->str(),
72 .schema =
73 JsonSchemaForFlatbuffer({channel->schema()}).dump()})
74 : server_.addChannel(foxglove::websocket::ChannelWithoutId{
James Kuszmaul1e418f62023-02-26 14:40:20 -080075 .topic = name_to_send + " " + channel->type()->str(),
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080076 .encoding = "flatbuffer",
77 .schemaName = channel->type()->str(),
78 .schema = absl::Base64Escape(
79 {reinterpret_cast<const char *>(schema.span().data()),
80 schema.span().size()})});
James Kuszmaul0de4feb2022-04-15 12:16:59 -070081 CHECK(fetchers_.count(id) == 0);
82 fetchers_[id] =
83 FetcherState{.fetcher = event_loop_->MakeRawFetcher(channel)};
84 }
85 }
86
87 server_.setSubscribeHandler([this](ChannelId channel) {
88 if (fetchers_.count(channel) == 0) {
89 return;
90 }
91 if (active_channels_.count(channel) == 0) {
92 // Catch up to the latest message on the requested channel, then subscribe
93 // to it.
94 fetchers_[channel].fetcher->Fetch();
95 active_channels_.insert(channel);
96 }
97 });
98 server_.setUnsubscribeHandler(
99 [this](ChannelId channel) { active_channels_.erase(channel); });
100 aos::TimerHandler *timer = event_loop_->AddTimer([this]() {
101 // In order to run the websocket server, we just let it spin every cycle for
102 // a bit. This isn't great for integration, but lets us stay in control and
103 // until we either have (a) a chance to locate a file descriptor to hand
104 // epoll; or (b) rewrite the foxglove websocket server to use seasocks
105 // (which we know how to integrate), we'll just function with this.
106 // TODO(james): Tighter integration into our event loop structure.
107 server_.run_for(kPollPeriod / 2);
108
109 // Unfortunately, we can't just push out all the messages as they come in.
110 // Foxglove expects that the timestamps associated with each message to be
111 // monotonic, and if you send things out of order then it will clear the
112 // state of the visualization entirely, which makes viewing plots
113 // impossible. If the user only accesses a single channel, that is fine, but
114 // as soon as they try to use multiple channels, you encounter interleaving.
115 // To resolve this, we specify a buffer (--sorting_buffer_ms), and only send
116 // out messages older than that time, sorting everything before we send it
117 // out.
118 const aos::monotonic_clock::time_point sort_until =
119 event_loop_->monotonic_now() -
120 std::chrono::milliseconds(FLAGS_sorting_buffer_ms);
121
122 // Pair of <send_time, channel id>.
123 absl::btree_set<std::pair<aos::monotonic_clock::time_point, ChannelId>>
124 fetcher_times;
125
126 // Go through and seed fetcher_times with the first message on each channel.
127 for (const ChannelId channel : active_channels_) {
128 FetcherState *fetcher = &fetchers_[channel];
129 if (fetcher->sent_current_message) {
130 if (fetcher->fetcher->FetchNext()) {
131 fetcher->sent_current_message = false;
132 }
133 }
134 if (!fetcher->sent_current_message) {
135 const aos::monotonic_clock::time_point send_time =
136 fetcher->fetcher->context().monotonic_event_time;
137 if (send_time <= sort_until) {
138 fetcher_times.insert(std::make_pair(send_time, channel));
139 }
140 }
141 }
142
143 // Send the oldest message continually until we run out of messages to send.
144 while (!fetcher_times.empty()) {
145 const ChannelId channel = fetcher_times.begin()->second;
146 FetcherState *fetcher = &fetchers_[channel];
James Kuszmaulf1dbaff2023-02-08 21:17:32 -0800147 switch (serialization_) {
148 case Serialization::kJson:
149 server_.sendMessage(
150 channel, fetcher_times.begin()->first.time_since_epoch().count(),
151 aos::FlatbufferToJson(fetcher->fetcher->channel()->schema(),
152 static_cast<const uint8_t *>(
153 fetcher->fetcher->context().data)));
154 break;
155 case Serialization::kFlatbuffer:
156 server_.sendMessage(
157 channel, fetcher_times.begin()->first.time_since_epoch().count(),
158 {static_cast<const char *>(fetcher->fetcher->context().data),
159 fetcher->fetcher->context().size});
160 }
James Kuszmaul0de4feb2022-04-15 12:16:59 -0700161 fetcher_times.erase(fetcher_times.begin());
162 fetcher->sent_current_message = true;
163 if (fetcher->fetcher->FetchNext()) {
164 fetcher->sent_current_message = false;
165 const aos::monotonic_clock::time_point send_time =
166 fetcher->fetcher->context().monotonic_event_time;
167 if (send_time <= sort_until) {
168 fetcher_times.insert(std::make_pair(send_time, channel));
169 }
170 }
171 }
172 });
173
174 event_loop_->OnRun([timer, this]() {
Philipp Schradera6712522023-07-05 20:25:11 -0700175 timer->Schedule(event_loop_->monotonic_now(), kPollPeriod);
James Kuszmaul0de4feb2022-04-15 12:16:59 -0700176 });
177}
178FoxgloveWebsocketServer::~FoxgloveWebsocketServer() { server_.stop(); }
179} // namespace aos