#define GST_USE_UNSTABLE_API
#define GST_DISABLE_REGISTRY 1

#include <glib-unix.h>
#include <glib.h>
#include <gst/app/app.h>
#include <gst/gst.h>
#include <gst/sdp/sdp.h>
#include <gst/webrtc/icetransport.h>
#include <gst/webrtc/webrtc.h>
#include <sys/stat.h>
#include <sys/types.h>

#include <map>
#include <thread>

#include "absl/strings/str_format.h"
#include "aos/events/glib_main_loop.h"
#include "aos/events/shm_event_loop.h"
#include "aos/init.h"
#include "aos/network/web_proxy_generated.h"
#include "aos/seasocks/seasocks_logger.h"
#include "flatbuffers/flatbuffers.h"
#include "frc971/vision/vision_generated.h"
#include "gflags/gflags.h"
#include "glog/logging.h"
#include "internal/Embedded.h"
#include "seasocks/Server.h"
#include "seasocks/StringUtil.h"
#include "seasocks/WebSocket.h"

extern "C" {
GST_PLUGIN_STATIC_DECLARE(app);
GST_PLUGIN_STATIC_DECLARE(coreelements);
GST_PLUGIN_STATIC_DECLARE(dtls);
GST_PLUGIN_STATIC_DECLARE(nice);
GST_PLUGIN_STATIC_DECLARE(rtp);
GST_PLUGIN_STATIC_DECLARE(rtpmanager);
GST_PLUGIN_STATIC_DECLARE(srtp);
GST_PLUGIN_STATIC_DECLARE(webrtc);
GST_PLUGIN_STATIC_DECLARE(video4linux2);
GST_PLUGIN_STATIC_DECLARE(videoconvert);
GST_PLUGIN_STATIC_DECLARE(videoparsersbad);
GST_PLUGIN_STATIC_DECLARE(videorate);
GST_PLUGIN_STATIC_DECLARE(videoscale);
GST_PLUGIN_STATIC_DECLARE(videotestsrc);
GST_PLUGIN_STATIC_DECLARE(x264);
}
DEFINE_string(config, "y2022/aos_config.json",
              "Path to the AOS config file to use.");
DEFINE_string(device, "/dev/video0", "Camera fd");
DEFINE_string(data_dir, "image_streamer_www",
              "Directory to serve data files from");
DEFINE_int32(width, 400, "Image width");
DEFINE_int32(height, 300, "Image height");
DEFINE_int32(framerate, 25, "Framerate (FPS)");
DEFINE_int32(brightness, 50, "Camera brightness");
DEFINE_int32(exposure, 300, "Manual exposure");
DEFINE_int32(bitrate, 500000, "H264 encode bitrate");
DEFINE_int32(min_port, 5800, "Min rtp port");
DEFINE_int32(max_port, 5810, "Max rtp port");

class Connection;

using aos::web_proxy::Payload;
using aos::web_proxy::SdpType;
using aos::web_proxy::WebSocketIce;
using aos::web_proxy::WebSocketMessage;
using aos::web_proxy::WebSocketSdp;

// Basic class that handles receiving new websocket connections. Creates a new
// Connection to manage the rest of the negotiation and data passing. When the
// websocket closes, it deletes the Connection.
class WebsocketHandler : public ::seasocks::WebSocket::Handler {
 public:
  WebsocketHandler(aos::ShmEventLoop *event_loop, ::seasocks::Server *server);
  ~WebsocketHandler() override;

  void onConnect(::seasocks::WebSocket *sock) override;
  void onData(::seasocks::WebSocket *sock, const uint8_t *data,
              size_t size) override;
  void onDisconnect(::seasocks::WebSocket *sock) override;

 private:
  static GstFlowReturn OnSampleCallback(GstElement *, gpointer user_data) {
    static_cast<WebsocketHandler *>(user_data)->OnSample();
    return GST_FLOW_OK;
  }

  void OnSample();

  std::map<::seasocks::WebSocket *, std::unique_ptr<Connection>> connections_;
  ::seasocks::Server *server_;
  GstElement *pipeline_;
  GstElement *appsink_;

  aos::Sender<frc971::vision::CameraImage> sender_;
};

// Seasocks requires that sends happen on the correct thread. This class takes
// a detached buffer to send on a specific websocket connection and sends it
// when seasocks is ready.
class UpdateData : public ::seasocks::Server::Runnable {
 public:
  UpdateData(::seasocks::WebSocket *websocket,
             flatbuffers::DetachedBuffer &&buffer)
      : sock_(websocket), buffer_(std::move(buffer)) {}
  ~UpdateData() override = default;
  UpdateData(const UpdateData &) = delete;
  UpdateData &operator=(const UpdateData &) = delete;

  void run() override { sock_->send(buffer_.data(), buffer_.size()); }

 private:
  ::seasocks::WebSocket *sock_;
  const flatbuffers::DetachedBuffer buffer_;
};

class Connection {
 public:
  Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server);

  ~Connection();

  void HandleWebSocketData(const uint8_t *data, size_t size);

  void OnSample(GstSample *sample);

 private:
  static void OnOfferCreatedCallback(GstPromise *promise, gpointer user_data) {
    static_cast<Connection *>(user_data)->OnOfferCreated(promise);
  }

  static void OnNegotiationNeededCallback(GstElement *, gpointer user_data) {
    static_cast<Connection *>(user_data)->OnNegotiationNeeded();
  }

  static void OnIceCandidateCallback(GstElement *, guint mline_index,
                                     gchar *candidate, gpointer user_data) {
    static_cast<Connection *>(user_data)->OnIceCandidate(mline_index,
                                                         candidate);
  }

  void OnOfferCreated(GstPromise *promise);
  void OnNegotiationNeeded();
  void OnIceCandidate(guint mline_index, gchar *candidate);

  ::seasocks::WebSocket *sock_;
  ::seasocks::Server *server_;

  GstElement *pipeline_;
  GstElement *webrtcbin_;
  GstElement *appsrc_;

  bool first_sample_ = true;
};

WebsocketHandler::WebsocketHandler(aos::ShmEventLoop *event_loop,
                                   ::seasocks::Server *server)
    : server_(server),
      sender_(event_loop->MakeSender<frc971::vision::CameraImage>("/camera")) {
  GError *error = NULL;

  // Create a pipeline that reads frames from the camera and hands each one to
  // the appsink callback.
  // The v4l2 device should already be configured with the correct bitrate via
  // v4l2-ctl. do-timestamp records when each frame was captured so we can
  // tell when it should be dropped due to latency.

  // With the Pi's hardware encoder, we could encode and packetize the stream
  // once and let clients join at any point without synchronization. That does
  // not seem to work with the stream from x264enc, so for now we re-encode
  // for each client, since we don't expect more than one or two.

  pipeline_ = gst_parse_launch(
      absl::StrFormat("v4l2src device=%s do-timestamp=true "
                      "extra-controls=\"c,brightness=%d,auto_exposure=1,"
                      "exposure_time_absolute=%d\" ! "
                      "video/x-raw,width=%d,height=%d,framerate=%d/"
                      "1,format=YUY2 ! appsink "
                      "name=appsink "
                      "emit-signals=true sync=false async=false "
                      "caps=video/x-raw,format=YUY2",
                      FLAGS_device, FLAGS_brightness, FLAGS_exposure,
                      FLAGS_width, FLAGS_height, FLAGS_framerate)
          .c_str(),
      &error);

  if (error != NULL) {
    LOG(FATAL) << "Could not create v4l2 pipeline: " << error->message;
  }

  appsink_ = gst_bin_get_by_name(GST_BIN(pipeline_), "appsink");
  if (appsink_ == NULL) {
    LOG(FATAL) << "Could not get appsink";
  }

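  // appsink was created with emit-signals=true, so it fires "new-sample" for
  // every frame; route that signal to OnSampleCallback with this as user data.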
  g_signal_connect(appsink_, "new-sample",
                   G_CALLBACK(WebsocketHandler::OnSampleCallback),
                   static_cast<gpointer>(this));

  gst_element_set_state(pipeline_, GST_STATE_PLAYING);
}

WebsocketHandler::~WebsocketHandler() {
  if (pipeline_ != NULL) {
    gst_element_set_state(GST_ELEMENT(pipeline_), GST_STATE_NULL);
    gst_object_unref(GST_OBJECT(pipeline_));
    gst_object_unref(GST_OBJECT(appsink_));
  }
}

void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) {
  std::unique_ptr<Connection> conn =
      std::make_unique<Connection>(sock, server_);
  connections_.insert({sock, std::move(conn)});
}

void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
                              size_t size) {
  connections_[sock]->HandleWebSocketData(data, size);
}

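// Pulls the latest frame from appsink, forwards it to every active WebRTC
// Connection, and also publishes it as a CameraImage on the AOS "/camera"
// channel.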
void WebsocketHandler::OnSample() {
  GstSample *sample = gst_app_sink_pull_sample(GST_APP_SINK(appsink_));
  if (sample == NULL) {
    LOG(WARNING) << "Received null sample";
    return;
  }

  for (auto iter = connections_.begin(); iter != connections_.end(); ++iter) {
    iter->second->OnSample(sample);
  }

  {
    const GstCaps *caps = CHECK_NOTNULL(gst_sample_get_caps(sample));
    CHECK_GT(gst_caps_get_size(caps), 0U);
    const GstStructure *str = gst_caps_get_structure(caps, 0);

    gint width;
    gint height;

    CHECK(gst_structure_get_int(str, "width", &width));
    CHECK(gst_structure_get_int(str, "height", &height));

    GstBuffer *buffer = CHECK_NOTNULL(gst_sample_get_buffer(sample));

    const gsize size = gst_buffer_get_size(buffer);

    auto builder = sender_.MakeBuilder();

    uint8_t *image_data;
    auto image_offset =
        builder.fbb()->CreateUninitializedVector(size, &image_data);
    gst_buffer_extract(buffer, 0, image_data, size);

    auto image_builder = builder.MakeBuilder<frc971::vision::CameraImage>();
    image_builder.add_rows(height);
    image_builder.add_cols(width);
    image_builder.add_data(image_offset);

    builder.CheckOk(builder.Send(image_builder.Finish()));
  }

  gst_sample_unref(sample);
}

void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) {
  connections_.erase(sock);
}

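// Each Connection owns its own encode + webrtcbin pipeline for a single
// browser client; raw frames are pushed into it through appsrc.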
Connection::Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server)
    : sock_(sock), server_(server) {
  GError *error = NULL;

  // Build a pipeline that accepts raw frames from the application via appsrc,
  // encodes them with x264, and hands the resulting RTP stream to webrtcbin.

  pipeline_ = gst_parse_launch(
      // aggregate-mode should be zero-latency but this drops the stream on
      // bitrate spikes for some reason - probably the weak CPU on the pi.
      absl::StrFormat(
          "webrtcbin name=webrtcbin appsrc "
          "name=appsrc block=false "
          "is-live=true "
          "format=3 max-buffers=0 leaky-type=2 "
          "caps=video/x-raw,width=%d,height=%d,format=YUY2 ! videoconvert ! "
          "x264enc bitrate=%d speed-preset=ultrafast "
          "tune=zerolatency key-int-max=15 sliced-threads=true ! "
          "video/x-h264,profile=constrained-baseline ! h264parse ! "
          "rtph264pay "
          "config-interval=-1 name=payloader aggregate-mode=none ! "
          "application/"
          "x-rtp,media=video,encoding-name=H264,payload=96,clock-rate=90000 !"
          "webrtcbin. ",
          FLAGS_width, FLAGS_height, FLAGS_bitrate / 1000)
          .c_str(),
      &error);

  if (error != NULL) {
    LOG(FATAL) << "Could not create WebRTC pipeline: " << error->message;
  }

  webrtcbin_ = gst_bin_get_by_name(GST_BIN(pipeline_), "webrtcbin");
  if (webrtcbin_ == NULL) {
    LOG(FATAL) << "Could not initialize webrtcbin";
  }

  appsrc_ = gst_bin_get_by_name(GST_BIN(pipeline_), "appsrc");
  if (appsrc_ == NULL) {
    LOG(FATAL) << "Could not initialize appsrc";
  }

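  // The only transceiver is our video track; mark it send-only since we only
  // stream video out and expect nothing back from the browser.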
  {
    GArray *transceivers;
    g_signal_emit_by_name(webrtcbin_, "get-transceivers", &transceivers);
    if (transceivers == NULL || transceivers->len <= 0) {
      LOG(FATAL) << "Could not initialize transceivers";
    }

    GstWebRTCRTPTransceiver *trans =
        g_array_index(transceivers, GstWebRTCRTPTransceiver *, 0);
    g_object_set(trans, "direction",
                 GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_SENDONLY, nullptr);

    g_array_unref(transceivers);
  }

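  // Keep ICE on the flag-configured RTP port range (5800-5810 by default) so
  // all WebRTC traffic uses predictable ports.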
  {
    GstObject *ice = nullptr;
    g_object_get(G_OBJECT(webrtcbin_), "ice-agent", &ice, nullptr);
    CHECK_NOTNULL(ice);

    g_object_set(ice, "min-rtp-port", FLAGS_min_port, "max-rtp-port",
                 FLAGS_max_port, nullptr);

    // We don't need upnp on a local network.
    {
      GstObject *nice = nullptr;
      g_object_get(ice, "agent", &nice, nullptr);
      CHECK_NOTNULL(nice);

      g_object_set(nice, "upnp", false, nullptr);
      g_object_unref(nice);
    }

    gst_object_unref(ice);
  }

  g_signal_connect(webrtcbin_, "on-negotiation-needed",
                   G_CALLBACK(Connection::OnNegotiationNeededCallback),
                   static_cast<gpointer>(this));

  g_signal_connect(webrtcbin_, "on-ice-candidate",
                   G_CALLBACK(Connection::OnIceCandidateCallback),
                   static_cast<gpointer>(this));

  gst_element_set_state(pipeline_, GST_STATE_READY);
  gst_element_set_state(pipeline_, GST_STATE_PLAYING);
}

Connection::~Connection() {
  if (pipeline_ != NULL) {
    gst_element_set_state(pipeline_, GST_STATE_NULL);

    gst_object_unref(GST_OBJECT(webrtcbin_));
    gst_object_unref(GST_OBJECT(pipeline_));
    gst_object_unref(GST_OBJECT(appsrc_));
  }
}

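// Pushes the shared camera sample into this connection's appsrc so it gets
// re-encoded and sent over WebRTC.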
void Connection::OnSample(GstSample *sample) {
  GstFlowReturn response =
      gst_app_src_push_sample(GST_APP_SRC(appsrc_), sample);
  if (response != GST_FLOW_OK) {
    LOG(WARNING) << "Sample pushed, did not receive OK";
  }

  // Since the stream is already running (the camera starts with
  // image_streamer), tell the new appsrc where we are starting in the stream
  // so it can catch up immediately.
  if (first_sample_) {
    GstPad *src = gst_element_get_static_pad(appsrc_, "src");
    if (src == NULL) {
      return;
    }

    GstSegment *segment = gst_sample_get_segment(sample);
    GstBuffer *buffer = gst_sample_get_buffer(sample);

    guint64 offset = gst_segment_to_running_time(segment, GST_FORMAT_TIME,
                                                 GST_BUFFER_PTS(buffer));
    LOG(INFO) << "Fixing offset " << offset;
    gst_pad_set_offset(src, -offset);

    gst_object_unref(GST_OBJECT(src));
    first_sample_ = false;
  }
}

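// Called once webrtcbin has produced an SDP offer: apply it as the local
// description and forward it to the browser over the websocket for signaling.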
void Connection::OnOfferCreated(GstPromise *promise) {
  LOG(INFO) << "OnOfferCreated";

  GstWebRTCSessionDescription *offer = NULL;
  gst_structure_get(gst_promise_get_reply(promise), "offer",
                    GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &offer, NULL);
  gst_promise_unref(promise);

  {
    std::unique_ptr<GstPromise, decltype(&gst_promise_unref)>
        local_desc_promise(gst_promise_new(), &gst_promise_unref);
    g_signal_emit_by_name(webrtcbin_, "set-local-description", offer,
                          local_desc_promise.get());
    gst_promise_interrupt(local_desc_promise.get());
  }

  // gst_sdp_message_as_text() returns a newly-allocated string that we own,
  // and we also own the offer itself; free both once the text is copied.
  gchar *sdp_text = gst_sdp_message_as_text(offer->sdp);
  std::string sdp_str(sdp_text);
  g_free(sdp_text);
  gst_webrtc_session_description_free(offer);

  LOG(INFO) << "Negotiation offer created:\n" << sdp_str;

  flatbuffers::FlatBufferBuilder fbb(512);
  flatbuffers::Offset<WebSocketSdp> sdp_fb =
      CreateWebSocketSdpDirect(fbb, SdpType::OFFER, sdp_str.c_str());
  flatbuffers::Offset<WebSocketMessage> offer_message =
      CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union());
  fbb.Finish(offer_message);

  server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));
}

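// webrtcbin signals on-negotiation-needed when it is ready to generate an
// offer; create one asynchronously and handle it in OnOfferCreatedCallback.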
void Connection::OnNegotiationNeeded() {
  LOG(INFO) << "OnNegotiationNeeded";

  GstPromise *promise = gst_promise_new_with_change_func(
      Connection::OnOfferCreatedCallback, static_cast<gpointer>(this), NULL);
  g_signal_emit_by_name(G_OBJECT(webrtcbin_), "create-offer", NULL, promise);
}

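// Forwards each locally-gathered ICE candidate to the browser as a
// WebSocketIce message so it can complete connectivity checks.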
void Connection::OnIceCandidate(guint mline_index, gchar *candidate) {
  LOG(INFO) << "OnIceCandidate";

  flatbuffers::FlatBufferBuilder fbb(512);

  // Strings have to be created before the table builder is started;
  // flatbuffers does not allow nested construction.
  flatbuffers::Offset<flatbuffers::String> sdp_mid_offset =
      fbb.CreateString("video0");
  flatbuffers::Offset<flatbuffers::String> candidate_offset =
      fbb.CreateString(candidate);

  WebSocketIce::Builder ice_fb_builder(fbb);
  ice_fb_builder.add_sdp_m_line_index(mline_index);
  ice_fb_builder.add_sdp_mid(sdp_mid_offset);
  ice_fb_builder.add_candidate(candidate_offset);
  flatbuffers::Offset<WebSocketIce> ice_fb = ice_fb_builder.Finish();

  flatbuffers::Offset<WebSocketMessage> ice_message =
      CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_fb.Union());
  fbb.Finish(ice_message);

  server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release()));

  g_signal_emit_by_name(webrtcbin_, "add-ice-candidate", mline_index,
                        candidate);
}

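// Handles signaling messages from the browser: SDP answers are applied as the
// remote description, and remote ICE candidates are handed to webrtcbin.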
void Connection::HandleWebSocketData(const uint8_t *data, size_t /*size*/) {
  LOG(INFO) << "HandleWebSocketData";

  const WebSocketMessage *message =
      flatbuffers::GetRoot<WebSocketMessage>(data);

  switch (message->payload_type()) {
    case Payload::WebSocketSdp: {
      const WebSocketSdp *offer = message->payload_as_WebSocketSdp();
      if (offer->type() != SdpType::ANSWER) {
        LOG(WARNING) << "Expected SDP message type \"answer\"";
        break;
      }
      const flatbuffers::String *sdp_string = offer->payload();

      LOG(INFO) << "Received SDP:\n" << sdp_string->c_str();

      GstSDPMessage *sdp;
      GstSDPResult status = gst_sdp_message_new(&sdp);
      if (status != GST_SDP_OK) {
        LOG(WARNING) << "Could not create SDP message";
        break;
      }

      status = gst_sdp_message_parse_buffer(
          (const guint8 *)sdp_string->c_str(), sdp_string->size(), sdp);

      if (status != GST_SDP_OK) {
        LOG(WARNING) << "Could not parse SDP string";
        break;
      }

      std::unique_ptr<GstWebRTCSessionDescription,
                      decltype(&gst_webrtc_session_description_free)>
          answer(gst_webrtc_session_description_new(GST_WEBRTC_SDP_TYPE_ANSWER,
                                                    sdp),
                 &gst_webrtc_session_description_free);
      std::unique_ptr<GstPromise, decltype(&gst_promise_unref)> promise(
          gst_promise_new(), &gst_promise_unref);
      g_signal_emit_by_name(webrtcbin_, "set-remote-description", answer.get(),
                            promise.get());
      gst_promise_interrupt(promise.get());

      break;
    }
    case Payload::WebSocketIce: {
      const WebSocketIce *ice = message->payload_as_WebSocketIce();
      if (!ice->has_candidate() || ice->candidate()->size() == 0) {
        LOG(WARNING) << "Received ICE message without candidate";
        break;
      }

      const gchar *candidate =
          static_cast<const gchar *>(ice->candidate()->c_str());
      guint mline_index = ice->sdp_m_line_index();

      LOG(INFO) << "Received ICE candidate with mline index " << mline_index
                << "; candidate: " << candidate;

      g_signal_emit_by_name(webrtcbin_, "add-ice-candidate", mline_index,
                            candidate);

      break;
    }
    default:
      break;
  }
}

void RegisterPlugins() {
  GST_PLUGIN_STATIC_REGISTER(app);
  GST_PLUGIN_STATIC_REGISTER(coreelements);
  GST_PLUGIN_STATIC_REGISTER(dtls);
  GST_PLUGIN_STATIC_REGISTER(nice);
  GST_PLUGIN_STATIC_REGISTER(rtp);
  GST_PLUGIN_STATIC_REGISTER(rtpmanager);
  GST_PLUGIN_STATIC_REGISTER(srtp);
  GST_PLUGIN_STATIC_REGISTER(webrtc);
  GST_PLUGIN_STATIC_REGISTER(video4linux2);
  GST_PLUGIN_STATIC_REGISTER(videoconvert);
  GST_PLUGIN_STATIC_REGISTER(videoparsersbad);
  GST_PLUGIN_STATIC_REGISTER(videorate);
  GST_PLUGIN_STATIC_REGISTER(videoscale);
  GST_PLUGIN_STATIC_REGISTER(videotestsrc);
  GST_PLUGIN_STATIC_REGISTER(x264);
}

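// Entry point: initialize GStreamer with the statically-registered plugins,
// then run the seasocks signaling/static-file server off the AOS event loop's
// epoll.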
int main(int argc, char **argv) {
  aos::InitGoogle(&argc, &argv);

  findEmbeddedContent("");

  std::string openssl_env = "OPENSSL_CONF=\"\"";
  putenv(const_cast<char *>(openssl_env.c_str()));

  putenv(const_cast<char *>("GST_REGISTRY_DISABLE=yes"));

  gst_init(&argc, &argv);
  RegisterPlugins();

  aos::FlatbufferDetachedBuffer<aos::Configuration> config =
      aos::configuration::ReadConfig(FLAGS_config);
  aos::ShmEventLoop event_loop(&config.message());

  {
    aos::GlibMainLoop main_loop(&event_loop);

    seasocks::Server server(::std::shared_ptr<seasocks::Logger>(
        new ::aos::seasocks::SeasocksLogger(seasocks::Logger::Level::Info)));

    LOG(INFO) << "Serving from " << FLAGS_data_dir;

    auto websocket_handler =
        std::make_shared<WebsocketHandler>(&event_loop, &server);
    server.addWebSocketHandler("/ws", websocket_handler);

    server.startListening(1180);
    server.setStaticPath(FLAGS_data_dir.c_str());

    aos::internal::EPoll *epoll = event_loop.epoll();

    epoll->OnReadable(server.fd(), [&server] {
      CHECK(::seasocks::Server::PollResult::Continue == server.poll(0));
    });

    event_loop.Run();

    epoll->DeleteFd(server.fd());
    server.terminate();
  }

  gst_deinit();

  return 0;
}