blob: 06c551eb8bd7b16ba3e1c8e31cadb7c51577fef3 [file] [log] [blame]
James Kuszmaul0de4feb2022-04-15 12:16:59 -07001#include "aos/util/foxglove_websocket_lib.h"
2
James Kuszmaulf1dbaff2023-02-08 21:17:32 -08003#include "absl/strings/escaping.h"
James Kuszmaul1e418f62023-02-26 14:40:20 -08004#include "aos/flatbuffer_merge.h"
5#include "aos/util/mcap_logger.h"
James Kuszmaul0de4feb2022-04-15 12:16:59 -07006#include "gflags/gflags.h"
7
8DEFINE_uint32(sorting_buffer_ms, 100,
9 "Amount of time to buffer messages to sort them before sending "
10 "them to foxglove.");
James Kuszmaul0de4feb2022-04-15 12:16:59 -070011
12namespace {
13// Period at which to poll the fetchers for all the channels.
14constexpr std::chrono::milliseconds kPollPeriod{50};
15} // namespace
16
17namespace aos {
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080018FoxgloveWebsocketServer::FoxgloveWebsocketServer(
19 aos::EventLoop *event_loop, uint32_t port, Serialization serialization,
James Kuszmaul1e418f62023-02-26 14:40:20 -080020 FetchPinnedChannels fetch_pinned_channels,
21 CanonicalChannelNames canonical_channels)
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080022 : event_loop_(event_loop),
23 serialization_(serialization),
24 fetch_pinned_channels_(fetch_pinned_channels),
James Kuszmaul1e418f62023-02-26 14:40:20 -080025 canonical_channels_(canonical_channels),
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080026 server_(port, "aos_foxglove") {
James Kuszmaul0de4feb2022-04-15 12:16:59 -070027 for (const aos::Channel *channel :
28 *event_loop_->configuration()->channels()) {
29 const bool is_pinned = (channel->read_method() == ReadMethod::PIN);
30 if (aos::configuration::ChannelIsReadableOnNode(channel,
31 event_loop_->node()) &&
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080032 (!is_pinned || fetch_pinned_channels_ == FetchPinnedChannels::kYes)) {
33 const FlatbufferDetachedBuffer<reflection::Schema> schema =
34 RecursiveCopyFlatBuffer(channel->schema());
James Kuszmaul1e418f62023-02-26 14:40:20 -080035 const std::string shortest_name =
36 ShortenedChannelName(event_loop_->configuration(), channel,
37 event_loop_->name(), event_loop_->node());
38 std::string name_to_send;
39 switch (canonical_channels_) {
40 case CanonicalChannelNames::kCanonical:
41 name_to_send = channel->name()->string_view();
42 break;
43 case CanonicalChannelNames::kShortened:
44 name_to_send = shortest_name;
45 break;
46 }
James Kuszmaul0de4feb2022-04-15 12:16:59 -070047 const ChannelId id =
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080048 (serialization_ == Serialization::kJson)
49 ? server_.addChannel(foxglove::websocket::ChannelWithoutId{
James Kuszmaul1e418f62023-02-26 14:40:20 -080050 .topic = name_to_send + " " + channel->type()->str(),
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080051 .encoding = "json",
52 .schemaName = channel->type()->str(),
53 .schema =
54 JsonSchemaForFlatbuffer({channel->schema()}).dump()})
55 : server_.addChannel(foxglove::websocket::ChannelWithoutId{
James Kuszmaul1e418f62023-02-26 14:40:20 -080056 .topic = name_to_send + " " + channel->type()->str(),
James Kuszmaulf1dbaff2023-02-08 21:17:32 -080057 .encoding = "flatbuffer",
58 .schemaName = channel->type()->str(),
59 .schema = absl::Base64Escape(
60 {reinterpret_cast<const char *>(schema.span().data()),
61 schema.span().size()})});
James Kuszmaul0de4feb2022-04-15 12:16:59 -070062 CHECK(fetchers_.count(id) == 0);
63 fetchers_[id] =
64 FetcherState{.fetcher = event_loop_->MakeRawFetcher(channel)};
65 }
66 }
67
68 server_.setSubscribeHandler([this](ChannelId channel) {
69 if (fetchers_.count(channel) == 0) {
70 return;
71 }
72 if (active_channels_.count(channel) == 0) {
73 // Catch up to the latest message on the requested channel, then subscribe
74 // to it.
75 fetchers_[channel].fetcher->Fetch();
76 active_channels_.insert(channel);
77 }
78 });
79 server_.setUnsubscribeHandler(
80 [this](ChannelId channel) { active_channels_.erase(channel); });
81 aos::TimerHandler *timer = event_loop_->AddTimer([this]() {
82 // In order to run the websocket server, we just let it spin every cycle for
83 // a bit. This isn't great for integration, but lets us stay in control and
84 // until we either have (a) a chance to locate a file descriptor to hand
85 // epoll; or (b) rewrite the foxglove websocket server to use seasocks
86 // (which we know how to integrate), we'll just function with this.
87 // TODO(james): Tighter integration into our event loop structure.
88 server_.run_for(kPollPeriod / 2);
89
90 // Unfortunately, we can't just push out all the messages as they come in.
91 // Foxglove expects that the timestamps associated with each message to be
92 // monotonic, and if you send things out of order then it will clear the
93 // state of the visualization entirely, which makes viewing plots
94 // impossible. If the user only accesses a single channel, that is fine, but
95 // as soon as they try to use multiple channels, you encounter interleaving.
96 // To resolve this, we specify a buffer (--sorting_buffer_ms), and only send
97 // out messages older than that time, sorting everything before we send it
98 // out.
99 const aos::monotonic_clock::time_point sort_until =
100 event_loop_->monotonic_now() -
101 std::chrono::milliseconds(FLAGS_sorting_buffer_ms);
102
103 // Pair of <send_time, channel id>.
104 absl::btree_set<std::pair<aos::monotonic_clock::time_point, ChannelId>>
105 fetcher_times;
106
107 // Go through and seed fetcher_times with the first message on each channel.
108 for (const ChannelId channel : active_channels_) {
109 FetcherState *fetcher = &fetchers_[channel];
110 if (fetcher->sent_current_message) {
111 if (fetcher->fetcher->FetchNext()) {
112 fetcher->sent_current_message = false;
113 }
114 }
115 if (!fetcher->sent_current_message) {
116 const aos::monotonic_clock::time_point send_time =
117 fetcher->fetcher->context().monotonic_event_time;
118 if (send_time <= sort_until) {
119 fetcher_times.insert(std::make_pair(send_time, channel));
120 }
121 }
122 }
123
124 // Send the oldest message continually until we run out of messages to send.
125 while (!fetcher_times.empty()) {
126 const ChannelId channel = fetcher_times.begin()->second;
127 FetcherState *fetcher = &fetchers_[channel];
James Kuszmaulf1dbaff2023-02-08 21:17:32 -0800128 switch (serialization_) {
129 case Serialization::kJson:
130 server_.sendMessage(
131 channel, fetcher_times.begin()->first.time_since_epoch().count(),
132 aos::FlatbufferToJson(fetcher->fetcher->channel()->schema(),
133 static_cast<const uint8_t *>(
134 fetcher->fetcher->context().data)));
135 break;
136 case Serialization::kFlatbuffer:
137 server_.sendMessage(
138 channel, fetcher_times.begin()->first.time_since_epoch().count(),
139 {static_cast<const char *>(fetcher->fetcher->context().data),
140 fetcher->fetcher->context().size});
141 }
James Kuszmaul0de4feb2022-04-15 12:16:59 -0700142 fetcher_times.erase(fetcher_times.begin());
143 fetcher->sent_current_message = true;
144 if (fetcher->fetcher->FetchNext()) {
145 fetcher->sent_current_message = false;
146 const aos::monotonic_clock::time_point send_time =
147 fetcher->fetcher->context().monotonic_event_time;
148 if (send_time <= sort_until) {
149 fetcher_times.insert(std::make_pair(send_time, channel));
150 }
151 }
152 }
153 });
154
155 event_loop_->OnRun([timer, this]() {
156 timer->Setup(event_loop_->monotonic_now(), kPollPeriod);
157 });
158}
159FoxgloveWebsocketServer::~FoxgloveWebsocketServer() { server_.stop(); }
160} // namespace aos