blob: de19443be6f2e2ca05756f651c40ec3f3aa2ce98 [file] [log] [blame]
James Kuszmaul0de4feb2022-04-15 12:16:59 -07001#include "aos/util/foxglove_websocket_lib.h"
2
Stephan Pleinesb1177672024-05-27 17:48:32 -07003#include <chrono>
4#include <compare>
5#include <string>
6#include <utility>
Philipp Schrader790cb542023-07-05 21:06:52 -07007
Stephan Pleinesb1177672024-05-27 17:48:32 -07008#include "absl/container/btree_set.h"
Austin Schuh99f7c6a2024-06-25 22:07:44 -07009#include "absl/flags/flag.h"
10#include "absl/log/check.h"
11#include "absl/log/log.h"
Stephan Pleinesb1177672024-05-27 17:48:32 -070012#include "absl/strings/escaping.h"
13#include "absl/types/span.h"
14#include "flatbuffers/reflection_generated.h"
15#include "flatbuffers/string.h"
16#include "flatbuffers/vector.h"
Stephan Pleinesb1177672024-05-27 17:48:32 -070017#include "nlohmann/json.hpp"
18#include <foxglove/websocket/server.hpp>
19
20#include "aos/configuration.h"
21#include "aos/events/context.h"
James Kuszmaul1e418f62023-02-26 14:40:20 -080022#include "aos/flatbuffer_merge.h"
Stephan Pleinesb1177672024-05-27 17:48:32 -070023#include "aos/flatbuffers.h"
24#include "aos/json_to_flatbuffer.h"
25#include "aos/time/time.h"
James Kuszmaul1e418f62023-02-26 14:40:20 -080026#include "aos/util/mcap_logger.h"
James Kuszmaul0de4feb2022-04-15 12:16:59 -070027
Austin Schuh99f7c6a2024-06-25 22:07:44 -070028ABSL_FLAG(uint32_t, sorting_buffer_ms, 100,
29 "Amount of time to buffer messages to sort them before sending "
30 "them to foxglove.");
James Kuszmaul0de4feb2022-04-15 12:16:59 -070031
namespace {
// How often the fetchers for all of the channels get polled for new messages.
constexpr auto kPollPeriod = std::chrono::milliseconds(50);
}  // namespace
36
37namespace aos {
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080038FoxgloveWebsocketServer::FoxgloveWebsocketServer(
39 aos::EventLoop *event_loop, uint32_t port, Serialization serialization,
James Kuszmaul1e418f62023-02-26 14:40:20 -080040 FetchPinnedChannels fetch_pinned_channels,
41 CanonicalChannelNames canonical_channels)
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080042 : event_loop_(event_loop),
43 serialization_(serialization),
44 fetch_pinned_channels_(fetch_pinned_channels),
James Kuszmaul1e418f62023-02-26 14:40:20 -080045 canonical_channels_(canonical_channels),
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080046 server_(port, "aos_foxglove") {
James Kuszmaul0de4feb2022-04-15 12:16:59 -070047 for (const aos::Channel *channel :
48 *event_loop_->configuration()->channels()) {
49 const bool is_pinned = (channel->read_method() == ReadMethod::PIN);
50 if (aos::configuration::ChannelIsReadableOnNode(channel,
51 event_loop_->node()) &&
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080052 (!is_pinned || fetch_pinned_channels_ == FetchPinnedChannels::kYes)) {
53 const FlatbufferDetachedBuffer<reflection::Schema> schema =
54 RecursiveCopyFlatBuffer(channel->schema());
James Kuszmaul1e418f62023-02-26 14:40:20 -080055 const std::string shortest_name =
56 ShortenedChannelName(event_loop_->configuration(), channel,
57 event_loop_->name(), event_loop_->node());
58 std::string name_to_send;
59 switch (canonical_channels_) {
60 case CanonicalChannelNames::kCanonical:
61 name_to_send = channel->name()->string_view();
62 break;
63 case CanonicalChannelNames::kShortened:
64 name_to_send = shortest_name;
65 break;
66 }
James Kuszmaul0de4feb2022-04-15 12:16:59 -070067 const ChannelId id =
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080068 (serialization_ == Serialization::kJson)
69 ? server_.addChannel(foxglove::websocket::ChannelWithoutId{
James Kuszmaul1e418f62023-02-26 14:40:20 -080070 .topic = name_to_send + " " + channel->type()->str(),
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080071 .encoding = "json",
72 .schemaName = channel->type()->str(),
73 .schema =
74 JsonSchemaForFlatbuffer({channel->schema()}).dump()})
75 : server_.addChannel(foxglove::websocket::ChannelWithoutId{
James Kuszmaul1e418f62023-02-26 14:40:20 -080076 .topic = name_to_send + " " + channel->type()->str(),
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080077 .encoding = "flatbuffer",
78 .schemaName = channel->type()->str(),
79 .schema = absl::Base64Escape(
80 {reinterpret_cast<const char *>(schema.span().data()),
81 schema.span().size()})});
James Kuszmaul0de4feb2022-04-15 12:16:59 -070082 CHECK(fetchers_.count(id) == 0);
83 fetchers_[id] =
84 FetcherState{.fetcher = event_loop_->MakeRawFetcher(channel)};
85 }
86 }
87
88 server_.setSubscribeHandler([this](ChannelId channel) {
89 if (fetchers_.count(channel) == 0) {
90 return;
91 }
92 if (active_channels_.count(channel) == 0) {
93 // Catch up to the latest message on the requested channel, then subscribe
94 // to it.
95 fetchers_[channel].fetcher->Fetch();
96 active_channels_.insert(channel);
97 }
98 });
99 server_.setUnsubscribeHandler(
100 [this](ChannelId channel) { active_channels_.erase(channel); });
101 aos::TimerHandler *timer = event_loop_->AddTimer([this]() {
102 // In order to run the websocket server, we just let it spin every cycle for
103 // a bit. This isn't great for integration, but lets us stay in control and
104 // until we either have (a) a chance to locate a file descriptor to hand
105 // epoll; or (b) rewrite the foxglove websocket server to use seasocks
106 // (which we know how to integrate), we'll just function with this.
107 // TODO(james): Tighter integration into our event loop structure.
108 server_.run_for(kPollPeriod / 2);
109
110 // Unfortunately, we can't just push out all the messages as they come in.
111 // Foxglove expects that the timestamps associated with each message to be
112 // monotonic, and if you send things out of order then it will clear the
113 // state of the visualization entirely, which makes viewing plots
114 // impossible. If the user only accesses a single channel, that is fine, but
115 // as soon as they try to use multiple channels, you encounter interleaving.
116 // To resolve this, we specify a buffer (--sorting_buffer_ms), and only send
117 // out messages older than that time, sorting everything before we send it
118 // out.
119 const aos::monotonic_clock::time_point sort_until =
120 event_loop_->monotonic_now() -
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700121 std::chrono::milliseconds(absl::GetFlag(FLAGS_sorting_buffer_ms));
James Kuszmaul0de4feb2022-04-15 12:16:59 -0700122
123 // Pair of <send_time, channel id>.
124 absl::btree_set<std::pair<aos::monotonic_clock::time_point, ChannelId>>
125 fetcher_times;
126
127 // Go through and seed fetcher_times with the first message on each channel.
128 for (const ChannelId channel : active_channels_) {
129 FetcherState *fetcher = &fetchers_[channel];
130 if (fetcher->sent_current_message) {
131 if (fetcher->fetcher->FetchNext()) {
132 fetcher->sent_current_message = false;
133 }
134 }
135 if (!fetcher->sent_current_message) {
136 const aos::monotonic_clock::time_point send_time =
137 fetcher->fetcher->context().monotonic_event_time;
138 if (send_time <= sort_until) {
139 fetcher_times.insert(std::make_pair(send_time, channel));
140 }
141 }
142 }
143
144 // Send the oldest message continually until we run out of messages to send.
145 while (!fetcher_times.empty()) {
146 const ChannelId channel = fetcher_times.begin()->second;
147 FetcherState *fetcher = &fetchers_[channel];
James Kuszmaulf1dbaff2023-02-08 21:17:32 -0800148 switch (serialization_) {
149 case Serialization::kJson:
150 server_.sendMessage(
151 channel, fetcher_times.begin()->first.time_since_epoch().count(),
152 aos::FlatbufferToJson(fetcher->fetcher->channel()->schema(),
153 static_cast<const uint8_t *>(
154 fetcher->fetcher->context().data)));
155 break;
156 case Serialization::kFlatbuffer:
157 server_.sendMessage(
158 channel, fetcher_times.begin()->first.time_since_epoch().count(),
159 {static_cast<const char *>(fetcher->fetcher->context().data),
160 fetcher->fetcher->context().size});
161 }
James Kuszmaul0de4feb2022-04-15 12:16:59 -0700162 fetcher_times.erase(fetcher_times.begin());
163 fetcher->sent_current_message = true;
164 if (fetcher->fetcher->FetchNext()) {
165 fetcher->sent_current_message = false;
166 const aos::monotonic_clock::time_point send_time =
167 fetcher->fetcher->context().monotonic_event_time;
168 if (send_time <= sort_until) {
169 fetcher_times.insert(std::make_pair(send_time, channel));
170 }
171 }
172 }
173 });
174
175 event_loop_->OnRun([timer, this]() {
Philipp Schradera6712522023-07-05 20:25:11 -0700176 timer->Schedule(event_loop_->monotonic_now(), kPollPeriod);
James Kuszmaul0de4feb2022-04-15 12:16:59 -0700177 });
178}
179FoxgloveWebsocketServer::~FoxgloveWebsocketServer() { server_.stop(); }
180} // namespace aos