blob: 8e0c4d064a37feb1f3896142b5123ac9dc231dee [file] [log] [blame]
Tyler Chatowb3850c12020-02-26 20:55:48 -08001#define GST_USE_UNSTABLE_API
2#define GST_DISABLE_REGISTRY 1
3
4#include <glib-unix.h>
5#include <glib.h>
6#include <gst/app/app.h>
7#include <gst/gst.h>
8#include <gst/sdp/sdp.h>
9#include <gst/webrtc/icetransport.h>
10#include <gst/webrtc/webrtc.h>
11#include <sys/stat.h>
12#include <sys/types.h>
13
14#include <map>
15#include <thread>
16
Austin Schuh99f7c6a2024-06-25 22:07:44 -070017#include "absl/flags/flag.h"
18#include "absl/log/check.h"
19#include "absl/log/log.h"
Tyler Chatowb3850c12020-02-26 20:55:48 -080020#include "absl/strings/str_format.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070021#include "flatbuffers/flatbuffers.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070022
Tyler Chatowb3850c12020-02-26 20:55:48 -080023#include "aos/events/glib_main_loop.h"
24#include "aos/events/shm_event_loop.h"
25#include "aos/init.h"
26#include "aos/network/web_proxy_generated.h"
27#include "aos/seasocks/seasocks_logger.h"
Tyler Chatowb3850c12020-02-26 20:55:48 -080028#include "frc971/vision/vision_generated.h"
Tyler Chatowb3850c12020-02-26 20:55:48 -080029#include "internal/Embedded.h"
30#include "seasocks/Server.h"
31#include "seasocks/StringUtil.h"
32#include "seasocks/WebSocket.h"
33
Austin Schuh99f7c6a2024-06-25 22:07:44 -070034ABSL_FLAG(std::string, config, "aos_config.json",
35 "Name of the config file to replay using.");
36ABSL_FLAG(std::string, device, "/dev/video0",
37 "Camera fd. Ignored if reading from channel");
38ABSL_FLAG(std::string, data_dir, "image_streamer_www",
39 "Directory to serve data files from");
40ABSL_FLAG(bool, publish_images, true,
41 "If true, publish images read from v4l2 to /camera.");
42ABSL_FLAG(int32_t, width, 400, "Image width");
43ABSL_FLAG(int32_t, height, 300, "Image height");
44ABSL_FLAG(int32_t, framerate, 25, "Framerate (FPS)");
45ABSL_FLAG(int32_t, brightness, 50, "Camera brightness");
46ABSL_FLAG(int32_t, exposure, 300, "Manual exposure");
47ABSL_FLAG(int32_t, bitrate, 500000, "H264 encode bitrate");
48ABSL_FLAG(int32_t, streaming_port, 1180,
49 "Port to stream images on with seasocks");
50ABSL_FLAG(int32_t, min_port, 5800, "Min rtp port");
51ABSL_FLAG(int32_t, max_port, 5810, "Max rtp port");
52ABSL_FLAG(std::string, listen_on, "",
53 "Channel on which to receive frames from. Used in place of "
54 "internal V4L2 reader. Note: width and height MUST match the "
55 "expected size of channel images.");
Tyler Chatowb3850c12020-02-26 20:55:48 -080056
57class Connection;
58
59using aos::web_proxy::Payload;
60using aos::web_proxy::SdpType;
61using aos::web_proxy::WebSocketIce;
62using aos::web_proxy::WebSocketMessage;
63using aos::web_proxy::WebSocketSdp;
64
Tyler Chatow39b6a322022-04-15 00:03:58 -070065class GstSampleSource {
66 public:
67 GstSampleSource() = default;
68
69 virtual ~GstSampleSource() = default;
70
71 private:
72 GstSampleSource(const GstSampleSource &) = delete;
73};
74
75class V4L2Source : public GstSampleSource {
76 public:
77 V4L2Source(std::function<void(GstSample *)> callback)
78 : callback_(std::move(callback)) {
79 GError *error = NULL;
80
81 // Create pipeline to read from camera, pack into rtp stream, and dump
82 // stream to callback. v4l2 device should already be configured with correct
83 // bitrate from v4l2-ctl. do-timestamp marks the time the frame was taken to
84 // track when it should be dropped under latency.
85
86 // With the Pi's hardware encoder, we can encode and package the stream once
87 // and the clients will jump in at any point unsynchronized. With the stream
88 // from x264enc this doesn't seem to work. For now, just reencode for each
89 // client since we don't expect more than 1 or 2.
90
Austin Schuhf5dbe2c2024-04-06 16:10:24 -070091 std::string exposure;
Austin Schuh99f7c6a2024-06-25 22:07:44 -070092 if (absl::GetFlag(FLAGS_exposure) > 0) {
Austin Schuhf5dbe2c2024-04-06 16:10:24 -070093 exposure = absl::StrFormat(",auto_exposure=1,exposure_time_absolute=%d",
Austin Schuh99f7c6a2024-06-25 22:07:44 -070094 absl::GetFlag(FLAGS_exposure));
Austin Schuhf5dbe2c2024-04-06 16:10:24 -070095 }
96
Tyler Chatow39b6a322022-04-15 00:03:58 -070097 pipeline_ = gst_parse_launch(
98 absl::StrFormat("v4l2src device=%s do-timestamp=true "
Austin Schuhf5dbe2c2024-04-06 16:10:24 -070099 "extra-controls=\"c,brightness=%d%s\" ! "
Tyler Chatow39b6a322022-04-15 00:03:58 -0700100 "video/x-raw,width=%d,height=%d,framerate=%d/"
101 "1,format=YUY2 ! appsink "
102 "name=appsink "
103 "emit-signals=true sync=false async=false "
104 "caps=video/x-raw,format=YUY2",
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700105 absl::GetFlag(FLAGS_device),
106 absl::GetFlag(FLAGS_brightness), exposure,
107 absl::GetFlag(FLAGS_width), absl::GetFlag(FLAGS_height),
108 absl::GetFlag(FLAGS_framerate))
Tyler Chatow39b6a322022-04-15 00:03:58 -0700109 .c_str(),
110 &error);
111
112 if (error != NULL) {
113 LOG(FATAL) << "Could not create v4l2 pipeline: " << error->message;
114 }
115
116 appsink_ = gst_bin_get_by_name(GST_BIN(pipeline_), "appsink");
117 if (appsink_ == NULL) {
118 LOG(FATAL) << "Could not get appsink";
119 }
120
121 g_signal_connect(appsink_, "new-sample",
122 G_CALLBACK(V4L2Source::OnSampleCallback),
123 static_cast<gpointer>(this));
124
125 gst_element_set_state(pipeline_, GST_STATE_PLAYING);
126 }
127
128 ~V4L2Source() {
129 if (pipeline_ != NULL) {
130 gst_element_set_state(GST_ELEMENT(pipeline_), GST_STATE_NULL);
131 gst_object_unref(GST_OBJECT(pipeline_));
132 gst_object_unref(GST_OBJECT(appsink_));
133 }
134 }
135
136 private:
137 static GstFlowReturn OnSampleCallback(GstElement *, gpointer user_data) {
138 static_cast<V4L2Source *>(user_data)->OnSample();
139 return GST_FLOW_OK;
140 }
141
142 void OnSample() {
143 GstSample *sample = gst_app_sink_pull_sample(GST_APP_SINK(appsink_));
144 if (sample == NULL) {
145 LOG(WARNING) << "Received null sample";
146 return;
147 }
148 callback_(sample);
149 gst_sample_unref(sample);
150 }
151
152 GstElement *pipeline_;
153 GstElement *appsink_;
154
155 std::function<void(GstSample *)> callback_;
156};
157
158class ChannelSource : public GstSampleSource {
159 public:
160 ChannelSource(aos::ShmEventLoop *event_loop,
161 std::function<void(GstSample *)> callback)
162 : callback_(std::move(callback)) {
163 event_loop->MakeWatcher(
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700164 absl::GetFlag(FLAGS_listen_on),
Tyler Chatow39b6a322022-04-15 00:03:58 -0700165 [this](const frc971::vision::CameraImage &image) { OnImage(image); });
166 }
167
168 private:
169 void OnImage(const frc971::vision::CameraImage &image) {
170 if (!image.has_rows() || !image.has_cols() || !image.has_data()) {
171 VLOG(2) << "Skipping CameraImage with no data";
172 return;
173 }
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700174 CHECK_EQ(image.rows(), absl::GetFlag(FLAGS_height));
175 CHECK_EQ(image.cols(), absl::GetFlag(FLAGS_width));
Tyler Chatow39b6a322022-04-15 00:03:58 -0700176
177 GBytes *bytes = g_bytes_new(image.data()->data(), image.data()->size());
178 GstBuffer *buffer = gst_buffer_new_wrapped_bytes(bytes);
179
180 GST_BUFFER_PTS(buffer) = image.monotonic_timestamp_ns();
181
Austin Schuh6bdcc372024-06-27 14:49:11 -0700182 GstCaps *caps = gst_caps_new_simple(
Tyler Chatow39b6a322022-04-15 00:03:58 -0700183 "video/x-raw", "width", G_TYPE_INT, image.cols(), "height", G_TYPE_INT,
Austin Schuh6bdcc372024-06-27 14:49:11 -0700184 image.rows(), "format", G_TYPE_STRING, "YUY2", nullptr);
185 CHECK(caps != nullptr);
Tyler Chatow39b6a322022-04-15 00:03:58 -0700186
187 GstSample *sample = gst_sample_new(buffer, caps, nullptr, nullptr);
188
189 callback_(sample);
190
191 gst_sample_unref(sample);
192 gst_caps_unref(caps);
193 gst_buffer_unref(buffer);
194 g_bytes_unref(bytes);
195 }
196
197 std::function<void(GstSample *)> callback_;
198};
199
Tyler Chatowb3850c12020-02-26 20:55:48 -0800200// Basic class that handles receiving new websocket connections. Creates a new
201// Connection to manage the rest of the negotiation and data passing. When the
202// websocket closes, it deletes the Connection.
203class WebsocketHandler : public ::seasocks::WebSocket::Handler {
204 public:
205 WebsocketHandler(aos::ShmEventLoop *event_loop, ::seasocks::Server *server);
Tyler Chatow39b6a322022-04-15 00:03:58 -0700206 ~WebsocketHandler() override = default;
Tyler Chatowb3850c12020-02-26 20:55:48 -0800207
208 void onConnect(::seasocks::WebSocket *sock) override;
209 void onData(::seasocks::WebSocket *sock, const uint8_t *data,
210 size_t size) override;
211 void onDisconnect(::seasocks::WebSocket *sock) override;
212
213 private:
Tyler Chatow39b6a322022-04-15 00:03:58 -0700214 void OnSample(GstSample *sample);
Tyler Chatowb3850c12020-02-26 20:55:48 -0800215
James Kuszmaul345b0812024-04-18 13:07:45 -0500216 aos::ShmEventLoop *event_loop_;
Tyler Chatowb3850c12020-02-26 20:55:48 -0800217 std::map<::seasocks::WebSocket *, std::unique_ptr<Connection>> connections_;
218 ::seasocks::Server *server_;
Tyler Chatow39b6a322022-04-15 00:03:58 -0700219 std::unique_ptr<GstSampleSource> source_;
James Kuszmaul345b0812024-04-18 13:07:45 -0500220 aos::TimerHandler *manual_restart_handle_;
Tyler Chatowb3850c12020-02-26 20:55:48 -0800221
222 aos::Sender<frc971::vision::CameraImage> sender_;
223};
224
225// Seasocks requires that sends happen on the correct thread. This class takes a
226// detached buffer to send on a specific websocket connection and sends it when
227// seasocks is ready.
228class UpdateData : public ::seasocks::Server::Runnable {
229 public:
230 UpdateData(::seasocks::WebSocket *websocket,
231 flatbuffers::DetachedBuffer &&buffer)
232 : sock_(websocket), buffer_(std::move(buffer)) {}
233 ~UpdateData() override = default;
234 UpdateData(const UpdateData &) = delete;
235 UpdateData &operator=(const UpdateData &) = delete;
236
237 void run() override { sock_->send(buffer_.data(), buffer_.size()); }
238
239 private:
240 ::seasocks::WebSocket *sock_;
241 const flatbuffers::DetachedBuffer buffer_;
242};
243
244class Connection {
245 public:
246 Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server);
247
248 ~Connection();
249
250 void HandleWebSocketData(const uint8_t *data, size_t size);
251
252 void OnSample(GstSample *sample);
253
254 private:
255 static void OnOfferCreatedCallback(GstPromise *promise, gpointer user_data) {
256 static_cast<Connection *>(user_data)->OnOfferCreated(promise);
257 }
258
259 static void OnNegotiationNeededCallback(GstElement *, gpointer user_data) {
260 static_cast<Connection *>(user_data)->OnNegotiationNeeded();
261 }
262
263 static void OnIceCandidateCallback(GstElement *, guint mline_index,
264 gchar *candidate, gpointer user_data) {
265 static_cast<Connection *>(user_data)->OnIceCandidate(mline_index,
266 candidate);
267 }
268
269 void OnOfferCreated(GstPromise *promise);
270 void OnNegotiationNeeded();
271 void OnIceCandidate(guint mline_index, gchar *candidate);
272
273 ::seasocks::WebSocket *sock_;
274 ::seasocks::Server *server_;
275
276 GstElement *pipeline_;
277 GstElement *webrtcbin_;
278 GstElement *appsrc_;
279
280 bool first_sample_ = true;
281};
282
283WebsocketHandler::WebsocketHandler(aos::ShmEventLoop *event_loop,
284 ::seasocks::Server *server)
James Kuszmaul345b0812024-04-18 13:07:45 -0500285 : event_loop_(event_loop),
286 server_(server),
287 manual_restart_handle_(
288 event_loop_->AddTimer([this]() { event_loop_->Exit(); })) {
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700289 if (absl::GetFlag(FLAGS_listen_on).empty()) {
290 if (absl::GetFlag(FLAGS_publish_images)) {
Austin Schuhf5dbe2c2024-04-06 16:10:24 -0700291 sender_ = event_loop->MakeSender<frc971::vision::CameraImage>("/camera");
292 }
Tyler Chatow39b6a322022-04-15 00:03:58 -0700293 source_ =
294 std::make_unique<V4L2Source>([this](auto sample) { OnSample(sample); });
295 } else {
296 source_ = std::make_unique<ChannelSource>(
297 event_loop, [this](auto sample) { OnSample(sample); });
Tyler Chatowb3850c12020-02-26 20:55:48 -0800298 }
James Kuszmaul345b0812024-04-18 13:07:45 -0500299 event_loop_->OnRun([this]() {
300 manual_restart_handle_->Schedule(event_loop_->monotonic_now() +
301 std::chrono::seconds(10));
302 });
Tyler Chatowb3850c12020-02-26 20:55:48 -0800303}
304
305void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
306 std::unique_ptr<Connection> conn =
307 std::make_unique<Connection>(sock, server_);
308 connections_.insert({sock, std::move(conn)});
309}
310
311void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
312 size_t size) {
313 connections_[sock]->HandleWebSocketData(data, size);
314}
315
Tyler Chatow39b6a322022-04-15 00:03:58 -0700316void WebsocketHandler::OnSample(GstSample *sample) {
Tyler Chatowb3850c12020-02-26 20:55:48 -0800317 for (auto iter = connections_.begin(); iter != connections_.end(); ++iter) {
318 iter->second->OnSample(sample);
319 }
320
Tyler Chatow39b6a322022-04-15 00:03:58 -0700321 if (sender_.valid()) {
Austin Schuh6bdcc372024-06-27 14:49:11 -0700322 const GstCaps *caps = gst_sample_get_caps(sample);
323 CHECK(caps != nullptr);
Tyler Chatowb3850c12020-02-26 20:55:48 -0800324 CHECK_GT(gst_caps_get_size(caps), 0U);
325 const GstStructure *str = gst_caps_get_structure(caps, 0);
326
327 gint width;
328 gint height;
329
330 CHECK(gst_structure_get_int(str, "width", &width));
331 CHECK(gst_structure_get_int(str, "height", &height));
332
Austin Schuh6bdcc372024-06-27 14:49:11 -0700333 GstBuffer *buffer = gst_sample_get_buffer(sample);
334 CHECK(buffer != nullptr);
Tyler Chatowb3850c12020-02-26 20:55:48 -0800335
336 const gsize size = gst_buffer_get_size(buffer);
337
338 auto builder = sender_.MakeBuilder();
339
340 uint8_t *image_data;
341 auto image_offset =
342 builder.fbb()->CreateUninitializedVector(size, &image_data);
343 gst_buffer_extract(buffer, 0, image_data, size);
344
345 auto image_builder = builder.MakeBuilder<frc971::vision::CameraImage>();
346 image_builder.add_rows(height);
347 image_builder.add_cols(width);
348 image_builder.add_data(image_offset);
349
350 builder.CheckOk(builder.Send(image_builder.Finish()));
351 }
James Kuszmaul345b0812024-04-18 13:07:45 -0500352 manual_restart_handle_->Schedule(event_loop_->monotonic_now() +
353 std::chrono::seconds(10));
Tyler Chatowb3850c12020-02-26 20:55:48 -0800354}
355
356void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
357 connections_.erase(sock);
358}
359
360Connection::Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server)
361 : sock_(sock), server_(server) {
362 GError *error = NULL;
363
364 // Build pipeline to read data from application into pipeline, place in
365 // webrtcbin group, and stream.
366
367 pipeline_ = gst_parse_launch(
368 // aggregate-mode should be zero-latency but this drops the stream on
369 // bitrate spikes for some reason - probably the weak CPU on the pi.
370 absl::StrFormat(
371 "webrtcbin name=webrtcbin appsrc "
372 "name=appsrc block=false "
373 "is-live=true "
374 "format=3 max-buffers=0 leaky-type=2 "
375 "caps=video/x-raw,width=%d,height=%d,format=YUY2 ! videoconvert ! "
376 "x264enc bitrate=%d speed-preset=ultrafast "
377 "tune=zerolatency key-int-max=15 sliced-threads=true ! "
378 "video/x-h264,profile=constrained-baseline ! h264parse ! "
379 "rtph264pay "
380 "config-interval=-1 name=payloader aggregate-mode=none ! "
381 "application/"
382 "x-rtp,media=video,encoding-name=H264,payload=96,clock-rate=90000 !"
383 "webrtcbin. ",
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700384 absl::GetFlag(FLAGS_width), absl::GetFlag(FLAGS_height),
385 absl::GetFlag(FLAGS_bitrate) / 1000)
Tyler Chatowb3850c12020-02-26 20:55:48 -0800386 .c_str(),
387 &error);
388
389 if (error != NULL) {
390 LOG(FATAL) << "Could not create WebRTC pipeline: " << error->message;
391 }
392
393 webrtcbin_ = gst_bin_get_by_name(GST_BIN(pipeline_), "webrtcbin");
394 if (webrtcbin_ == NULL) {
395 LOG(FATAL) << "Could not initialize webrtcbin";
396 }
397
398 appsrc_ = gst_bin_get_by_name(GST_BIN(pipeline_), "appsrc");
399 if (appsrc_ == NULL) {
400 LOG(FATAL) << "Could not initialize appsrc";
401 }
402
403 {
404 GArray *transceivers;
405 g_signal_emit_by_name(webrtcbin_, "get-transceivers", &transceivers);
406 if (transceivers == NULL || transceivers->len <= 0) {
407 LOG(FATAL) << "Could not initialize transceivers";
408 }
409
410 GstWebRTCRTPTransceiver *trans =
411 g_array_index(transceivers, GstWebRTCRTPTransceiver *, 0);
412 g_object_set(trans, "direction",
413 GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_SENDONLY, nullptr);
414
415 g_array_unref(transceivers);
416 }
417
418 {
419 GstObject *ice = nullptr;
420 g_object_get(G_OBJECT(webrtcbin_), "ice-agent", &ice, nullptr);
Austin Schuh6bdcc372024-06-27 14:49:11 -0700421 CHECK(ice != nullptr);
Tyler Chatowb3850c12020-02-26 20:55:48 -0800422
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700423 g_object_set(ice, "min-rtp-port", absl::GetFlag(FLAGS_min_port),
424 "max-rtp-port", absl::GetFlag(FLAGS_max_port), nullptr);
Tyler Chatowb3850c12020-02-26 20:55:48 -0800425
426 // We don't need upnp on a local network.
427 {
428 GstObject *nice = nullptr;
429 g_object_get(ice, "agent", &nice, nullptr);
Austin Schuh6bdcc372024-06-27 14:49:11 -0700430 CHECK(nice != nullptr);
Tyler Chatowb3850c12020-02-26 20:55:48 -0800431
432 g_object_set(nice, "upnp", false, nullptr);
433 g_object_unref(nice);
434 }
435
436 gst_object_unref(ice);
437 }
438
439 g_signal_connect(webrtcbin_, "on-negotiation-needed",
440 G_CALLBACK(Connection::OnNegotiationNeededCallback),
441 static_cast<gpointer>(this));
442
443 g_signal_connect(webrtcbin_, "on-ice-candidate",
444 G_CALLBACK(Connection::OnIceCandidateCallback),
445 static_cast<gpointer>(this));
446
447 gst_element_set_state(pipeline_, GST_STATE_READY);
448 gst_element_set_state(pipeline_, GST_STATE_PLAYING);
449}
450
451Connection::~Connection() {
452 if (pipeline_ != NULL) {
453 gst_element_set_state(pipeline_, GST_STATE_NULL);
454
455 gst_object_unref(GST_OBJECT(webrtcbin_));
456 gst_object_unref(GST_OBJECT(pipeline_));
457 gst_object_unref(GST_OBJECT(appsrc_));
458 }
459}
460
461void Connection::OnSample(GstSample *sample) {
462 GstFlowReturn response =
463 gst_app_src_push_sample(GST_APP_SRC(appsrc_), sample);
464 if (response != GST_FLOW_OK) {
465 LOG(WARNING) << "Sample pushed, did not receive OK";
466 }
467
468 // Since the stream is already running (the camera turns on with
469 // image_streamer) we need to tell the new appsrc where
470 // we are starting in the stream so it can catch up immediately.
471 if (first_sample_) {
472 GstPad *src = gst_element_get_static_pad(appsrc_, "src");
473 if (src == NULL) {
474 return;
475 }
476
477 GstSegment *segment = gst_sample_get_segment(sample);
478 GstBuffer *buffer = gst_sample_get_buffer(sample);
479
480 guint64 offset = gst_segment_to_running_time(segment, GST_FORMAT_TIME,
481 GST_BUFFER_PTS(buffer));
482 LOG(INFO) << "Fixing offset " << offset;
483 gst_pad_set_offset(src, -offset);
484
485 gst_object_unref(GST_OBJECT(src));
486 first_sample_ = false;
487 }
488}
489
490void Connection::OnOfferCreated(GstPromise *promise) {
491 LOG(INFO) << "OnOfferCreated";
492
493 GstWebRTCSessionDescription *offer = NULL;
494 gst_structure_get(gst_promise_get_reply(promise), "offer",
495 GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &offer, NULL);
496 gst_promise_unref(promise);
497
498 {
499 std::unique_ptr<GstPromise, decltype(&gst_promise_unref)>
500 local_desc_promise(gst_promise_new(), &gst_promise_unref);
501 g_signal_emit_by_name(webrtcbin_, "set-local-description", offer,
502 local_desc_promise.get());
503 gst_promise_interrupt(local_desc_promise.get());
504 }
505
506 GstSDPMessage *sdp_msg = offer->sdp;
507 std::string sdp_str(gst_sdp_message_as_text(sdp_msg));
508
509 LOG(INFO) << "Negotiation offer created:\n" << sdp_str;
510
511 flatbuffers::FlatBufferBuilder fbb(512);
512 flatbuffers::Offset<WebSocketSdp> sdp_fb =
513 CreateWebSocketSdpDirect(fbb, SdpType::OFFER, sdp_str.c_str());
514 flatbuffers::Offset<WebSocketMessage> answer_message =
515 CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());
516 fbb.Finish(answer_message);
517
518 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
519}
520
521void Connection::OnNegotiationNeeded() {
522 LOG(INFO) << "OnNegotiationNeeded";
523
524 GstPromise *promise;
525 promise = gst_promise_new_with_change_func(Connection::OnOfferCreatedCallback,
526 static_cast<gpointer>(this), NULL);
527 g_signal_emit_by_name(G_OBJECT(webrtcbin_), "create-offer", NULL, promise);
528}
529
530void Connection::OnIceCandidate(guint mline_index, gchar *candidate) {
531 LOG(INFO) << "OnIceCandidate";
532
533 flatbuffers::FlatBufferBuilder fbb(512);
534
Austin Schuhf5dbe2c2024-04-06 16:10:24 -0700535 flatbuffers::Offset<flatbuffers::String> sdp_mid_offset =
536 fbb.CreateString("video0");
537 flatbuffers::Offset<flatbuffers::String> candidate_offset =
538 fbb.CreateString(static_cast<char *>(candidate));
539
Tyler Chatowb3850c12020-02-26 20:55:48 -0800540 auto ice_fb_builder = WebSocketIce::Builder(fbb);
541 ice_fb_builder.add_sdp_m_line_index(mline_index);
Austin Schuhf5dbe2c2024-04-06 16:10:24 -0700542 ice_fb_builder.add_sdp_mid(sdp_mid_offset);
543 ice_fb_builder.add_candidate(candidate_offset);
Tyler Chatowb3850c12020-02-26 20:55:48 -0800544 flatbuffers::Offset<WebSocketIce> ice_fb = ice_fb_builder.Finish();
545
546 flatbuffers::Offset<WebSocketMessage> ice_message =
547 CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_fb.Union());
548 fbb.Finish(ice_message);
549
550 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
551
552 g_signal_emit_by_name(webrtcbin_, "add-ice-candidate", mline_index,
553 candidate);
554}
555
556void Connection::HandleWebSocketData(const uint8_t *data, size_t /* size*/) {
557 LOG(INFO) << "HandleWebSocketData";
558
559 const WebSocketMessage *message =
560 flatbuffers::GetRoot<WebSocketMessage>(data);
561
562 switch (message->payload_type()) {
563 case Payload::WebSocketSdp: {
564 const WebSocketSdp *offer = message->payload_as_WebSocketSdp();
565 if (offer->type() != SdpType::ANSWER) {
566 LOG(WARNING) << "Expected SDP message type \"answer\"";
567 break;
568 }
569 const flatbuffers::String *sdp_string = offer->payload();
570
571 LOG(INFO) << "Received SDP:\n" << sdp_string->c_str();
572
573 GstSDPMessage *sdp;
574 GstSDPResult status = gst_sdp_message_new(&sdp);
575 if (status != GST_SDP_OK) {
576 LOG(WARNING) << "Could not create SDP message";
577 break;
578 }
579
580 status = gst_sdp_message_parse_buffer((const guint8 *)sdp_string->c_str(),
581 sdp_string->size(), sdp);
582
583 if (status != GST_SDP_OK) {
584 LOG(WARNING) << "Could not parse SDP string";
585 break;
586 }
587
588 std::unique_ptr<GstWebRTCSessionDescription,
589 decltype(&gst_webrtc_session_description_free)>
590 answer(gst_webrtc_session_description_new(GST_WEBRTC_SDP_TYPE_ANSWER,
591 sdp),
592 &gst_webrtc_session_description_free);
593 std::unique_ptr<GstPromise, decltype(&gst_promise_unref)> promise(
594 gst_promise_new(), &gst_promise_unref);
595 g_signal_emit_by_name(webrtcbin_, "set-remote-description", answer.get(),
596 promise.get());
597 gst_promise_interrupt(promise.get());
598
599 break;
600 }
601 case Payload::WebSocketIce: {
602 const WebSocketIce *ice = message->payload_as_WebSocketIce();
603 if (!ice->has_candidate() || ice->candidate()->size() == 0) {
604 LOG(WARNING) << "Received ICE message without candidate";
605 break;
606 }
607
608 const gchar *candidate =
609 static_cast<const gchar *>(ice->candidate()->c_str());
610 guint mline_index = ice->sdp_m_line_index();
611
612 LOG(INFO) << "Received ICE candidate with mline index " << mline_index
613 << "; candidate: " << candidate;
614
615 g_signal_emit_by_name(webrtcbin_, "add-ice-candidate", mline_index,
616 candidate);
617
618 break;
619 }
620 default:
621 break;
622 }
623}
624
Tyler Chatowb3850c12020-02-26 20:55:48 -0800625int main(int argc, char **argv) {
626 aos::InitGoogle(&argc, &argv);
627
628 findEmbeddedContent("");
629
630 std::string openssl_env = "OPENSSL_CONF=\"\"";
631 putenv(const_cast<char *>(openssl_env.c_str()));
632
Tyler Chatowb3850c12020-02-26 20:55:48 -0800633 gst_init(&argc, &argv);
Tyler Chatowb3850c12020-02-26 20:55:48 -0800634
635 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700636 aos::configuration::ReadConfig(absl::GetFlag(FLAGS_config));
Tyler Chatowb3850c12020-02-26 20:55:48 -0800637 aos::ShmEventLoop event_loop(&config.message());
638
639 {
640 aos::GlibMainLoop main_loop(&event_loop);
641
642 seasocks::Server server(::std::shared_ptr<seasocks::Logger>(
643 new ::aos::seasocks::SeasocksLogger(seasocks::Logger::Level::Info)));
644
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700645 LOG(INFO) << "Serving from " << absl::GetFlag(FLAGS_data_dir);
Tyler Chatowb3850c12020-02-26 20:55:48 -0800646
647 auto websocket_handler =
648 std::make_shared<WebsocketHandler>(&event_loop, &server);
649 server.addWebSocketHandler("/ws", websocket_handler);
650
Austin Schuh99f7c6a2024-06-25 22:07:44 -0700651 server.startListening(absl::GetFlag(FLAGS_streaming_port));
652 server.setStaticPath(absl::GetFlag(FLAGS_data_dir).c_str());
Tyler Chatowb3850c12020-02-26 20:55:48 -0800653
654 aos::internal::EPoll *epoll = event_loop.epoll();
655
656 epoll->OnReadable(server.fd(), [&server] {
657 CHECK(::seasocks::Server::PollResult::Continue == server.poll(0));
658 });
659
660 event_loop.Run();
661
662 epoll->DeleteFd(server.fd());
663 server.terminate();
664 }
665
666 gst_deinit();
667
668 return 0;
669}