blob: 7f85b3fbe0bf56501a680906fdc68c6ea39b809b [file] [log] [blame]
Tyler Chatowb3850c12020-02-26 20:55:48 -08001#define GST_USE_UNSTABLE_API
2#define GST_DISABLE_REGISTRY 1
3
4#include <glib-unix.h>
5#include <glib.h>
6#include <gst/app/app.h>
7#include <gst/gst.h>
8#include <gst/sdp/sdp.h>
9#include <gst/webrtc/icetransport.h>
10#include <gst/webrtc/webrtc.h>
11#include <sys/stat.h>
12#include <sys/types.h>
13
14#include <map>
15#include <thread>
16
17#include "absl/strings/str_format.h"
Philipp Schrader790cb542023-07-05 21:06:52 -070018#include "flatbuffers/flatbuffers.h"
19#include "gflags/gflags.h"
20#include "glog/logging.h"
21
Tyler Chatowb3850c12020-02-26 20:55:48 -080022#include "aos/events/glib_main_loop.h"
23#include "aos/events/shm_event_loop.h"
24#include "aos/init.h"
25#include "aos/network/web_proxy_generated.h"
26#include "aos/seasocks/seasocks_logger.h"
Tyler Chatowb3850c12020-02-26 20:55:48 -080027#include "frc971/vision/vision_generated.h"
Tyler Chatowb3850c12020-02-26 20:55:48 -080028#include "internal/Embedded.h"
29#include "seasocks/Server.h"
30#include "seasocks/StringUtil.h"
31#include "seasocks/WebSocket.h"
32
33extern "C" {
34GST_PLUGIN_STATIC_DECLARE(app);
35GST_PLUGIN_STATIC_DECLARE(coreelements);
36GST_PLUGIN_STATIC_DECLARE(dtls);
37GST_PLUGIN_STATIC_DECLARE(nice);
38GST_PLUGIN_STATIC_DECLARE(rtp);
39GST_PLUGIN_STATIC_DECLARE(rtpmanager);
40GST_PLUGIN_STATIC_DECLARE(srtp);
41GST_PLUGIN_STATIC_DECLARE(webrtc);
42GST_PLUGIN_STATIC_DECLARE(video4linux2);
43GST_PLUGIN_STATIC_DECLARE(videoconvert);
44GST_PLUGIN_STATIC_DECLARE(videoparsersbad);
45GST_PLUGIN_STATIC_DECLARE(videorate);
46GST_PLUGIN_STATIC_DECLARE(videoscale);
47GST_PLUGIN_STATIC_DECLARE(videotestsrc);
48GST_PLUGIN_STATIC_DECLARE(x264);
49}
Tyler Chatow39b6a322022-04-15 00:03:58 -070050
// Command-line flags controlling the camera source, encode settings, and
// network ports. width/height/framerate apply both to the V4L2 capture caps
// and to the caps expected from a channel source.
DEFINE_string(config, "aos_config.json",
              "Name of the config file to replay using.");
DEFINE_string(device, "/dev/video0",
              "Camera fd. Ignored if reading from channel");
DEFINE_string(data_dir, "image_streamer_www",
              "Directory to serve data files from");
DEFINE_int32(width, 400, "Image width");
DEFINE_int32(height, 300, "Image height");
DEFINE_int32(framerate, 25, "Framerate (FPS)");
DEFINE_int32(brightness, 50, "Camera brightness");
DEFINE_int32(exposure, 300, "Manual exposure");
DEFINE_int32(bitrate, 500000, "H264 encode bitrate");
DEFINE_int32(streaming_port, 1180, "Port to stream images on with seasocks");
// RTP port range handed to the ICE agent; keep inside the FRC-legal range.
DEFINE_int32(min_port, 5800, "Min rtp port");
DEFINE_int32(max_port, 5810, "Max rtp port");
DEFINE_string(listen_on, "",
              "Channel on which to receive frames from. Used in place of "
              "internal V4L2 reader. Note: width and height MUST match the "
              "expected size of channel images.");
Tyler Chatowb3850c12020-02-26 20:55:48 -080070
// Forward declaration; Connection is defined below but WebsocketHandler needs
// the name for its connection map.
class Connection;

// Shorthand for the web_proxy flatbuffer signaling types exchanged with the
// browser over the websocket.
using aos::web_proxy::Payload;
using aos::web_proxy::SdpType;
using aos::web_proxy::WebSocketIce;
using aos::web_proxy::WebSocketMessage;
using aos::web_proxy::WebSocketSdp;
78
// Interface for sources of GstSamples. Concrete subclasses (V4L2Source,
// ChannelSource) deliver frames to a callback; this base exists so the
// handler can own either polymorphically.
class GstSampleSource {
 public:
  GstSampleSource() = default;

  virtual ~GstSampleSource() = default;

  // Sources register `this` with C callbacks / watchers, so copying (and the
  // previously still-implicit copy assignment) must both be deleted.
  GstSampleSource(const GstSampleSource &) = delete;
  GstSampleSource &operator=(const GstSampleSource &) = delete;
};
88
89class V4L2Source : public GstSampleSource {
90 public:
91 V4L2Source(std::function<void(GstSample *)> callback)
92 : callback_(std::move(callback)) {
93 GError *error = NULL;
94
95 // Create pipeline to read from camera, pack into rtp stream, and dump
96 // stream to callback. v4l2 device should already be configured with correct
97 // bitrate from v4l2-ctl. do-timestamp marks the time the frame was taken to
98 // track when it should be dropped under latency.
99
100 // With the Pi's hardware encoder, we can encode and package the stream once
101 // and the clients will jump in at any point unsynchronized. With the stream
102 // from x264enc this doesn't seem to work. For now, just reencode for each
103 // client since we don't expect more than 1 or 2.
104
105 pipeline_ = gst_parse_launch(
106 absl::StrFormat("v4l2src device=%s do-timestamp=true "
107 "extra-controls=\"c,brightness=%d,auto_exposure=1,"
108 "exposure_time_absolute=%d\" ! "
109 "video/x-raw,width=%d,height=%d,framerate=%d/"
110 "1,format=YUY2 ! appsink "
111 "name=appsink "
112 "emit-signals=true sync=false async=false "
113 "caps=video/x-raw,format=YUY2",
114 FLAGS_device, FLAGS_brightness, FLAGS_exposure,
115 FLAGS_width, FLAGS_height, FLAGS_framerate)
116 .c_str(),
117 &error);
118
119 if (error != NULL) {
120 LOG(FATAL) << "Could not create v4l2 pipeline: " << error->message;
121 }
122
123 appsink_ = gst_bin_get_by_name(GST_BIN(pipeline_), "appsink");
124 if (appsink_ == NULL) {
125 LOG(FATAL) << "Could not get appsink";
126 }
127
128 g_signal_connect(appsink_, "new-sample",
129 G_CALLBACK(V4L2Source::OnSampleCallback),
130 static_cast<gpointer>(this));
131
132 gst_element_set_state(pipeline_, GST_STATE_PLAYING);
133 }
134
135 ~V4L2Source() {
136 if (pipeline_ != NULL) {
137 gst_element_set_state(GST_ELEMENT(pipeline_), GST_STATE_NULL);
138 gst_object_unref(GST_OBJECT(pipeline_));
139 gst_object_unref(GST_OBJECT(appsink_));
140 }
141 }
142
143 private:
144 static GstFlowReturn OnSampleCallback(GstElement *, gpointer user_data) {
145 static_cast<V4L2Source *>(user_data)->OnSample();
146 return GST_FLOW_OK;
147 }
148
149 void OnSample() {
150 GstSample *sample = gst_app_sink_pull_sample(GST_APP_SINK(appsink_));
151 if (sample == NULL) {
152 LOG(WARNING) << "Received null sample";
153 return;
154 }
155 callback_(sample);
156 gst_sample_unref(sample);
157 }
158
159 GstElement *pipeline_;
160 GstElement *appsink_;
161
162 std::function<void(GstSample *)> callback_;
163};
164
// Receives already-captured frames from an AOS channel (FLAGS_listen_on)
// instead of reading a camera directly, wraps each one in a GstSample, and
// hands it to the callback.
class ChannelSource : public GstSampleSource {
 public:
  // Registers a watcher on FLAGS_listen_on; frames arrive on the event loop
  // thread via OnImage().
  ChannelSource(aos::ShmEventLoop *event_loop,
                std::function<void(GstSample *)> callback)
      : callback_(std::move(callback)) {
    event_loop->MakeWatcher(
        FLAGS_listen_on,
        [this](const frc971::vision::CameraImage &image) { OnImage(image); });
  }

 private:
  // Converts one CameraImage into a GstSample (with YUY2 caps matching the
  // flags) and forwards it. Incomplete messages are skipped; images whose
  // size disagrees with FLAGS_width/FLAGS_height are a fatal error.
  void OnImage(const frc971::vision::CameraImage &image) {
    if (!image.has_rows() || !image.has_cols() || !image.has_data()) {
      VLOG(2) << "Skipping CameraImage with no data";
      return;
    }
    CHECK_EQ(image.rows(), FLAGS_height);
    CHECK_EQ(image.cols(), FLAGS_width);

    // g_bytes_new copies the image data, so the flatbuffer can be recycled as
    // soon as this function returns.
    GBytes *bytes = g_bytes_new(image.data()->data(), image.data()->size());
    GstBuffer *buffer = gst_buffer_new_wrapped_bytes(bytes);

    // Carry the capture time through the pipeline as the presentation
    // timestamp.
    GST_BUFFER_PTS(buffer) = image.monotonic_timestamp_ns();

    GstCaps *caps = CHECK_NOTNULL(gst_caps_new_simple(
        "video/x-raw", "width", G_TYPE_INT, image.cols(), "height", G_TYPE_INT,
        image.rows(), "format", G_TYPE_STRING, "YUY2", nullptr));

    GstSample *sample = gst_sample_new(buffer, caps, nullptr, nullptr);

    callback_(sample);

    // Drop our references; the sample holds its own refs while the callback
    // runs.
    gst_sample_unref(sample);
    gst_caps_unref(caps);
    gst_buffer_unref(buffer);
    g_bytes_unref(bytes);
  }

  std::function<void(GstSample *)> callback_;
};
205
// Basic class that handles receiving new websocket connections. Creates a new
// Connection to manage the rest of the negotiation and data passing. When the
// websocket closes, it deletes the Connection.
class WebsocketHandler : public ::seasocks::WebSocket::Handler {
 public:
  WebsocketHandler(aos::ShmEventLoop *event_loop, ::seasocks::Server *server);
  ~WebsocketHandler() override = default;

  // seasocks::WebSocket::Handler interface; all run on the seasocks thread.
  void onConnect(::seasocks::WebSocket *sock) override;
  void onData(::seasocks::WebSocket *sock, const uint8_t *data,
              size_t size) override;
  void onDisconnect(::seasocks::WebSocket *sock) override;

 private:
  // Fans one frame out to every active Connection and (when reading from
  // V4L2) republishes it on the /camera channel.
  void OnSample(GstSample *sample);

  // One Connection per open websocket, keyed by the socket pointer.
  std::map<::seasocks::WebSocket *, std::unique_ptr<Connection>> connections_;
  ::seasocks::Server *server_;
  // Frame source: either V4L2Source or ChannelSource depending on
  // FLAGS_listen_on.
  std::unique_ptr<GstSampleSource> source_;

  // Only valid when reading from V4L2 (see constructor).
  aos::Sender<frc971::vision::CameraImage> sender_;
};
228
229// Seasocks requires that sends happen on the correct thread. This class takes a
230// detached buffer to send on a specific websocket connection and sends it when
231// seasocks is ready.
232class UpdateData : public ::seasocks::Server::Runnable {
233 public:
234 UpdateData(::seasocks::WebSocket *websocket,
235 flatbuffers::DetachedBuffer &&buffer)
236 : sock_(websocket), buffer_(std::move(buffer)) {}
237 ~UpdateData() override = default;
238 UpdateData(const UpdateData &) = delete;
239 UpdateData &operator=(const UpdateData &) = delete;
240
241 void run() override { sock_->send(buffer_.data(), buffer_.size()); }
242
243 private:
244 ::seasocks::WebSocket *sock_;
245 const flatbuffers::DetachedBuffer buffer_;
246};
247
// Manages a single WebRTC peer connection for one websocket client: owns the
// encode pipeline (appsrc -> x264enc -> rtph264pay -> webrtcbin), drives the
// SDP offer/answer exchange, and relays ICE candidates over the websocket.
class Connection {
 public:
  Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server);

  ~Connection();

  // Parses a signaling message (SDP answer or ICE candidate) received from
  // the browser.
  void HandleWebSocketData(const uint8_t *data, size_t size);

  // Pushes one raw frame into this connection's appsrc.
  void OnSample(GstSample *sample);

 private:
  // C-signal trampolines into the member functions below.
  static void OnOfferCreatedCallback(GstPromise *promise, gpointer user_data) {
    static_cast<Connection *>(user_data)->OnOfferCreated(promise);
  }

  static void OnNegotiationNeededCallback(GstElement *, gpointer user_data) {
    static_cast<Connection *>(user_data)->OnNegotiationNeeded();
  }

  static void OnIceCandidateCallback(GstElement *, guint mline_index,
                                     gchar *candidate, gpointer user_data) {
    static_cast<Connection *>(user_data)->OnIceCandidate(mline_index,
                                                         candidate);
  }

  void OnOfferCreated(GstPromise *promise);
  void OnNegotiationNeeded();
  void OnIceCandidate(guint mline_index, gchar *candidate);

  ::seasocks::WebSocket *sock_;
  ::seasocks::Server *server_;

  GstElement *pipeline_;
  GstElement *webrtcbin_;
  GstElement *appsrc_;

  // True until the first frame is pushed; used to fix up the appsrc pad
  // offset so late joiners catch up with the already-running stream.
  bool first_sample_ = true;
};
286
287WebsocketHandler::WebsocketHandler(aos::ShmEventLoop *event_loop,
288 ::seasocks::Server *server)
Tyler Chatow39b6a322022-04-15 00:03:58 -0700289 : server_(server) {
290 if (FLAGS_listen_on.empty()) {
291 sender_ = event_loop->MakeSender<frc971::vision::CameraImage>("/camera");
292 source_ =
293 std::make_unique<V4L2Source>([this](auto sample) { OnSample(sample); });
294 } else {
295 source_ = std::make_unique<ChannelSource>(
296 event_loop, [this](auto sample) { OnSample(sample); });
Tyler Chatowb3850c12020-02-26 20:55:48 -0800297 }
298}
299
300void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
301 std::unique_ptr<Connection> conn =
302 std::make_unique<Connection>(sock, server_);
303 connections_.insert({sock, std::move(conn)});
304}
305
306void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
307 size_t size) {
308 connections_[sock]->HandleWebSocketData(data, size);
309}
310
// Fans one captured frame out to every active WebRTC connection, and (when
// the source is the local V4L2 camera, i.e. sender_ is valid) republishes the
// raw frame on the /camera channel.
void WebsocketHandler::OnSample(GstSample *sample) {
  for (auto iter = connections_.begin(); iter != connections_.end(); ++iter) {
    iter->second->OnSample(sample);
  }

  // sender_ is only made in the V4L2 path; channel-sourced frames are not
  // re-sent.
  if (sender_.valid()) {
    // Read the actual frame dimensions out of the sample's caps rather than
    // trusting the flags.
    const GstCaps *caps = CHECK_NOTNULL(gst_sample_get_caps(sample));
    CHECK_GT(gst_caps_get_size(caps), 0U);
    const GstStructure *str = gst_caps_get_structure(caps, 0);

    gint width;
    gint height;

    CHECK(gst_structure_get_int(str, "width", &width));
    CHECK(gst_structure_get_int(str, "height", &height));

    GstBuffer *buffer = CHECK_NOTNULL(gst_sample_get_buffer(sample));

    const gsize size = gst_buffer_get_size(buffer);

    auto builder = sender_.MakeBuilder();

    // Reserve the data vector first, then let gst copy the pixels straight
    // into the flatbuffer to avoid an intermediate buffer.
    uint8_t *image_data;
    auto image_offset =
        builder.fbb()->CreateUninitializedVector(size, &image_data);
    gst_buffer_extract(buffer, 0, image_data, size);

    auto image_builder = builder.MakeBuilder<frc971::vision::CameraImage>();
    image_builder.add_rows(height);
    image_builder.add_cols(width);
    image_builder.add_data(image_offset);

    builder.CheckOk(builder.Send(image_builder.Finish()));
  }
}
346
347void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
348 connections_.erase(sock);
349}
350
// Builds the per-client encode/stream pipeline and wires up webrtcbin's
// negotiation signals. The pipeline re-encodes the raw frames for each
// client (see the comment in V4L2Source for why).
Connection::Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server)
    : sock_(sock), server_(server) {
  GError *error = NULL;

  // Build pipeline to read data from application into pipeline, place in
  // webrtcbin group, and stream.

  pipeline_ = gst_parse_launch(
      // aggregate-mode should be zero-latency but this drops the stream on
      // bitrate spikes for some reason - probably the weak CPU on the pi.
      absl::StrFormat(
          "webrtcbin name=webrtcbin appsrc "
          "name=appsrc block=false "
          "is-live=true "
          "format=3 max-buffers=0 leaky-type=2 "
          "caps=video/x-raw,width=%d,height=%d,format=YUY2 ! videoconvert ! "
          "x264enc bitrate=%d speed-preset=ultrafast "
          "tune=zerolatency key-int-max=15 sliced-threads=true ! "
          "video/x-h264,profile=constrained-baseline ! h264parse ! "
          "rtph264pay "
          "config-interval=-1 name=payloader aggregate-mode=none ! "
          "application/"
          "x-rtp,media=video,encoding-name=H264,payload=96,clock-rate=90000 !"
          "webrtcbin. ",
          FLAGS_width, FLAGS_height, FLAGS_bitrate / 1000)
          .c_str(),
      &error);

  if (error != NULL) {
    LOG(FATAL) << "Could not create WebRTC pipeline: " << error->message;
  }

  webrtcbin_ = gst_bin_get_by_name(GST_BIN(pipeline_), "webrtcbin");
  if (webrtcbin_ == NULL) {
    LOG(FATAL) << "Could not initialize webrtcbin";
  }

  appsrc_ = gst_bin_get_by_name(GST_BIN(pipeline_), "appsrc");
  if (appsrc_ == NULL) {
    LOG(FATAL) << "Could not initialize appsrc";
  }

  // We only ever send video, so mark the (single) transceiver send-only.
  {
    GArray *transceivers;
    g_signal_emit_by_name(webrtcbin_, "get-transceivers", &transceivers);
    if (transceivers == NULL || transceivers->len <= 0) {
      LOG(FATAL) << "Could not initialize transceivers";
    }

    GstWebRTCRTPTransceiver *trans =
        g_array_index(transceivers, GstWebRTCRTPTransceiver *, 0);
    g_object_set(trans, "direction",
                 GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_SENDONLY, nullptr);

    g_array_unref(transceivers);
  }

  // Constrain the ICE agent to the configured RTP port range.
  {
    GstObject *ice = nullptr;
    g_object_get(G_OBJECT(webrtcbin_), "ice-agent", &ice, nullptr);
    CHECK_NOTNULL(ice);

    g_object_set(ice, "min-rtp-port", FLAGS_min_port, "max-rtp-port",
                 FLAGS_max_port, nullptr);

    // We don't need upnp on a local network.
    {
      GstObject *nice = nullptr;
      g_object_get(ice, "agent", &nice, nullptr);
      CHECK_NOTNULL(nice);

      g_object_set(nice, "upnp", false, nullptr);
      g_object_unref(nice);
    }

    gst_object_unref(ice);
  }

  // Negotiation callbacks: webrtcbin asks for an offer and reports gathered
  // ICE candidates through these signals.
  g_signal_connect(webrtcbin_, "on-negotiation-needed",
                   G_CALLBACK(Connection::OnNegotiationNeededCallback),
                   static_cast<gpointer>(this));

  g_signal_connect(webrtcbin_, "on-ice-candidate",
                   G_CALLBACK(Connection::OnIceCandidateCallback),
                   static_cast<gpointer>(this));

  gst_element_set_state(pipeline_, GST_STATE_READY);
  gst_element_set_state(pipeline_, GST_STATE_PLAYING);
}
440
441Connection::~Connection() {
442 if (pipeline_ != NULL) {
443 gst_element_set_state(pipeline_, GST_STATE_NULL);
444
445 gst_object_unref(GST_OBJECT(webrtcbin_));
446 gst_object_unref(GST_OBJECT(pipeline_));
447 gst_object_unref(GST_OBJECT(appsrc_));
448 }
449}
450
// Pushes one raw frame into this connection's appsrc. On the first frame,
// also shifts the appsrc pad offset so a client joining mid-stream starts at
// the current position instead of waiting for the stream clock to catch up.
void Connection::OnSample(GstSample *sample) {
  GstFlowReturn response =
      gst_app_src_push_sample(GST_APP_SRC(appsrc_), sample);
  if (response != GST_FLOW_OK) {
    LOG(WARNING) << "Sample pushed, did not receive OK";
  }

  // Since the stream is already running (the camera turns on with
  // image_streamer) we need to tell the new appsrc where
  // we are starting in the stream so it can catch up immediately.
  if (first_sample_) {
    GstPad *src = gst_element_get_static_pad(appsrc_, "src");
    if (src == NULL) {
      // Pad not available yet; first_sample_ stays true so we retry on the
      // next frame.
      return;
    }

    GstSegment *segment = gst_sample_get_segment(sample);
    GstBuffer *buffer = gst_sample_get_buffer(sample);

    // Translate this frame's PTS into running time and shift the pad back by
    // that amount so downstream sees the stream as starting "now".
    guint64 offset = gst_segment_to_running_time(segment, GST_FORMAT_TIME,
                                                 GST_BUFFER_PTS(buffer));
    LOG(INFO) << "Fixing offset " << offset;
    gst_pad_set_offset(src, -offset);

    gst_object_unref(GST_OBJECT(src));
    first_sample_ = false;
  }
}
479
480void Connection::OnOfferCreated(GstPromise *promise) {
481 LOG(INFO) << "OnOfferCreated";
482
483 GstWebRTCSessionDescription *offer = NULL;
484 gst_structure_get(gst_promise_get_reply(promise), "offer",
485 GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &offer, NULL);
486 gst_promise_unref(promise);
487
488 {
489 std::unique_ptr<GstPromise, decltype(&gst_promise_unref)>
490 local_desc_promise(gst_promise_new(), &gst_promise_unref);
491 g_signal_emit_by_name(webrtcbin_, "set-local-description", offer,
492 local_desc_promise.get());
493 gst_promise_interrupt(local_desc_promise.get());
494 }
495
496 GstSDPMessage *sdp_msg = offer->sdp;
497 std::string sdp_str(gst_sdp_message_as_text(sdp_msg));
498
499 LOG(INFO) << "Negotiation offer created:\n" << sdp_str;
500
501 flatbuffers::FlatBufferBuilder fbb(512);
502 flatbuffers::Offset<WebSocketSdp> sdp_fb =
503 CreateWebSocketSdpDirect(fbb, SdpType::OFFER, sdp_str.c_str());
504 flatbuffers::Offset<WebSocketMessage> answer_message =
505 CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());
506 fbb.Finish(answer_message);
507
508 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
509}
510
511void Connection::OnNegotiationNeeded() {
512 LOG(INFO) << "OnNegotiationNeeded";
513
514 GstPromise *promise;
515 promise = gst_promise_new_with_change_func(Connection::OnOfferCreatedCallback,
516 static_cast<gpointer>(this), NULL);
517 g_signal_emit_by_name(G_OBJECT(webrtcbin_), "create-offer", NULL, promise);
518}
519
520void Connection::OnIceCandidate(guint mline_index, gchar *candidate) {
521 LOG(INFO) << "OnIceCandidate";
522
523 flatbuffers::FlatBufferBuilder fbb(512);
524
525 auto ice_fb_builder = WebSocketIce::Builder(fbb);
526 ice_fb_builder.add_sdp_m_line_index(mline_index);
527 ice_fb_builder.add_sdp_mid(fbb.CreateString("video0"));
528 ice_fb_builder.add_candidate(
529 fbb.CreateString(static_cast<char *>(candidate)));
530 flatbuffers::Offset<WebSocketIce> ice_fb = ice_fb_builder.Finish();
531
532 flatbuffers::Offset<WebSocketMessage> ice_message =
533 CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_fb.Union());
534 fbb.Finish(ice_message);
535
536 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
537
538 g_signal_emit_by_name(webrtcbin_, "add-ice-candidate", mline_index,
539 candidate);
540}
541
542void Connection::HandleWebSocketData(const uint8_t *data, size_t /* size*/) {
543 LOG(INFO) << "HandleWebSocketData";
544
545 const WebSocketMessage *message =
546 flatbuffers::GetRoot<WebSocketMessage>(data);
547
548 switch (message->payload_type()) {
549 case Payload::WebSocketSdp: {
550 const WebSocketSdp *offer = message->payload_as_WebSocketSdp();
551 if (offer->type() != SdpType::ANSWER) {
552 LOG(WARNING) << "Expected SDP message type \"answer\"";
553 break;
554 }
555 const flatbuffers::String *sdp_string = offer->payload();
556
557 LOG(INFO) << "Received SDP:\n" << sdp_string->c_str();
558
559 GstSDPMessage *sdp;
560 GstSDPResult status = gst_sdp_message_new(&sdp);
561 if (status != GST_SDP_OK) {
562 LOG(WARNING) << "Could not create SDP message";
563 break;
564 }
565
566 status = gst_sdp_message_parse_buffer((const guint8 *)sdp_string->c_str(),
567 sdp_string->size(), sdp);
568
569 if (status != GST_SDP_OK) {
570 LOG(WARNING) << "Could not parse SDP string";
571 break;
572 }
573
574 std::unique_ptr<GstWebRTCSessionDescription,
575 decltype(&gst_webrtc_session_description_free)>
576 answer(gst_webrtc_session_description_new(GST_WEBRTC_SDP_TYPE_ANSWER,
577 sdp),
578 &gst_webrtc_session_description_free);
579 std::unique_ptr<GstPromise, decltype(&gst_promise_unref)> promise(
580 gst_promise_new(), &gst_promise_unref);
581 g_signal_emit_by_name(webrtcbin_, "set-remote-description", answer.get(),
582 promise.get());
583 gst_promise_interrupt(promise.get());
584
585 break;
586 }
587 case Payload::WebSocketIce: {
588 const WebSocketIce *ice = message->payload_as_WebSocketIce();
589 if (!ice->has_candidate() || ice->candidate()->size() == 0) {
590 LOG(WARNING) << "Received ICE message without candidate";
591 break;
592 }
593
594 const gchar *candidate =
595 static_cast<const gchar *>(ice->candidate()->c_str());
596 guint mline_index = ice->sdp_m_line_index();
597
598 LOG(INFO) << "Received ICE candidate with mline index " << mline_index
599 << "; candidate: " << candidate;
600
601 g_signal_emit_by_name(webrtcbin_, "add-ice-candidate", mline_index,
602 candidate);
603
604 break;
605 }
606 default:
607 break;
608 }
609}
610
// Registers the statically-linked GStreamer plugins declared at the top of
// this file. Must run after gst_init() and before any pipeline is parsed,
// since the dynamic plugin registry is disabled (GST_DISABLE_REGISTRY).
void RegisterPlugins() {
  GST_PLUGIN_STATIC_REGISTER(app);
  GST_PLUGIN_STATIC_REGISTER(coreelements);
  GST_PLUGIN_STATIC_REGISTER(dtls);
  GST_PLUGIN_STATIC_REGISTER(nice);
  GST_PLUGIN_STATIC_REGISTER(rtp);
  GST_PLUGIN_STATIC_REGISTER(rtpmanager);
  GST_PLUGIN_STATIC_REGISTER(srtp);
  GST_PLUGIN_STATIC_REGISTER(webrtc);
  GST_PLUGIN_STATIC_REGISTER(video4linux2);
  GST_PLUGIN_STATIC_REGISTER(videoconvert);
  GST_PLUGIN_STATIC_REGISTER(videoparsersbad);
  GST_PLUGIN_STATIC_REGISTER(videorate);
  GST_PLUGIN_STATIC_REGISTER(videoscale);
  GST_PLUGIN_STATIC_REGISTER(videotestsrc);
  GST_PLUGIN_STATIC_REGISTER(x264);
}
628
// Entry point: initializes AOS + GStreamer, then serves the static web UI and
// the /ws signaling endpoint from seasocks, polled from the AOS event loop.
int main(int argc, char **argv) {
  aos::InitGoogle(&argc, &argv);

  findEmbeddedContent("");

  // Neutralize any system OpenSSL config before the dtls/srtp plugins load.
  // openssl_env must outlive the putenv() call, which keeps the pointer.
  std::string openssl_env = "OPENSSL_CONF=\"\"";
  putenv(const_cast<char *>(openssl_env.c_str()));

  // Belt-and-suspenders with GST_DISABLE_REGISTRY: plugins are linked in
  // statically, so skip the on-disk registry entirely.
  putenv(const_cast<char *>("GST_REGISTRY_DISABLE=yes"));

  gst_init(&argc, &argv);
  RegisterPlugins();

  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(FLAGS_config);
  aos::ShmEventLoop event_loop(&config.message());

  // Scoped so the glib main loop and server are torn down before gst_deinit.
  {
    // Bridges glib's main context into the AOS event loop so GStreamer
    // callbacks run on this thread.
    aos::GlibMainLoop main_loop(&event_loop);

    seasocks::Server server(::std::shared_ptr<seasocks::Logger>(
        new ::aos::seasocks::SeasocksLogger(seasocks::Logger::Level::Info)));

    LOG(INFO) << "Serving from " << FLAGS_data_dir;

    auto websocket_handler =
        std::make_shared<WebsocketHandler>(&event_loop, &server);
    server.addWebSocketHandler("/ws", websocket_handler);

    server.startListening(FLAGS_streaming_port);
    server.setStaticPath(FLAGS_data_dir.c_str());

    // Drive seasocks from the AOS epoll loop instead of its own thread.
    aos::internal::EPoll *epoll = event_loop.epoll();

    epoll->OnReadable(server.fd(), [&server] {
      CHECK(::seasocks::Server::PollResult::Continue == server.poll(0));
    });

    event_loop.Run();

    epoll->DeleteFd(server.fd());
    server.terminate();
  }

  gst_deinit();

  return 0;
}