blob: ef6657ea060936a90970c1aba8b45415ce663962 [file] [log] [blame]
Tyler Chatowb3850c12020-02-26 20:55:48 -08001#define GST_USE_UNSTABLE_API
2#define GST_DISABLE_REGISTRY 1
3
4#include <glib-unix.h>
5#include <glib.h>
6#include <gst/app/app.h>
7#include <gst/gst.h>
8#include <gst/sdp/sdp.h>
9#include <gst/webrtc/icetransport.h>
10#include <gst/webrtc/webrtc.h>
11#include <sys/stat.h>
12#include <sys/types.h>
13
14#include <map>
15#include <thread>
16
17#include "absl/strings/str_format.h"
18#include "aos/events/glib_main_loop.h"
19#include "aos/events/shm_event_loop.h"
20#include "aos/init.h"
21#include "aos/network/web_proxy_generated.h"
22#include "aos/seasocks/seasocks_logger.h"
23#include "flatbuffers/flatbuffers.h"
24#include "frc971/vision/vision_generated.h"
25#include "gflags/gflags.h"
26#include "glog/logging.h"
27#include "internal/Embedded.h"
28#include "seasocks/Server.h"
29#include "seasocks/StringUtil.h"
30#include "seasocks/WebSocket.h"
31
// Declarations for the GStreamer plugins that are linked statically into this
// binary. Each one is registered at startup by RegisterPlugins() below, which
// is why the on-disk plugin registry can be disabled entirely.
extern "C" {
GST_PLUGIN_STATIC_DECLARE(app);
GST_PLUGIN_STATIC_DECLARE(coreelements);
GST_PLUGIN_STATIC_DECLARE(dtls);
GST_PLUGIN_STATIC_DECLARE(nice);
GST_PLUGIN_STATIC_DECLARE(rtp);
GST_PLUGIN_STATIC_DECLARE(rtpmanager);
GST_PLUGIN_STATIC_DECLARE(srtp);
GST_PLUGIN_STATIC_DECLARE(webrtc);
GST_PLUGIN_STATIC_DECLARE(video4linux2);
GST_PLUGIN_STATIC_DECLARE(videoconvert);
GST_PLUGIN_STATIC_DECLARE(videoparsersbad);
GST_PLUGIN_STATIC_DECLARE(videorate);
GST_PLUGIN_STATIC_DECLARE(videoscale);
GST_PLUGIN_STATIC_DECLARE(videotestsrc);
GST_PLUGIN_STATIC_DECLARE(x264);
}
Tyler Chatow39b6a322022-04-15 00:03:58 -070049
// Command-line flags. The width/height/framerate values are baked into both
// the capture caps (V4L2Source) and the per-client encode pipeline
// (Connection), so a channel feeding --listen_on must match them exactly.
DEFINE_string(config, "y2022/aos_config.json",
              "Name of the config file to replay using.");
DEFINE_string(device, "/dev/video0",
              "Camera fd. Ignored if reading from channel");
DEFINE_string(data_dir, "image_streamer_www",
              "Directory to serve data files from");
DEFINE_int32(width, 400, "Image width");
DEFINE_int32(height, 300, "Image height");
DEFINE_int32(framerate, 25, "Framerate (FPS)");
DEFINE_int32(brightness, 50, "Camera brightness");
DEFINE_int32(exposure, 300, "Manual exposure");
DEFINE_int32(bitrate, 500000, "H264 encode bitrate");
// RTP ports handed to the ICE agent in Connection's constructor.
DEFINE_int32(min_port, 5800, "Min rtp port");
DEFINE_int32(max_port, 5810, "Max rtp port");
DEFINE_string(listen_on, "",
              "Channel on which to receive frames from. Used in place of "
              "internal V4L2 reader. Note: width and height MUST match the "
              "expected size of channel images.");
Tyler Chatowb3850c12020-02-26 20:55:48 -080068
69class Connection;
70
71using aos::web_proxy::Payload;
72using aos::web_proxy::SdpType;
73using aos::web_proxy::WebSocketIce;
74using aos::web_proxy::WebSocketMessage;
75using aos::web_proxy::WebSocketSdp;
76
// Interface for sources that produce GstSamples and deliver them to a
// callback. Subclasses (V4L2Source, ChannelSource) own the machinery that
// generates the samples; this base only provides polymorphic destruction and
// non-copyability.
class GstSampleSource {
 public:
  GstSampleSource() = default;

  // Virtual so owners holding a GstSampleSource pointer destroy the derived
  // source (and its pipeline/watcher) correctly.
  virtual ~GstSampleSource() = default;

  // Sources manage live pipelines; copying makes no sense. The original code
  // deleted only the copy constructor (in the private section), leaving the
  // compiler-generated copy assignment available; delete both explicitly.
  GstSampleSource(const GstSampleSource &) = delete;
  GstSampleSource &operator=(const GstSampleSource &) = delete;
};
86
// Reads raw YUY2 frames from a V4L2 camera device via a GStreamer pipeline
// and delivers each GstSample to the supplied callback.
class V4L2Source : public GstSampleSource {
 public:
  // |callback| is invoked for every frame pulled from the appsink. The sample
  // passed to it is unreffed when the callback returns (see OnSample), so the
  // callback must not retain it without taking its own reference.
  V4L2Source(std::function<void(GstSample *)> callback)
      : callback_(std::move(callback)) {
    GError *error = NULL;

    // Create pipeline to read from camera, pack into rtp stream, and dump
    // stream to callback. v4l2 device should already be configured with
    // correct bitrate from v4l2-ctl. do-timestamp marks the time the frame
    // was taken to track when it should be dropped under latency.

    // With the Pi's hardware encoder, we can encode and package the stream
    // once and the clients will jump in at any point unsynchronized. With the
    // stream from x264enc this doesn't seem to work. For now, just reencode
    // for each client since we don't expect more than 1 or 2.

    pipeline_ = gst_parse_launch(
        absl::StrFormat("v4l2src device=%s do-timestamp=true "
                        "extra-controls=\"c,brightness=%d,auto_exposure=1,"
                        "exposure_time_absolute=%d\" ! "
                        "video/x-raw,width=%d,height=%d,framerate=%d/"
                        "1,format=YUY2 ! appsink "
                        "name=appsink "
                        "emit-signals=true sync=false async=false "
                        "caps=video/x-raw,format=YUY2",
                        FLAGS_device, FLAGS_brightness, FLAGS_exposure,
                        FLAGS_width, FLAGS_height, FLAGS_framerate)
            .c_str(),
        &error);

    if (error != NULL) {
      LOG(FATAL) << "Could not create v4l2 pipeline: " << error->message;
    }

    // gst_bin_get_by_name() returns a new reference; both it and the pipeline
    // reference are released in the destructor.
    appsink_ = gst_bin_get_by_name(GST_BIN(pipeline_), "appsink");
    if (appsink_ == NULL) {
      LOG(FATAL) << "Could not get appsink";
    }

    // emit-signals=true above makes the appsink fire "new-sample" for every
    // frame; route it through the static trampoline back into this object.
    g_signal_connect(appsink_, "new-sample",
                     G_CALLBACK(V4L2Source::OnSampleCallback),
                     static_cast<gpointer>(this));

    gst_element_set_state(pipeline_, GST_STATE_PLAYING);
  }

  ~V4L2Source() {
    if (pipeline_ != NULL) {
      // Stop the pipeline before dropping our references.
      gst_element_set_state(GST_ELEMENT(pipeline_), GST_STATE_NULL);
      gst_object_unref(GST_OBJECT(pipeline_));
      gst_object_unref(GST_OBJECT(appsink_));
    }
  }

 private:
  // C trampoline for the appsink "new-sample" signal.
  static GstFlowReturn OnSampleCallback(GstElement *, gpointer user_data) {
    static_cast<V4L2Source *>(user_data)->OnSample();
    return GST_FLOW_OK;
  }

  // Pulls the pending sample out of the appsink, forwards it to the callback,
  // and releases it.
  void OnSample() {
    GstSample *sample = gst_app_sink_pull_sample(GST_APP_SINK(appsink_));
    if (sample == NULL) {
      LOG(WARNING) << "Received null sample";
      return;
    }
    callback_(sample);
    gst_sample_unref(sample);
  }

  GstElement *pipeline_;
  GstElement *appsink_;

  std::function<void(GstSample *)> callback_;
};
162
// Receives already-captured frames from an AOS channel (FLAGS_listen_on) and
// wraps each one in a GstSample for the callback, in place of reading from a
// local V4L2 device.
class ChannelSource : public GstSampleSource {
 public:
  ChannelSource(aos::ShmEventLoop *event_loop,
                std::function<void(GstSample *)> callback)
      : callback_(std::move(callback)) {
    event_loop->MakeWatcher(
        FLAGS_listen_on,
        [this](const frc971::vision::CameraImage &image) { OnImage(image); });
  }

 private:
  // Converts one CameraImage into a GstSample and hands it to the callback.
  // All GStreamer objects created here are released before returning.
  void OnImage(const frc971::vision::CameraImage &image) {
    if (!image.has_rows() || !image.has_cols() || !image.has_data()) {
      VLOG(2) << "Skipping CameraImage with no data";
      return;
    }
    // The per-client encode pipelines are built with FLAGS_width/FLAGS_height
    // caps, so incoming images must match exactly.
    CHECK_EQ(image.rows(), FLAGS_height);
    CHECK_EQ(image.cols(), FLAGS_width);

    // g_bytes_new() copies the image data, so the buffer remains valid
    // independently of the flatbuffer message's lifetime.
    GBytes *bytes = g_bytes_new(image.data()->data(), image.data()->size());
    GstBuffer *buffer = gst_buffer_new_wrapped_bytes(bytes);

    // Carry the capture timestamp through as the presentation timestamp.
    GST_BUFFER_PTS(buffer) = image.monotonic_timestamp_ns();

    GstCaps *caps = CHECK_NOTNULL(gst_caps_new_simple(
        "video/x-raw", "width", G_TYPE_INT, image.cols(), "height", G_TYPE_INT,
        image.rows(), "format", G_TYPE_STRING, "YUY2", nullptr));

    GstSample *sample = gst_sample_new(buffer, caps, nullptr, nullptr);

    callback_(sample);

    gst_sample_unref(sample);
    gst_caps_unref(caps);
    gst_buffer_unref(buffer);
    g_bytes_unref(bytes);
  }

  std::function<void(GstSample *)> callback_;
};
203
// Basic class that handles receiving new websocket connections. Creates a new
// Connection to manage the rest of the negotiation and data passing. When the
// websocket closes, it deletes the Connection.
class WebsocketHandler : public ::seasocks::WebSocket::Handler {
 public:
  WebsocketHandler(aos::ShmEventLoop *event_loop, ::seasocks::Server *server);
  ~WebsocketHandler() override = default;

  // Seasocks lifecycle callbacks for each client websocket.
  void onConnect(::seasocks::WebSocket *sock) override;
  void onData(::seasocks::WebSocket *sock, const uint8_t *data,
              size_t size) override;
  void onDisconnect(::seasocks::WebSocket *sock) override;

 private:
  // Fans one frame out to every active Connection and, when sender_ is valid,
  // republishes the raw frame on the /camera channel.
  void OnSample(GstSample *sample);

  // One Connection per live websocket; erased in onDisconnect().
  std::map<::seasocks::WebSocket *, std::unique_ptr<Connection>> connections_;
  ::seasocks::Server *server_;
  // Produces frames, either from a V4L2 device or from an AOS channel
  // (chosen in the constructor based on FLAGS_listen_on).
  std::unique_ptr<GstSampleSource> source_;

  // Only initialized when reading from V4L2; invalid otherwise.
  aos::Sender<frc971::vision::CameraImage> sender_;
};
226
227// Seasocks requires that sends happen on the correct thread. This class takes a
228// detached buffer to send on a specific websocket connection and sends it when
229// seasocks is ready.
230class UpdateData : public ::seasocks::Server::Runnable {
231 public:
232 UpdateData(::seasocks::WebSocket *websocket,
233 flatbuffers::DetachedBuffer &&buffer)
234 : sock_(websocket), buffer_(std::move(buffer)) {}
235 ~UpdateData() override = default;
236 UpdateData(const UpdateData &) = delete;
237 UpdateData &operator=(const UpdateData &) = delete;
238
239 void run() override { sock_->send(buffer_.data(), buffer_.size()); }
240
241 private:
242 ::seasocks::WebSocket *sock_;
243 const flatbuffers::DetachedBuffer buffer_;
244};
245
// Manages the WebRTC negotiation and streaming for one websocket client: owns
// an appsrc -> x264enc -> rtph264pay -> webrtcbin pipeline and relays SDP and
// ICE messages between GStreamer and the browser.
class Connection {
 public:
  Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server);

  ~Connection();

  // Handles an SDP answer or ICE candidate flatbuffer received from the
  // client over the websocket.
  void HandleWebSocketData(const uint8_t *data, size_t size);

  // Pushes one raw video sample into this connection's appsrc.
  void OnSample(GstSample *sample);

 private:
  // Static trampolines bridging GObject C signal callbacks into member
  // functions; user_data is always the Connection instance.
  static void OnOfferCreatedCallback(GstPromise *promise, gpointer user_data) {
    static_cast<Connection *>(user_data)->OnOfferCreated(promise);
  }

  static void OnNegotiationNeededCallback(GstElement *, gpointer user_data) {
    static_cast<Connection *>(user_data)->OnNegotiationNeeded();
  }

  static void OnIceCandidateCallback(GstElement *, guint mline_index,
                                     gchar *candidate, gpointer user_data) {
    static_cast<Connection *>(user_data)->OnIceCandidate(mline_index,
                                                         candidate);
  }

  void OnOfferCreated(GstPromise *promise);
  void OnNegotiationNeeded();
  void OnIceCandidate(guint mline_index, gchar *candidate);

  ::seasocks::WebSocket *sock_;
  ::seasocks::Server *server_;

  GstElement *pipeline_;
  GstElement *webrtcbin_;
  GstElement *appsrc_;

  // True until the first sample is pushed; used to fix up the appsrc pad
  // offset so a client joining mid-stream catches up immediately.
  bool first_sample_ = true;
};
284
285WebsocketHandler::WebsocketHandler(aos::ShmEventLoop *event_loop,
286 ::seasocks::Server *server)
Tyler Chatow39b6a322022-04-15 00:03:58 -0700287 : server_(server) {
288 if (FLAGS_listen_on.empty()) {
289 sender_ = event_loop->MakeSender<frc971::vision::CameraImage>("/camera");
290 source_ =
291 std::make_unique<V4L2Source>([this](auto sample) { OnSample(sample); });
292 } else {
293 source_ = std::make_unique<ChannelSource>(
294 event_loop, [this](auto sample) { OnSample(sample); });
Tyler Chatowb3850c12020-02-26 20:55:48 -0800295 }
296}
297
298void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
299 std::unique_ptr<Connection> conn =
300 std::make_unique<Connection>(sock, server_);
301 connections_.insert({sock, std::move(conn)});
302}
303
304void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
305 size_t size) {
306 connections_[sock]->HandleWebSocketData(data, size);
307}
308
// Fans a frame out to every connected client and, when we are the camera
// reader (sender_ valid), republishes the raw frame on the /camera channel.
void WebsocketHandler::OnSample(GstSample *sample) {
  for (auto iter = connections_.begin(); iter != connections_.end(); ++iter) {
    iter->second->OnSample(sample);
  }

  // sender_ is only valid when reading from V4L2 (see the constructor).
  if (sender_.valid()) {
    // Pull the actual frame dimensions out of the sample's caps rather than
    // trusting the flags.
    const GstCaps *caps = CHECK_NOTNULL(gst_sample_get_caps(sample));
    CHECK_GT(gst_caps_get_size(caps), 0U);
    const GstStructure *str = gst_caps_get_structure(caps, 0);

    gint width;
    gint height;

    CHECK(gst_structure_get_int(str, "width", &width));
    CHECK(gst_structure_get_int(str, "height", &height));

    GstBuffer *buffer = CHECK_NOTNULL(gst_sample_get_buffer(sample));

    const gsize size = gst_buffer_get_size(buffer);

    auto builder = sender_.MakeBuilder();

    // Allocate the image vector inside the outgoing flatbuffer and copy the
    // frame straight into it, avoiding an intermediate buffer. The vector
    // must be created before the CameraImage table builder is started.
    uint8_t *image_data;
    auto image_offset =
        builder.fbb()->CreateUninitializedVector(size, &image_data);
    gst_buffer_extract(buffer, 0, image_data, size);

    auto image_builder = builder.MakeBuilder<frc971::vision::CameraImage>();
    image_builder.add_rows(height);
    image_builder.add_cols(width);
    image_builder.add_data(image_offset);

    builder.CheckOk(builder.Send(image_builder.Finish()));
  }
}
344
345void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
346 connections_.erase(sock);
347}
348
// Builds the per-client encode/stream pipeline and wires up the webrtcbin
// signals that drive SDP/ICE negotiation.
Connection::Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server)
    : sock_(sock), server_(server) {
  GError *error = NULL;

  // Build pipeline to read data from application into pipeline, place in
  // webrtcbin group, and stream.

  pipeline_ = gst_parse_launch(
      // aggregate-mode should be zero-latency but this drops the stream on
      // bitrate spikes for some reason - probably the weak CPU on the pi.
      absl::StrFormat(
          "webrtcbin name=webrtcbin appsrc "
          "name=appsrc block=false "
          "is-live=true "
          "format=3 max-buffers=0 leaky-type=2 "
          "caps=video/x-raw,width=%d,height=%d,format=YUY2 ! videoconvert ! "
          "x264enc bitrate=%d speed-preset=ultrafast "
          "tune=zerolatency key-int-max=15 sliced-threads=true ! "
          "video/x-h264,profile=constrained-baseline ! h264parse ! "
          "rtph264pay "
          "config-interval=-1 name=payloader aggregate-mode=none ! "
          "application/"
          "x-rtp,media=video,encoding-name=H264,payload=96,clock-rate=90000 !"
          "webrtcbin. ",
          FLAGS_width, FLAGS_height, FLAGS_bitrate / 1000)
          .c_str(),
      &error);

  if (error != NULL) {
    LOG(FATAL) << "Could not create WebRTC pipeline: " << error->message;
  }

  // gst_bin_get_by_name() returns new references; released in the destructor.
  webrtcbin_ = gst_bin_get_by_name(GST_BIN(pipeline_), "webrtcbin");
  if (webrtcbin_ == NULL) {
    LOG(FATAL) << "Could not initialize webrtcbin";
  }

  appsrc_ = gst_bin_get_by_name(GST_BIN(pipeline_), "appsrc");
  if (appsrc_ == NULL) {
    LOG(FATAL) << "Could not initialize appsrc";
  }

  {
    // We only ever send video, so mark the transceiver send-only.
    GArray *transceivers;
    g_signal_emit_by_name(webrtcbin_, "get-transceivers", &transceivers);
    if (transceivers == NULL || transceivers->len <= 0) {
      LOG(FATAL) << "Could not initialize transceivers";
    }

    GstWebRTCRTPTransceiver *trans =
        g_array_index(transceivers, GstWebRTCRTPTransceiver *, 0);
    g_object_set(trans, "direction",
                 GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_SENDONLY, nullptr);

    g_array_unref(transceivers);
  }

  {
    // Constrain the ICE agent to the [min_port, max_port] RTP port range
    // from the flags.
    GstObject *ice = nullptr;
    g_object_get(G_OBJECT(webrtcbin_), "ice-agent", &ice, nullptr);
    CHECK_NOTNULL(ice);

    g_object_set(ice, "min-rtp-port", FLAGS_min_port, "max-rtp-port",
                 FLAGS_max_port, nullptr);

    // We don't need upnp on a local network.
    {
      GstObject *nice = nullptr;
      g_object_get(ice, "agent", &nice, nullptr);
      CHECK_NOTNULL(nice);

      g_object_set(nice, "upnp", false, nullptr);
      g_object_unref(nice);
    }

    gst_object_unref(ice);
  }

  // These two signals drive the SDP/ICE exchange with the browser via the
  // static trampolines declared on the class.
  g_signal_connect(webrtcbin_, "on-negotiation-needed",
                   G_CALLBACK(Connection::OnNegotiationNeededCallback),
                   static_cast<gpointer>(this));

  g_signal_connect(webrtcbin_, "on-ice-candidate",
                   G_CALLBACK(Connection::OnIceCandidateCallback),
                   static_cast<gpointer>(this));

  gst_element_set_state(pipeline_, GST_STATE_READY);
  gst_element_set_state(pipeline_, GST_STATE_PLAYING);
}
438
Connection::~Connection() {
  if (pipeline_ != NULL) {
    // Stop the pipeline before releasing the references taken with
    // gst_bin_get_by_name() in the constructor.
    gst_element_set_state(pipeline_, GST_STATE_NULL);

    gst_object_unref(GST_OBJECT(webrtcbin_));
    gst_object_unref(GST_OBJECT(pipeline_));
    gst_object_unref(GST_OBJECT(appsrc_));
  }
}
448
449void Connection::OnSample(GstSample *sample) {
450 GstFlowReturn response =
451 gst_app_src_push_sample(GST_APP_SRC(appsrc_), sample);
452 if (response != GST_FLOW_OK) {
453 LOG(WARNING) << "Sample pushed, did not receive OK";
454 }
455
456 // Since the stream is already running (the camera turns on with
457 // image_streamer) we need to tell the new appsrc where
458 // we are starting in the stream so it can catch up immediately.
459 if (first_sample_) {
460 GstPad *src = gst_element_get_static_pad(appsrc_, "src");
461 if (src == NULL) {
462 return;
463 }
464
465 GstSegment *segment = gst_sample_get_segment(sample);
466 GstBuffer *buffer = gst_sample_get_buffer(sample);
467
468 guint64 offset = gst_segment_to_running_time(segment, GST_FORMAT_TIME,
469 GST_BUFFER_PTS(buffer));
470 LOG(INFO) << "Fixing offset " << offset;
471 gst_pad_set_offset(src, -offset);
472
473 gst_object_unref(GST_OBJECT(src));
474 first_sample_ = false;
475 }
476}
477
478void Connection::OnOfferCreated(GstPromise *promise) {
479 LOG(INFO) << "OnOfferCreated";
480
481 GstWebRTCSessionDescription *offer = NULL;
482 gst_structure_get(gst_promise_get_reply(promise), "offer",
483 GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &offer, NULL);
484 gst_promise_unref(promise);
485
486 {
487 std::unique_ptr<GstPromise, decltype(&gst_promise_unref)>
488 local_desc_promise(gst_promise_new(), &gst_promise_unref);
489 g_signal_emit_by_name(webrtcbin_, "set-local-description", offer,
490 local_desc_promise.get());
491 gst_promise_interrupt(local_desc_promise.get());
492 }
493
494 GstSDPMessage *sdp_msg = offer->sdp;
495 std::string sdp_str(gst_sdp_message_as_text(sdp_msg));
496
497 LOG(INFO) << "Negotiation offer created:\n" << sdp_str;
498
499 flatbuffers::FlatBufferBuilder fbb(512);
500 flatbuffers::Offset<WebSocketSdp> sdp_fb =
501 CreateWebSocketSdpDirect(fbb, SdpType::OFFER, sdp_str.c_str());
502 flatbuffers::Offset<WebSocketMessage> answer_message =
503 CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());
504 fbb.Finish(answer_message);
505
506 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
507}
508
509void Connection::OnNegotiationNeeded() {
510 LOG(INFO) << "OnNegotiationNeeded";
511
512 GstPromise *promise;
513 promise = gst_promise_new_with_change_func(Connection::OnOfferCreatedCallback,
514 static_cast<gpointer>(this), NULL);
515 g_signal_emit_by_name(G_OBJECT(webrtcbin_), "create-offer", NULL, promise);
516}
517
518void Connection::OnIceCandidate(guint mline_index, gchar *candidate) {
519 LOG(INFO) << "OnIceCandidate";
520
521 flatbuffers::FlatBufferBuilder fbb(512);
522
523 auto ice_fb_builder = WebSocketIce::Builder(fbb);
524 ice_fb_builder.add_sdp_m_line_index(mline_index);
525 ice_fb_builder.add_sdp_mid(fbb.CreateString("video0"));
526 ice_fb_builder.add_candidate(
527 fbb.CreateString(static_cast<char *>(candidate)));
528 flatbuffers::Offset<WebSocketIce> ice_fb = ice_fb_builder.Finish();
529
530 flatbuffers::Offset<WebSocketMessage> ice_message =
531 CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_fb.Union());
532 fbb.Finish(ice_message);
533
534 server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
535
536 g_signal_emit_by_name(webrtcbin_, "add-ice-candidate", mline_index,
537 candidate);
538}
539
540void Connection::HandleWebSocketData(const uint8_t *data, size_t /* size*/) {
541 LOG(INFO) << "HandleWebSocketData";
542
543 const WebSocketMessage *message =
544 flatbuffers::GetRoot<WebSocketMessage>(data);
545
546 switch (message->payload_type()) {
547 case Payload::WebSocketSdp: {
548 const WebSocketSdp *offer = message->payload_as_WebSocketSdp();
549 if (offer->type() != SdpType::ANSWER) {
550 LOG(WARNING) << "Expected SDP message type \"answer\"";
551 break;
552 }
553 const flatbuffers::String *sdp_string = offer->payload();
554
555 LOG(INFO) << "Received SDP:\n" << sdp_string->c_str();
556
557 GstSDPMessage *sdp;
558 GstSDPResult status = gst_sdp_message_new(&sdp);
559 if (status != GST_SDP_OK) {
560 LOG(WARNING) << "Could not create SDP message";
561 break;
562 }
563
564 status = gst_sdp_message_parse_buffer((const guint8 *)sdp_string->c_str(),
565 sdp_string->size(), sdp);
566
567 if (status != GST_SDP_OK) {
568 LOG(WARNING) << "Could not parse SDP string";
569 break;
570 }
571
572 std::unique_ptr<GstWebRTCSessionDescription,
573 decltype(&gst_webrtc_session_description_free)>
574 answer(gst_webrtc_session_description_new(GST_WEBRTC_SDP_TYPE_ANSWER,
575 sdp),
576 &gst_webrtc_session_description_free);
577 std::unique_ptr<GstPromise, decltype(&gst_promise_unref)> promise(
578 gst_promise_new(), &gst_promise_unref);
579 g_signal_emit_by_name(webrtcbin_, "set-remote-description", answer.get(),
580 promise.get());
581 gst_promise_interrupt(promise.get());
582
583 break;
584 }
585 case Payload::WebSocketIce: {
586 const WebSocketIce *ice = message->payload_as_WebSocketIce();
587 if (!ice->has_candidate() || ice->candidate()->size() == 0) {
588 LOG(WARNING) << "Received ICE message without candidate";
589 break;
590 }
591
592 const gchar *candidate =
593 static_cast<const gchar *>(ice->candidate()->c_str());
594 guint mline_index = ice->sdp_m_line_index();
595
596 LOG(INFO) << "Received ICE candidate with mline index " << mline_index
597 << "; candidate: " << candidate;
598
599 g_signal_emit_by_name(webrtcbin_, "add-ice-candidate", mline_index,
600 candidate);
601
602 break;
603 }
604 default:
605 break;
606 }
607}
608
// Registers the statically-linked GStreamer plugins declared at the top of
// the file. Must run after gst_init() and before any pipeline is parsed.
void RegisterPlugins() {
  GST_PLUGIN_STATIC_REGISTER(app);
  GST_PLUGIN_STATIC_REGISTER(coreelements);
  GST_PLUGIN_STATIC_REGISTER(dtls);
  GST_PLUGIN_STATIC_REGISTER(nice);
  GST_PLUGIN_STATIC_REGISTER(rtp);
  GST_PLUGIN_STATIC_REGISTER(rtpmanager);
  GST_PLUGIN_STATIC_REGISTER(srtp);
  GST_PLUGIN_STATIC_REGISTER(webrtc);
  GST_PLUGIN_STATIC_REGISTER(video4linux2);
  GST_PLUGIN_STATIC_REGISTER(videoconvert);
  GST_PLUGIN_STATIC_REGISTER(videoparsersbad);
  GST_PLUGIN_STATIC_REGISTER(videorate);
  GST_PLUGIN_STATIC_REGISTER(videoscale);
  GST_PLUGIN_STATIC_REGISTER(videotestsrc);
  GST_PLUGIN_STATIC_REGISTER(x264);
}
626
int main(int argc, char **argv) {
  aos::InitGoogle(&argc, &argv);

  // Initialize seasocks' embedded static content support.
  findEmbeddedContent("");

  // Give OpenSSL an empty config. putenv() keeps a pointer to this string, so
  // it must outlive any use of the environment variable; it lives for the
  // whole of main().
  std::string openssl_env = "OPENSSL_CONF=\"\"";
  putenv(const_cast<char *>(openssl_env.c_str()));

  // All plugins are linked statically (see RegisterPlugins), so skip the
  // on-disk GStreamer plugin registry.
  putenv(const_cast<char *>("GST_REGISTRY_DISABLE=yes"));

  gst_init(&argc, &argv);
  RegisterPlugins();

  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(FLAGS_config);
  aos::ShmEventLoop event_loop(&config.message());

  {
    // Bridges the glib main context into the AOS event loop so GStreamer
    // callbacks run alongside AOS watchers on the same thread.
    aos::GlibMainLoop main_loop(&event_loop);

    seasocks::Server server(::std::shared_ptr<seasocks::Logger>(
        new ::aos::seasocks::SeasocksLogger(seasocks::Logger::Level::Info)));

    LOG(INFO) << "Serving from " << FLAGS_data_dir;

    auto websocket_handler =
        std::make_shared<WebsocketHandler>(&event_loop, &server);
    server.addWebSocketHandler("/ws", websocket_handler);

    server.startListening(1180);
    server.setStaticPath(FLAGS_data_dir.c_str());

    aos::internal::EPoll *epoll = event_loop.epoll();

    // Poll seasocks from the AOS epoll loop whenever its fd is readable
    // rather than letting it block in its own serve loop.
    epoll->OnReadable(server.fd(), [&server] {
      CHECK(::seasocks::Server::PollResult::Continue == server.poll(0));
    });

    event_loop.Run();

    // Tear the server down before main_loop and event_loop go out of scope.
    epoll->DeleteFd(server.fd());
    server.terminate();
  }

  gst_deinit();

  return 0;
}