blob: b89adc0a002f4389847c7fc084cfca2a9d23c4f2 [file] [log] [blame]
Tyler Chatowb3850c12020-02-26 20:55:48 -08001#define GST_USE_UNSTABLE_API
2#define GST_DISABLE_REGISTRY 1
3
4#include <glib-unix.h>
5#include <glib.h>
6#include <gst/app/app.h>
7#include <gst/gst.h>
8#include <gst/sdp/sdp.h>
9#include <gst/webrtc/icetransport.h>
10#include <gst/webrtc/webrtc.h>
11#include <sys/stat.h>
12#include <sys/types.h>
13
14#include <map>
15#include <thread>
16
17#include "absl/strings/str_format.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070018#include "flatbuffers/flatbuffers.h"
19#include "gflags/gflags.h"
20#include "glog/logging.h"
21
Tyler Chatowb3850c12020-02-26 20:55:48 -080022#include "aos/events/glib_main_loop.h"
23#include "aos/events/shm_event_loop.h"
24#include "aos/init.h"
25#include "aos/network/web_proxy_generated.h"
26#include "aos/seasocks/seasocks_logger.h"
Tyler Chatowb3850c12020-02-26 20:55:48 -080027#include "frc971/vision/vision_generated.h"
Tyler Chatowb3850c12020-02-26 20:55:48 -080028#include "internal/Embedded.h"
29#include "seasocks/Server.h"
30#include "seasocks/StringUtil.h"
31#include "seasocks/WebSocket.h"
32
milind-ub0773e92023-02-05 15:57:43 -080033DEFINE_string(config, "aos_config.json",
Tyler Chatowb3850c12020-02-26 20:55:48 -080034 "Name of the config file to replay using.");
Tyler Chatow39b6a322022-04-15 00:03:58 -070035DEFINE_string(device, "/dev/video0",
36 "Camera fd. Ignored if reading from channel");
Tyler Chatowb3850c12020-02-26 20:55:48 -080037DEFINE_string(data_dir, "image_streamer_www",
38 "Directory to serve data files from");
Austin Schuhf5dbe2c2024-04-06 16:10:24 -070039DEFINE_bool(publish_images, true,
40 "If true, publish images read from v4l2 to /camera.");
Tyler Chatowb3850c12020-02-26 20:55:48 -080041DEFINE_int32(width, 400, "Image width");
42DEFINE_int32(height, 300, "Image height");
43DEFINE_int32(framerate, 25, "Framerate (FPS)");
44DEFINE_int32(brightness, 50, "Camera brightness");
45DEFINE_int32(exposure, 300, "Manual exposure");
46DEFINE_int32(bitrate, 500000, "H264 encode bitrate");
milind-ub0773e92023-02-05 15:57:43 -080047DEFINE_int32(streaming_port, 1180, "Port to stream images on with seasocks");
Tyler Chatowb3850c12020-02-26 20:55:48 -080048DEFINE_int32(min_port, 5800, "Min rtp port");
49DEFINE_int32(max_port, 5810, "Max rtp port");
Tyler Chatow39b6a322022-04-15 00:03:58 -070050DEFINE_string(listen_on, "",
51 "Channel on which to receive frames from. Used in place of "
52 "internal V4L2 reader. Note: width and height MUST match the "
53 "expected size of channel images.");
Tyler Chatowb3850c12020-02-26 20:55:48 -080054
55class Connection;
56
57using aos::web_proxy::Payload;
58using aos::web_proxy::SdpType;
59using aos::web_proxy::WebSocketIce;
60using aos::web_proxy::WebSocketMessage;
61using aos::web_proxy::WebSocketSdp;
62
Tyler Chatow39b6a322022-04-15 00:03:58 -070063class GstSampleSource {
64 public:
65 GstSampleSource() = default;
66
67 virtual ~GstSampleSource() = default;
68
69 private:
70 GstSampleSource(const GstSampleSource &) = delete;
71};
72
73class V4L2Source : public GstSampleSource {
74 public:
75 V4L2Source(std::function<void(GstSample *)> callback)
76 : callback_(std::move(callback)) {
77 GError *error = NULL;
78
79 // Create pipeline to read from camera, pack into rtp stream, and dump
80 // stream to callback. v4l2 device should already be configured with correct
81 // bitrate from v4l2-ctl. do-timestamp marks the time the frame was taken to
82 // track when it should be dropped under latency.
83
84 // With the Pi's hardware encoder, we can encode and package the stream once
85 // and the clients will jump in at any point unsynchronized. With the stream
86 // from x264enc this doesn't seem to work. For now, just reencode for each
87 // client since we don't expect more than 1 or 2.
88
Austin Schuhf5dbe2c2024-04-06 16:10:24 -070089 std::string exposure;
90 if (FLAGS_exposure > 0) {
91 exposure = absl::StrFormat(",auto_exposure=1,exposure_time_absolute=%d",
92 FLAGS_exposure);
93 }
94
Tyler Chatow39b6a322022-04-15 00:03:58 -070095 pipeline_ = gst_parse_launch(
96 absl::StrFormat("v4l2src device=%s do-timestamp=true "
Austin Schuhf5dbe2c2024-04-06 16:10:24 -070097 "extra-controls=\"c,brightness=%d%s\" ! "
Tyler Chatow39b6a322022-04-15 00:03:58 -070098 "video/x-raw,width=%d,height=%d,framerate=%d/"
99 "1,format=YUY2 ! appsink "
100 "name=appsink "
101 "emit-signals=true sync=false async=false "
102 "caps=video/x-raw,format=YUY2",
Austin Schuhf5dbe2c2024-04-06 16:10:24 -0700103 FLAGS_device, FLAGS_brightness, exposure, FLAGS_width,
104 FLAGS_height, FLAGS_framerate)
Tyler Chatow39b6a322022-04-15 00:03:58 -0700105 .c_str(),
106 &error);
107
108 if (error != NULL) {
109 LOG(FATAL) << "Could not create v4l2 pipeline: " << error->message;
110 }
111
112 appsink_ = gst_bin_get_by_name(GST_BIN(pipeline_), "appsink");
113 if (appsink_ == NULL) {
114 LOG(FATAL) << "Could not get appsink";
115 }
116
117 g_signal_connect(appsink_, "new-sample",
118 G_CALLBACK(V4L2Source::OnSampleCallback),
119 static_cast<gpointer>(this));
120
121 gst_element_set_state(pipeline_, GST_STATE_PLAYING);
122 }
123
124 ~V4L2Source() {
125 if (pipeline_ != NULL) {
126 gst_element_set_state(GST_ELEMENT(pipeline_), GST_STATE_NULL);
127 gst_object_unref(GST_OBJECT(pipeline_));
128 gst_object_unref(GST_OBJECT(appsink_));
129 }
130 }
131
132 private:
133 static GstFlowReturn OnSampleCallback(GstElement *, gpointer user_data) {
134 static_cast<V4L2Source *>(user_data)->OnSample();
135 return GST_FLOW_OK;
136 }
137
138 void OnSample() {
139 GstSample *sample = gst_app_sink_pull_sample(GST_APP_SINK(appsink_));
140 if (sample == NULL) {
141 LOG(WARNING) << "Received null sample";
142 return;
143 }
144 callback_(sample);
145 gst_sample_unref(sample);
146 }
147
148 GstElement *pipeline_;
149 GstElement *appsink_;
150
151 std::function<void(GstSample *)> callback_;
152};
153
154class ChannelSource : public GstSampleSource {
155 public:
156 ChannelSource(aos::ShmEventLoop *event_loop,
157 std::function<void(GstSample *)> callback)
158 : callback_(std::move(callback)) {
159 event_loop->MakeWatcher(
160 FLAGS_listen_on,
161 [this](const frc971::vision::CameraImage &image) { OnImage(image); });
162 }
163
164 private:
165 void OnImage(const frc971::vision::CameraImage &image) {
166 if (!image.has_rows() || !image.has_cols() || !image.has_data()) {
167 VLOG(2) << "Skipping CameraImage with no data";
168 return;
169 }
170 CHECK_EQ(image.rows(), FLAGS_height);
171 CHECK_EQ(image.cols(), FLAGS_width);
172
173 GBytes *bytes = g_bytes_new(image.data()->data(), image.data()->size());
174 GstBuffer *buffer = gst_buffer_new_wrapped_bytes(bytes);
175
176 GST_BUFFER_PTS(buffer) = image.monotonic_timestamp_ns();
177
Austin Schuh6bdcc372024-06-27 14:49:11 -0700178 GstCaps *caps = gst_caps_new_simple(
Tyler Chatow39b6a322022-04-15 00:03:58 -0700179 "video/x-raw", "width", G_TYPE_INT, image.cols(), "height", G_TYPE_INT,
Austin Schuh6bdcc372024-06-27 14:49:11 -0700180 image.rows(), "format", G_TYPE_STRING, "YUY2", nullptr);
181 CHECK(caps != nullptr);
Tyler Chatow39b6a322022-04-15 00:03:58 -0700182
183 GstSample *sample = gst_sample_new(buffer, caps, nullptr, nullptr);
184
185 callback_(sample);
186
187 gst_sample_unref(sample);
188 gst_caps_unref(caps);
189 gst_buffer_unref(buffer);
190 g_bytes_unref(bytes);
191 }
192
193 std::function<void(GstSample *)> callback_;
194};
195
Tyler Chatowb3850c12020-02-26 20:55:48 -0800196// Basic class that handles receiving new websocket connections. Creates a new
197// Connection to manage the rest of the negotiation and data passing. When the
198// websocket closes, it deletes the Connection.
199class WebsocketHandler : public ::seasocks::WebSocket::Handler {
200 public:
201 WebsocketHandler(aos::ShmEventLoop *event_loop, ::seasocks::Server *server);
Tyler Chatow39b6a322022-04-15 00:03:58 -0700202 ~WebsocketHandler() override = default;
Tyler Chatowb3850c12020-02-26 20:55:48 -0800203
204 void onConnect(::seasocks::WebSocket *sock) override;
205 void onData(::seasocks::WebSocket *sock, const uint8_t *data,
206 size_t size) override;
207 void onDisconnect(::seasocks::WebSocket *sock) override;
208
209 private:
Tyler Chatow39b6a322022-04-15 00:03:58 -0700210 void OnSample(GstSample *sample);
Tyler Chatowb3850c12020-02-26 20:55:48 -0800211
James Kuszmaul345b0812024-04-18 13:07:45 -0500212 aos::ShmEventLoop *event_loop_;
Tyler Chatowb3850c12020-02-26 20:55:48 -0800213 std::map<::seasocks::WebSocket *, std::unique_ptr<Connection>> connections_;
214 ::seasocks::Server *server_;
Tyler Chatow39b6a322022-04-15 00:03:58 -0700215 std::unique_ptr<GstSampleSource> source_;
James Kuszmaul345b0812024-04-18 13:07:45 -0500216 aos::TimerHandler *manual_restart_handle_;
Tyler Chatowb3850c12020-02-26 20:55:48 -0800217
218 aos::Sender<frc971::vision::CameraImage> sender_;
219};
220
221// Seasocks requires that sends happen on the correct thread. This class takes a
222// detached buffer to send on a specific websocket connection and sends it when
223// seasocks is ready.
224class UpdateData : public ::seasocks::Server::Runnable {
225 public:
226 UpdateData(::seasocks::WebSocket *websocket,
227 flatbuffers::DetachedBuffer &&buffer)
228 : sock_(websocket), buffer_(std::move(buffer)) {}
229 ~UpdateData() override = default;
230 UpdateData(const UpdateData &) = delete;
231 UpdateData &operator=(const UpdateData &) = delete;
232
233 void run() override { sock_->send(buffer_.data(), buffer_.size()); }
234
235 private:
236 ::seasocks::WebSocket *sock_;
237 const flatbuffers::DetachedBuffer buffer_;
238};
239
240class Connection {
241 public:
242 Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server);
243
244 ~Connection();
245
246 void HandleWebSocketData(const uint8_t *data, size_t size);
247
248 void OnSample(GstSample *sample);
249
250 private:
251 static void OnOfferCreatedCallback(GstPromise *promise, gpointer user_data) {
252 static_cast<Connection *>(user_data)->OnOfferCreated(promise);
253 }
254
255 static void OnNegotiationNeededCallback(GstElement *, gpointer user_data) {
256 static_cast<Connection *>(user_data)->OnNegotiationNeeded();
257 }
258
259 static void OnIceCandidateCallback(GstElement *, guint mline_index,
260 gchar *candidate, gpointer user_data) {
261 static_cast<Connection *>(user_data)->OnIceCandidate(mline_index,
262 candidate);
263 }
264
265 void OnOfferCreated(GstPromise *promise);
266 void OnNegotiationNeeded();
267 void OnIceCandidate(guint mline_index, gchar *candidate);
268
269 ::seasocks::WebSocket *sock_;
270 ::seasocks::Server *server_;
271
272 GstElement *pipeline_;
273 GstElement *webrtcbin_;
274 GstElement *appsrc_;
275
276 bool first_sample_ = true;
277};
278
279WebsocketHandler::WebsocketHandler(aos::ShmEventLoop *event_loop,
280 ::seasocks::Server *server)
James Kuszmaul345b0812024-04-18 13:07:45 -0500281 : event_loop_(event_loop),
282 server_(server),
283 manual_restart_handle_(
284 event_loop_->AddTimer([this]() { event_loop_->Exit(); })) {
Tyler Chatow39b6a322022-04-15 00:03:58 -0700285 if (FLAGS_listen_on.empty()) {
Austin Schuhf5dbe2c2024-04-06 16:10:24 -0700286 if (FLAGS_publish_images) {
287 sender_ = event_loop->MakeSender<frc971::vision::CameraImage>("/camera");
288 }
Tyler Chatow39b6a322022-04-15 00:03:58 -0700289 source_ =
290 std::make_unique<V4L2Source>([this](auto sample) { OnSample(sample); });
291 } else {
292 source_ = std::make_unique<ChannelSource>(
293 event_loop, [this](auto sample) { OnSample(sample); });
Tyler Chatowb3850c12020-02-26 20:55:48 -0800294 }
James Kuszmaul345b0812024-04-18 13:07:45 -0500295 event_loop_->OnRun([this]() {
296 manual_restart_handle_->Schedule(event_loop_->monotonic_now() +
297 std::chrono::seconds(10));
298 });
Tyler Chatowb3850c12020-02-26 20:55:48 -0800299}
300
301void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
302 std::unique_ptr<Connection> conn =
303 std::make_unique<Connection>(sock, server_);
304 connections_.insert({sock, std::move(conn)});
305}
306
307void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
308 size_t size) {
309 connections_[sock]->HandleWebSocketData(data, size);
310}
311
Tyler Chatow39b6a322022-04-15 00:03:58 -0700312void WebsocketHandler::OnSample(GstSample *sample) {
Tyler Chatowb3850c12020-02-26 20:55:48 -0800313 for (auto iter = connections_.begin(); iter != connections_.end(); ++iter) {
314 iter->second->OnSample(sample);
315 }
316
Tyler Chatow39b6a322022-04-15 00:03:58 -0700317 if (sender_.valid()) {
Austin Schuh6bdcc372024-06-27 14:49:11 -0700318 const GstCaps *caps = gst_sample_get_caps(sample);
319 CHECK(caps != nullptr);
Tyler Chatowb3850c12020-02-26 20:55:48 -0800320 CHECK_GT(gst_caps_get_size(caps), 0U);
321 const GstStructure *str = gst_caps_get_structure(caps, 0);
322
323 gint width;
324 gint height;
325
326 CHECK(gst_structure_get_int(str, "width", &width));
327 CHECK(gst_structure_get_int(str, "height", &height));
328
Austin Schuh6bdcc372024-06-27 14:49:11 -0700329 GstBuffer *buffer = gst_sample_get_buffer(sample);
330 CHECK(buffer != nullptr);
Tyler Chatowb3850c12020-02-26 20:55:48 -0800331
332 const gsize size = gst_buffer_get_size(buffer);
333
334 auto builder = sender_.MakeBuilder();
335
336 uint8_t *image_data;
337 auto image_offset =
338 builder.fbb()->CreateUninitializedVector(size, &image_data);
339 gst_buffer_extract(buffer, 0, image_data, size);
340
341 auto image_builder = builder.MakeBuilder<frc971::vision::CameraImage>();
342 image_builder.add_rows(height);
343 image_builder.add_cols(width);
344 image_builder.add_data(image_offset);
345
346 builder.CheckOk(builder.Send(image_builder.Finish()));
347 }
James Kuszmaul345b0812024-04-18 13:07:45 -0500348 manual_restart_handle_->Schedule(event_loop_->monotonic_now() +
349 std::chrono::seconds(10));
Tyler Chatowb3850c12020-02-26 20:55:48 -0800350}
351
352void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
353 connections_.erase(sock);
354}
355
356Connection::Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server)
357 : sock_(sock), server_(server) {
358 GError *error = NULL;
359
360 // Build pipeline to read data from application into pipeline, place in
361 // webrtcbin group, and stream.
362
363 pipeline_ = gst_parse_launch(
364 // aggregate-mode should be zero-latency but this drops the stream on
365 // bitrate spikes for some reason - probably the weak CPU on the pi.
366 absl::StrFormat(
367 "webrtcbin name=webrtcbin appsrc "
368 "name=appsrc block=false "
369 "is-live=true "
370 "format=3 max-buffers=0 leaky-type=2 "
371 "caps=video/x-raw,width=%d,height=%d,format=YUY2 ! videoconvert ! "
372 "x264enc bitrate=%d speed-preset=ultrafast "
373 "tune=zerolatency key-int-max=15 sliced-threads=true ! "
374 "video/x-h264,profile=constrained-baseline ! h264parse ! "
375 "rtph264pay "
376 "config-interval=-1 name=payloader aggregate-mode=none ! "
377 "application/"
378 "x-rtp,media=video,encoding-name=H264,payload=96,clock-rate=90000 !"
379 "webrtcbin. ",
380 FLAGS_width, FLAGS_height, FLAGS_bitrate / 1000)
381 .c_str(),
382 &error);
383
384 if (error != NULL) {
385 LOG(FATAL) << "Could not create WebRTC pipeline: " << error->message;
386 }
387
388 webrtcbin_ = gst_bin_get_by_name(GST_BIN(pipeline_), "webrtcbin");
389 if (webrtcbin_ == NULL) {
390 LOG(FATAL) << "Could not initialize webrtcbin";
391 }
392
393 appsrc_ = gst_bin_get_by_name(GST_BIN(pipeline_), "appsrc");
394 if (appsrc_ == NULL) {
395 LOG(FATAL) << "Could not initialize appsrc";
396 }
397
398 {
399 GArray *transceivers;
400 g_signal_emit_by_name(webrtcbin_, "get-transceivers", &transceivers);
401 if (transceivers == NULL || transceivers->len <= 0) {
402 LOG(FATAL) << "Could not initialize transceivers";
403 }
404
405 GstWebRTCRTPTransceiver *trans =
406 g_array_index(transceivers, GstWebRTCRTPTransceiver *, 0);
407 g_object_set(trans, "direction",
408 GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_SENDONLY, nullptr);
409
410 g_array_unref(transceivers);
411 }
412
413 {
414 GstObject *ice = nullptr;
415 g_object_get(G_OBJECT(webrtcbin_), "ice-agent", &ice, nullptr);
Austin Schuh6bdcc372024-06-27 14:49:11 -0700416 CHECK(ice != nullptr);
Tyler Chatowb3850c12020-02-26 20:55:48 -0800417
418 g_object_set(ice, "min-rtp-port", FLAGS_min_port, "max-rtp-port",
419 FLAGS_max_port, nullptr);
420
421 // We don't need upnp on a local network.
422 {
423 GstObject *nice = nullptr;
424 g_object_get(ice, "agent", &nice, nullptr);
Austin Schuh6bdcc372024-06-27 14:49:11 -0700425 CHECK(nice != nullptr);
Tyler Chatowb3850c12020-02-26 20:55:48 -0800426
427 g_object_set(nice, "upnp", false, nullptr);
428 g_object_unref(nice);
429 }
430
431 gst_object_unref(ice);
432 }
433
434 g_signal_connect(webrtcbin_, "on-negotiation-needed",
435 G_CALLBACK(Connection::OnNegotiationNeededCallback),
436 static_cast<gpointer>(this));
437
438 g_signal_connect(webrtcbin_, "on-ice-candidate",
439 G_CALLBACK(Connection::OnIceCandidateCallback),
440 static_cast<gpointer>(this));
441
442 gst_element_set_state(pipeline_, GST_STATE_READY);
443 gst_element_set_state(pipeline_, GST_STATE_PLAYING);
444}
445
446Connection::~Connection() {
447 if (pipeline_ != NULL) {
448 gst_element_set_state(pipeline_, GST_STATE_NULL);
449
450 gst_object_unref(GST_OBJECT(webrtcbin_));
451 gst_object_unref(GST_OBJECT(pipeline_));
452 gst_object_unref(GST_OBJECT(appsrc_));
453 }
454}
455
456void Connection::OnSample(GstSample *sample) {
457 GstFlowReturn response =
458 gst_app_src_push_sample(GST_APP_SRC(appsrc_), sample);
459 if (response != GST_FLOW_OK) {
460 LOG(WARNING) << "Sample pushed, did not receive OK";
461 }
462
463 // Since the stream is already running (the camera turns on with
464 // image_streamer) we need to tell the new appsrc where
465 // we are starting in the stream so it can catch up immediately.
466 if (first_sample_) {
467 GstPad *src = gst_element_get_static_pad(appsrc_, "src");
468 if (src == NULL) {
469 return;
470 }
471
472 GstSegment *segment = gst_sample_get_segment(sample);
473 GstBuffer *buffer = gst_sample_get_buffer(sample);
474
475 guint64 offset = gst_segment_to_running_time(segment, GST_FORMAT_TIME,
476 GST_BUFFER_PTS(buffer));
477 LOG(INFO) << "Fixing offset " << offset;
478 gst_pad_set_offset(src, -offset);
479
480 gst_object_unref(GST_OBJECT(src));
481 first_sample_ = false;
482 }
483}
484
485void Connection::OnOfferCreated(GstPromise *promise) {
486 LOG(INFO) << "OnOfferCreated";
487
488 GstWebRTCSessionDescription *offer = NULL;
489 gst_structure_get(gst_promise_get_reply(promise), "offer",
490 GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &offer, NULL);
491 gst_promise_unref(promise);
492
493 {
494 std::unique_ptr<GstPromise, decltype(&gst_promise_unref)>
495 local_desc_promise(gst_promise_new(), &gst_promise_unref);
496 g_signal_emit_by_name(webrtcbin_, "set-local-description", offer,
497 local_desc_promise.get());
498 gst_promise_interrupt(local_desc_promise.get());
499 }
500
501 GstSDPMessage *sdp_msg = offer->sdp;
502 std::string sdp_str(gst_sdp_message_as_text(sdp_msg));
503
504 LOG(INFO) << "Negotiation offer created:\n" << sdp_str;
505
506 flatbuffers::FlatBufferBuilder fbb(512);
507 flatbuffers::Offset<WebSocketSdp> sdp_fb =
508 CreateWebSocketSdpDirect(fbb, SdpType::OFFER, sdp_str.c_str());
509 flatbuffers::Offset<WebSocketMessage> answer_message =
510 CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());
511 fbb.Finish(answer_message);
512
513 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
514}
515
516void Connection::OnNegotiationNeeded() {
517 LOG(INFO) << "OnNegotiationNeeded";
518
519 GstPromise *promise;
520 promise = gst_promise_new_with_change_func(Connection::OnOfferCreatedCallback,
521 static_cast<gpointer>(this), NULL);
522 g_signal_emit_by_name(G_OBJECT(webrtcbin_), "create-offer", NULL, promise);
523}
524
525void Connection::OnIceCandidate(guint mline_index, gchar *candidate) {
526 LOG(INFO) << "OnIceCandidate";
527
528 flatbuffers::FlatBufferBuilder fbb(512);
529
Austin Schuhf5dbe2c2024-04-06 16:10:24 -0700530 flatbuffers::Offset<flatbuffers::String> sdp_mid_offset =
531 fbb.CreateString("video0");
532 flatbuffers::Offset<flatbuffers::String> candidate_offset =
533 fbb.CreateString(static_cast<char *>(candidate));
534
Tyler Chatowb3850c12020-02-26 20:55:48 -0800535 auto ice_fb_builder = WebSocketIce::Builder(fbb);
536 ice_fb_builder.add_sdp_m_line_index(mline_index);
Austin Schuhf5dbe2c2024-04-06 16:10:24 -0700537 ice_fb_builder.add_sdp_mid(sdp_mid_offset);
538 ice_fb_builder.add_candidate(candidate_offset);
Tyler Chatowb3850c12020-02-26 20:55:48 -0800539 flatbuffers::Offset<WebSocketIce> ice_fb = ice_fb_builder.Finish();
540
541 flatbuffers::Offset<WebSocketMessage> ice_message =
542 CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_fb.Union());
543 fbb.Finish(ice_message);
544
545 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
546
547 g_signal_emit_by_name(webrtcbin_, "add-ice-candidate", mline_index,
548 candidate);
549}
550
551void Connection::HandleWebSocketData(const uint8_t *data, size_t /* size*/) {
552 LOG(INFO) << "HandleWebSocketData";
553
554 const WebSocketMessage *message =
555 flatbuffers::GetRoot<WebSocketMessage>(data);
556
557 switch (message->payload_type()) {
558 case Payload::WebSocketSdp: {
559 const WebSocketSdp *offer = message->payload_as_WebSocketSdp();
560 if (offer->type() != SdpType::ANSWER) {
561 LOG(WARNING) << "Expected SDP message type \"answer\"";
562 break;
563 }
564 const flatbuffers::String *sdp_string = offer->payload();
565
566 LOG(INFO) << "Received SDP:\n" << sdp_string->c_str();
567
568 GstSDPMessage *sdp;
569 GstSDPResult status = gst_sdp_message_new(&sdp);
570 if (status != GST_SDP_OK) {
571 LOG(WARNING) << "Could not create SDP message";
572 break;
573 }
574
575 status = gst_sdp_message_parse_buffer((const guint8 *)sdp_string->c_str(),
576 sdp_string->size(), sdp);
577
578 if (status != GST_SDP_OK) {
579 LOG(WARNING) << "Could not parse SDP string";
580 break;
581 }
582
583 std::unique_ptr<GstWebRTCSessionDescription,
584 decltype(&gst_webrtc_session_description_free)>
585 answer(gst_webrtc_session_description_new(GST_WEBRTC_SDP_TYPE_ANSWER,
586 sdp),
587 &gst_webrtc_session_description_free);
588 std::unique_ptr<GstPromise, decltype(&gst_promise_unref)> promise(
589 gst_promise_new(), &gst_promise_unref);
590 g_signal_emit_by_name(webrtcbin_, "set-remote-description", answer.get(),
591 promise.get());
592 gst_promise_interrupt(promise.get());
593
594 break;
595 }
596 case Payload::WebSocketIce: {
597 const WebSocketIce *ice = message->payload_as_WebSocketIce();
598 if (!ice->has_candidate() || ice->candidate()->size() == 0) {
599 LOG(WARNING) << "Received ICE message without candidate";
600 break;
601 }
602
603 const gchar *candidate =
604 static_cast<const gchar *>(ice->candidate()->c_str());
605 guint mline_index = ice->sdp_m_line_index();
606
607 LOG(INFO) << "Received ICE candidate with mline index " << mline_index
608 << "; candidate: " << candidate;
609
610 g_signal_emit_by_name(webrtcbin_, "add-ice-candidate", mline_index,
611 candidate);
612
613 break;
614 }
615 default:
616 break;
617 }
618}
619
Tyler Chatowb3850c12020-02-26 20:55:48 -0800620int main(int argc, char **argv) {
621 aos::InitGoogle(&argc, &argv);
622
623 findEmbeddedContent("");
624
625 std::string openssl_env = "OPENSSL_CONF=\"\"";
626 putenv(const_cast<char *>(openssl_env.c_str()));
627
Tyler Chatowb3850c12020-02-26 20:55:48 -0800628 gst_init(&argc, &argv);
Tyler Chatowb3850c12020-02-26 20:55:48 -0800629
630 aos::FlatbufferDetachedBuffer<aos::Configuration> config =
631 aos::configuration::ReadConfig(FLAGS_config);
632 aos::ShmEventLoop event_loop(&config.message());
633
634 {
635 aos::GlibMainLoop main_loop(&event_loop);
636
637 seasocks::Server server(::std::shared_ptr<seasocks::Logger>(
638 new ::aos::seasocks::SeasocksLogger(seasocks::Logger::Level::Info)));
639
640 LOG(INFO) << "Serving from " << FLAGS_data_dir;
641
642 auto websocket_handler =
643 std::make_shared<WebsocketHandler>(&event_loop, &server);
644 server.addWebSocketHandler("/ws", websocket_handler);
645
milind-ub0773e92023-02-05 15:57:43 -0800646 server.startListening(FLAGS_streaming_port);
Tyler Chatowb3850c12020-02-26 20:55:48 -0800647 server.setStaticPath(FLAGS_data_dir.c_str());
648
649 aos::internal::EPoll *epoll = event_loop.epoll();
650
651 epoll->OnReadable(server.fd(), [&server] {
652 CHECK(::seasocks::Server::PollResult::Continue == server.poll(0));
653 });
654
655 event_loop.Run();
656
657 epoll->DeleteFd(server.fd());
658 server.terminate();
659 }
660
661 gst_deinit();
662
663 return 0;
664}