| #define GST_USE_UNSTABLE_API |
| #define GST_DISABLE_REGISTRY 1 |
| |
| #include <glib-unix.h> |
| #include <glib.h> |
| #include <gst/app/app.h> |
| #include <gst/gst.h> |
| #include <gst/sdp/sdp.h> |
| #include <gst/webrtc/icetransport.h> |
| #include <gst/webrtc/webrtc.h> |
| #include <sys/stat.h> |
| #include <sys/types.h> |
| |
| #include <map> |
| #include <thread> |
| |
| #include "absl/flags/flag.h" |
| #include "absl/log/check.h" |
| #include "absl/log/log.h" |
| #include "absl/strings/str_format.h" |
| #include "flatbuffers/flatbuffers.h" |
| |
| #include "aos/events/glib_main_loop.h" |
| #include "aos/events/shm_event_loop.h" |
| #include "aos/init.h" |
| #include "aos/network/web_proxy_generated.h" |
| #include "aos/seasocks/seasocks_logger.h" |
| #include "frc971/vision/vision_generated.h" |
| #include "internal/Embedded.h" |
| #include "seasocks/Server.h" |
| #include "seasocks/StringUtil.h" |
| #include "seasocks/WebSocket.h" |
| |
ABSL_FLAG(std::string, config, "aos_config.json",
          "Path to the AOS config file to use.");
ABSL_FLAG(std::string, device, "/dev/video0",
          "V4L2 camera device path. Ignored when --listen_on is set.");
| ABSL_FLAG(std::string, data_dir, "image_streamer_www", |
| "Directory to serve data files from"); |
| ABSL_FLAG(bool, publish_images, true, |
| "If true, publish images read from v4l2 to /camera."); |
| ABSL_FLAG(int32_t, width, 400, "Image width"); |
| ABSL_FLAG(int32_t, height, 300, "Image height"); |
| ABSL_FLAG(int32_t, framerate, 25, "Framerate (FPS)"); |
| ABSL_FLAG(int32_t, brightness, 50, "Camera brightness"); |
| ABSL_FLAG(int32_t, exposure, 300, "Manual exposure"); |
| ABSL_FLAG(int32_t, bitrate, 500000, "H264 encode bitrate"); |
| ABSL_FLAG(int32_t, streaming_port, 1180, |
| "Port to stream images on with seasocks"); |
| ABSL_FLAG(int32_t, min_port, 5800, "Min rtp port"); |
| ABSL_FLAG(int32_t, max_port, 5810, "Max rtp port"); |
ABSL_FLAG(std::string, listen_on, "",
          "Channel to receive frames from, used in place of the internal "
          "V4L2 reader. Note: width and height MUST match the expected size "
          "of images on the channel.");
| |
| class Connection; |
| |
| using aos::web_proxy::Payload; |
| using aos::web_proxy::SdpType; |
| using aos::web_proxy::WebSocketIce; |
| using aos::web_proxy::WebSocketMessage; |
| using aos::web_proxy::WebSocketSdp; |
| |
class GstSampleSource {
 public:
  GstSampleSource() = default;

  virtual ~GstSampleSource() = default;

  GstSampleSource(const GstSampleSource &) = delete;
  GstSampleSource &operator=(const GstSampleSource &) = delete;
};
| |
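// Reads frames from a V4L2 camera through a GStreamer pipeline and hands each
// GstSample to the provided callback.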
| class V4L2Source : public GstSampleSource { |
| public: |
| V4L2Source(std::function<void(GstSample *)> callback) |
| : callback_(std::move(callback)) { |
| GError *error = NULL; |
| |
    // Create a pipeline to read from the camera, pack it into an RTP stream,
    // and hand the stream to the callback. The v4l2 device should already be
    // configured with the correct bitrate via v4l2-ctl. do-timestamp stamps
    // each frame with its capture time so we can tell when it should be
    // dropped due to latency.

    // With the Pi's hardware encoder, we could encode and packetize the
    // stream once and let clients join at any point, unsynchronized. That
    // doesn't seem to work with the stream from x264enc, so for now we
    // re-encode for each client; we don't expect more than 1 or 2.
| |
| std::string exposure; |
| if (absl::GetFlag(FLAGS_exposure) > 0) { |
| exposure = absl::StrFormat(",auto_exposure=1,exposure_time_absolute=%d", |
| absl::GetFlag(FLAGS_exposure)); |
| } |
| |
| pipeline_ = gst_parse_launch( |
| absl::StrFormat("v4l2src device=%s do-timestamp=true " |
| "extra-controls=\"c,brightness=%d%s\" ! " |
| "video/x-raw,width=%d,height=%d,framerate=%d/" |
| "1,format=YUY2 ! appsink " |
| "name=appsink " |
| "emit-signals=true sync=false async=false " |
| "caps=video/x-raw,format=YUY2", |
| absl::GetFlag(FLAGS_device), |
| absl::GetFlag(FLAGS_brightness), exposure, |
| absl::GetFlag(FLAGS_width), absl::GetFlag(FLAGS_height), |
| absl::GetFlag(FLAGS_framerate)) |
| .c_str(), |
| &error); |
| |
| if (error != NULL) { |
| LOG(FATAL) << "Could not create v4l2 pipeline: " << error->message; |
| } |
| |
| appsink_ = gst_bin_get_by_name(GST_BIN(pipeline_), "appsink"); |
| if (appsink_ == NULL) { |
| LOG(FATAL) << "Could not get appsink"; |
| } |
| |
| g_signal_connect(appsink_, "new-sample", |
| G_CALLBACK(V4L2Source::OnSampleCallback), |
| static_cast<gpointer>(this)); |
| |
| gst_element_set_state(pipeline_, GST_STATE_PLAYING); |
| } |
| |
  ~V4L2Source() override {
| if (pipeline_ != NULL) { |
| gst_element_set_state(GST_ELEMENT(pipeline_), GST_STATE_NULL); |
| gst_object_unref(GST_OBJECT(pipeline_)); |
| gst_object_unref(GST_OBJECT(appsink_)); |
| } |
| } |
| |
| private: |
| static GstFlowReturn OnSampleCallback(GstElement *, gpointer user_data) { |
| static_cast<V4L2Source *>(user_data)->OnSample(); |
| return GST_FLOW_OK; |
| } |
| |
| void OnSample() { |
| GstSample *sample = gst_app_sink_pull_sample(GST_APP_SINK(appsink_)); |
| if (sample == NULL) { |
| LOG(WARNING) << "Received null sample"; |
| return; |
| } |
| callback_(sample); |
| gst_sample_unref(sample); |
| } |
| |
  GstElement *pipeline_ = nullptr;
  GstElement *appsink_ = nullptr;
| |
| std::function<void(GstSample *)> callback_; |
| }; |
| |
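// Reads already-decoded frames off an AOS channel (see --listen_on) and wraps
// each one in a GstSample for the callback.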
| class ChannelSource : public GstSampleSource { |
| public: |
| ChannelSource(aos::ShmEventLoop *event_loop, |
| std::function<void(GstSample *)> callback) |
| : callback_(std::move(callback)) { |
| event_loop->MakeWatcher( |
| absl::GetFlag(FLAGS_listen_on), |
| [this](const frc971::vision::CameraImage &image) { OnImage(image); }); |
| } |
| |
| private: |
| void OnImage(const frc971::vision::CameraImage &image) { |
| if (!image.has_rows() || !image.has_cols() || !image.has_data()) { |
| VLOG(2) << "Skipping CameraImage with no data"; |
| return; |
| } |
| CHECK_EQ(image.rows(), absl::GetFlag(FLAGS_height)); |
| CHECK_EQ(image.cols(), absl::GetFlag(FLAGS_width)); |
| |
| GBytes *bytes = g_bytes_new(image.data()->data(), image.data()->size()); |
| GstBuffer *buffer = gst_buffer_new_wrapped_bytes(bytes); |
| |
| GST_BUFFER_PTS(buffer) = image.monotonic_timestamp_ns(); |
| |
| GstCaps *caps = gst_caps_new_simple( |
| "video/x-raw", "width", G_TYPE_INT, image.cols(), "height", G_TYPE_INT, |
| image.rows(), "format", G_TYPE_STRING, "YUY2", nullptr); |
| CHECK(caps != nullptr); |
| |
| GstSample *sample = gst_sample_new(buffer, caps, nullptr, nullptr); |
| |
| callback_(sample); |
| |
| gst_sample_unref(sample); |
| gst_caps_unref(caps); |
| gst_buffer_unref(buffer); |
| g_bytes_unref(bytes); |
| } |
| |
| std::function<void(GstSample *)> callback_; |
| }; |
| |
| // Basic class that handles receiving new websocket connections. Creates a new |
| // Connection to manage the rest of the negotiation and data passing. When the |
| // websocket closes, it deletes the Connection. |
| class WebsocketHandler : public ::seasocks::WebSocket::Handler { |
| public: |
| WebsocketHandler(aos::ShmEventLoop *event_loop, ::seasocks::Server *server); |
| ~WebsocketHandler() override = default; |
| |
| void onConnect(::seasocks::WebSocket *sock) override; |
| void onData(::seasocks::WebSocket *sock, const uint8_t *data, |
| size_t size) override; |
| void onDisconnect(::seasocks::WebSocket *sock) override; |
| |
| private: |
| void OnSample(GstSample *sample); |
| |
| aos::ShmEventLoop *event_loop_; |
| std::map<::seasocks::WebSocket *, std::unique_ptr<Connection>> connections_; |
| ::seasocks::Server *server_; |
| std::unique_ptr<GstSampleSource> source_; |
| aos::TimerHandler *manual_restart_handle_; |
| |
| aos::Sender<frc971::vision::CameraImage> sender_; |
| }; |
| |
| // Seasocks requires that sends happen on the correct thread. This class takes a |
| // detached buffer to send on a specific websocket connection and sends it when |
| // seasocks is ready. |
| class UpdateData : public ::seasocks::Server::Runnable { |
| public: |
| UpdateData(::seasocks::WebSocket *websocket, |
| flatbuffers::DetachedBuffer &&buffer) |
| : sock_(websocket), buffer_(std::move(buffer)) {} |
| ~UpdateData() override = default; |
| UpdateData(const UpdateData &) = delete; |
| UpdateData &operator=(const UpdateData &) = delete; |
| |
| void run() override { sock_->send(buffer_.data(), buffer_.size()); } |
| |
| private: |
| ::seasocks::WebSocket *sock_; |
| const flatbuffers::DetachedBuffer buffer_; |
| }; |
| |
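// Manages a single WebRTC peer connection: owns the per-client encode
// pipeline and handles SDP/ICE negotiation over the websocket.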
| class Connection { |
| public: |
| Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server); |
| |
| ~Connection(); |
| |
| void HandleWebSocketData(const uint8_t *data, size_t size); |
| |
| void OnSample(GstSample *sample); |
| |
| private: |
| static void OnOfferCreatedCallback(GstPromise *promise, gpointer user_data) { |
| static_cast<Connection *>(user_data)->OnOfferCreated(promise); |
| } |
| |
| static void OnNegotiationNeededCallback(GstElement *, gpointer user_data) { |
| static_cast<Connection *>(user_data)->OnNegotiationNeeded(); |
| } |
| |
| static void OnIceCandidateCallback(GstElement *, guint mline_index, |
| gchar *candidate, gpointer user_data) { |
| static_cast<Connection *>(user_data)->OnIceCandidate(mline_index, |
| candidate); |
| } |
| |
| void OnOfferCreated(GstPromise *promise); |
| void OnNegotiationNeeded(); |
| void OnIceCandidate(guint mline_index, gchar *candidate); |
| |
| ::seasocks::WebSocket *sock_; |
| ::seasocks::Server *server_; |
| |
| GstElement *pipeline_; |
| GstElement *webrtcbin_; |
| GstElement *appsrc_; |
| |
| bool first_sample_ = true; |
| }; |
| |
| WebsocketHandler::WebsocketHandler(aos::ShmEventLoop *event_loop, |
| ::seasocks::Server *server) |
| : event_loop_(event_loop), |
| server_(server), |
| manual_restart_handle_( |
| event_loop_->AddTimer([this]() { event_loop_->Exit(); })) { |
| if (absl::GetFlag(FLAGS_listen_on).empty()) { |
| if (absl::GetFlag(FLAGS_publish_images)) { |
| sender_ = event_loop->MakeSender<frc971::vision::CameraImage>("/camera"); |
| } |
| source_ = |
| std::make_unique<V4L2Source>([this](auto sample) { OnSample(sample); }); |
| } else { |
| source_ = std::make_unique<ChannelSource>( |
| event_loop, [this](auto sample) { OnSample(sample); }); |
| } |
| event_loop_->OnRun([this]() { |
| manual_restart_handle_->Schedule(event_loop_->monotonic_now() + |
| std::chrono::seconds(10)); |
| }); |
| } |
| |
| void WebsocketHandler::onConnect(::seasocks::WebSocket *sock) { |
| std::unique_ptr<Connection> conn = |
| std::make_unique<Connection>(sock, server_); |
| connections_.insert({sock, std::move(conn)}); |
| } |
| |
void WebsocketHandler::onData(::seasocks::WebSocket *sock, const uint8_t *data,
                              size_t size) {
  // find() instead of operator[] so an unknown socket can't insert (and then
  // dereference) a null Connection.
  const auto iter = connections_.find(sock);
  if (iter != connections_.end()) {
    iter->second->HandleWebSocketData(data, size);
  }
}
| |
| void WebsocketHandler::OnSample(GstSample *sample) { |
  for (auto &connection : connections_) {
    connection.second->OnSample(sample);
  }
| |
| if (sender_.valid()) { |
| const GstCaps *caps = gst_sample_get_caps(sample); |
| CHECK(caps != nullptr); |
| CHECK_GT(gst_caps_get_size(caps), 0U); |
| const GstStructure *str = gst_caps_get_structure(caps, 0); |
| |
| gint width; |
| gint height; |
| |
| CHECK(gst_structure_get_int(str, "width", &width)); |
| CHECK(gst_structure_get_int(str, "height", &height)); |
| |
| GstBuffer *buffer = gst_sample_get_buffer(sample); |
| CHECK(buffer != nullptr); |
| |
| const gsize size = gst_buffer_get_size(buffer); |
| |
| auto builder = sender_.MakeBuilder(); |
| |
    uint8_t *image_data;
    auto image_offset =
        builder.fbb()->CreateUninitializedVector(size, &image_data);
    CHECK_EQ(gst_buffer_extract(buffer, 0, image_data, size), size);
| |
| auto image_builder = builder.MakeBuilder<frc971::vision::CameraImage>(); |
| image_builder.add_rows(height); |
| image_builder.add_cols(width); |
| image_builder.add_data(image_offset); |
| |
| builder.CheckOk(builder.Send(image_builder.Finish())); |
| } |
| manual_restart_handle_->Schedule(event_loop_->monotonic_now() + |
| std::chrono::seconds(10)); |
| } |
| |
| void WebsocketHandler::onDisconnect(::seasocks::WebSocket *sock) { |
| connections_.erase(sock); |
| } |
| |
| Connection::Connection(::seasocks::WebSocket *sock, ::seasocks::Server *server) |
| : sock_(sock), server_(server) { |
| GError *error = NULL; |
| |
  // Build a pipeline that reads frames pushed in from the application via
  // appsrc, encodes them, and streams them out through webrtcbin.
| |
| pipeline_ = gst_parse_launch( |
      // aggregate-mode should be zero-latency, but that drops the stream on
      // bitrate spikes for some reason - probably the weak CPU on the Pi.
| absl::StrFormat( |
| "webrtcbin name=webrtcbin appsrc " |
| "name=appsrc block=false " |
| "is-live=true " |
| "format=3 max-buffers=0 leaky-type=2 " |
| "caps=video/x-raw,width=%d,height=%d,format=YUY2 ! videoconvert ! " |
| "x264enc bitrate=%d speed-preset=ultrafast " |
| "tune=zerolatency key-int-max=15 sliced-threads=true ! " |
| "video/x-h264,profile=constrained-baseline ! h264parse ! " |
| "rtph264pay " |
| "config-interval=-1 name=payloader aggregate-mode=none ! " |
| "application/" |
| "x-rtp,media=video,encoding-name=H264,payload=96,clock-rate=90000 !" |
| "webrtcbin. ", |
| absl::GetFlag(FLAGS_width), absl::GetFlag(FLAGS_height), |
| absl::GetFlag(FLAGS_bitrate) / 1000) |
| .c_str(), |
| &error); |
| |
| if (error != NULL) { |
| LOG(FATAL) << "Could not create WebRTC pipeline: " << error->message; |
| } |
| |
| webrtcbin_ = gst_bin_get_by_name(GST_BIN(pipeline_), "webrtcbin"); |
| if (webrtcbin_ == NULL) { |
| LOG(FATAL) << "Could not initialize webrtcbin"; |
| } |
| |
| appsrc_ = gst_bin_get_by_name(GST_BIN(pipeline_), "appsrc"); |
| if (appsrc_ == NULL) { |
| LOG(FATAL) << "Could not initialize appsrc"; |
| } |
| |
| { |
| GArray *transceivers; |
| g_signal_emit_by_name(webrtcbin_, "get-transceivers", &transceivers); |
| if (transceivers == NULL || transceivers->len <= 0) { |
| LOG(FATAL) << "Could not initialize transceivers"; |
| } |
| |
| GstWebRTCRTPTransceiver *trans = |
| g_array_index(transceivers, GstWebRTCRTPTransceiver *, 0); |
| g_object_set(trans, "direction", |
| GST_WEBRTC_RTP_TRANSCEIVER_DIRECTION_SENDONLY, nullptr); |
| |
| g_array_unref(transceivers); |
| } |
| |
| { |
| GstObject *ice = nullptr; |
| g_object_get(G_OBJECT(webrtcbin_), "ice-agent", &ice, nullptr); |
| CHECK(ice != nullptr); |
| |
| g_object_set(ice, "min-rtp-port", absl::GetFlag(FLAGS_min_port), |
| "max-rtp-port", absl::GetFlag(FLAGS_max_port), nullptr); |
| |
| // We don't need upnp on a local network. |
| { |
| GstObject *nice = nullptr; |
| g_object_get(ice, "agent", &nice, nullptr); |
| CHECK(nice != nullptr); |
| |
| g_object_set(nice, "upnp", false, nullptr); |
| g_object_unref(nice); |
| } |
| |
| gst_object_unref(ice); |
| } |
| |
| g_signal_connect(webrtcbin_, "on-negotiation-needed", |
| G_CALLBACK(Connection::OnNegotiationNeededCallback), |
| static_cast<gpointer>(this)); |
| |
| g_signal_connect(webrtcbin_, "on-ice-candidate", |
| G_CALLBACK(Connection::OnIceCandidateCallback), |
| static_cast<gpointer>(this)); |
| |
| gst_element_set_state(pipeline_, GST_STATE_READY); |
| gst_element_set_state(pipeline_, GST_STATE_PLAYING); |
| } |
| |
| Connection::~Connection() { |
| if (pipeline_ != NULL) { |
| gst_element_set_state(pipeline_, GST_STATE_NULL); |
| |
| gst_object_unref(GST_OBJECT(webrtcbin_)); |
| gst_object_unref(GST_OBJECT(pipeline_)); |
| gst_object_unref(GST_OBJECT(appsrc_)); |
| } |
| } |
| |
| void Connection::OnSample(GstSample *sample) { |
| GstFlowReturn response = |
| gst_app_src_push_sample(GST_APP_SRC(appsrc_), sample); |
  if (response != GST_FLOW_OK) {
    LOG(WARNING) << "Failed to push sample to appsrc: flow return "
                 << response;
  }
| |
| // Since the stream is already running (the camera turns on with |
| // image_streamer) we need to tell the new appsrc where |
| // we are starting in the stream so it can catch up immediately. |
| if (first_sample_) { |
| GstPad *src = gst_element_get_static_pad(appsrc_, "src"); |
| if (src == NULL) { |
| return; |
| } |
| |
| GstSegment *segment = gst_sample_get_segment(sample); |
| GstBuffer *buffer = gst_sample_get_buffer(sample); |
| |
    const guint64 offset = gst_segment_to_running_time(
        segment, GST_FORMAT_TIME, GST_BUFFER_PTS(buffer));
    LOG(INFO) << "Fixing offset " << offset;
    gst_pad_set_offset(src, -static_cast<gint64>(offset));
| |
| gst_object_unref(GST_OBJECT(src)); |
| first_sample_ = false; |
| } |
| } |
| |
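// Called once webrtcbin has produced an SDP offer: set it as our local
// description and forward it to the browser over the websocket.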
| void Connection::OnOfferCreated(GstPromise *promise) { |
| LOG(INFO) << "OnOfferCreated"; |
| |
| GstWebRTCSessionDescription *offer = NULL; |
| gst_structure_get(gst_promise_get_reply(promise), "offer", |
| GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &offer, NULL); |
| gst_promise_unref(promise); |
| |
| { |
| std::unique_ptr<GstPromise, decltype(&gst_promise_unref)> |
| local_desc_promise(gst_promise_new(), &gst_promise_unref); |
| g_signal_emit_by_name(webrtcbin_, "set-local-description", offer, |
| local_desc_promise.get()); |
| gst_promise_interrupt(local_desc_promise.get()); |
| } |
| |
  GstSDPMessage *sdp_msg = offer->sdp;
  // gst_sdp_message_as_text() returns a newly-allocated string which we own,
  // and gst_structure_get() handed us ownership of the offer itself.
  gchar *sdp_text = gst_sdp_message_as_text(sdp_msg);
  std::string sdp_str(sdp_text);
  g_free(sdp_text);
  gst_webrtc_session_description_free(offer);
| |
| LOG(INFO) << "Negotiation offer created:\n" << sdp_str; |
| |
| flatbuffers::FlatBufferBuilder fbb(512); |
| flatbuffers::Offset<WebSocketSdp> sdp_fb = |
| CreateWebSocketSdpDirect(fbb, SdpType::OFFER, sdp_str.c_str()); |
| flatbuffers::Offset<WebSocketMessage> answer_message = |
| CreateWebSocketMessage(fbb, Payload::WebSocketSdp, sdp_fb.Union()); |
| fbb.Finish(answer_message); |
| |
| server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release())); |
| } |
| |
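// Fired by webrtcbin once the pipeline topology is known; kicks off
// create-offer, with the result delivered to OnOfferCreated().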
| void Connection::OnNegotiationNeeded() { |
| LOG(INFO) << "OnNegotiationNeeded"; |
| |
| GstPromise *promise; |
| promise = gst_promise_new_with_change_func(Connection::OnOfferCreatedCallback, |
| static_cast<gpointer>(this), NULL); |
| g_signal_emit_by_name(G_OBJECT(webrtcbin_), "create-offer", NULL, promise); |
| } |
| |
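// Forwards each local ICE candidate to the browser.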
| void Connection::OnIceCandidate(guint mline_index, gchar *candidate) { |
| LOG(INFO) << "OnIceCandidate"; |
| |
| flatbuffers::FlatBufferBuilder fbb(512); |
| |
| flatbuffers::Offset<flatbuffers::String> sdp_mid_offset = |
| fbb.CreateString("video0"); |
| flatbuffers::Offset<flatbuffers::String> candidate_offset = |
| fbb.CreateString(static_cast<char *>(candidate)); |
| |
| auto ice_fb_builder = WebSocketIce::Builder(fbb); |
| ice_fb_builder.add_sdp_m_line_index(mline_index); |
| ice_fb_builder.add_sdp_mid(sdp_mid_offset); |
| ice_fb_builder.add_candidate(candidate_offset); |
| flatbuffers::Offset<WebSocketIce> ice_fb = ice_fb_builder.Finish(); |
| |
| flatbuffers::Offset<WebSocketMessage> ice_message = |
| CreateWebSocketMessage(fbb, Payload::WebSocketIce, ice_fb.Union()); |
| fbb.Finish(ice_message); |
| |
| server_->execute(std::make_shared<UpdateData>(sock_, fbb.Release())); |
| |
| g_signal_emit_by_name(webrtcbin_, "add-ice-candidate", mline_index, |
| candidate); |
| } |
| |
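// Dispatches messages from the browser: SDP answers are applied as the remote
// description, and ICE candidates are handed to webrtcbin.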
void Connection::HandleWebSocketData(const uint8_t *data, size_t size) {
  LOG(INFO) << "HandleWebSocketData";

  flatbuffers::Verifier verifier(data, size);
  if (!verifier.VerifyBuffer<WebSocketMessage>(nullptr)) {
    LOG(WARNING) << "Received invalid WebSocketMessage";
    return;
  }
  const WebSocketMessage *message =
      flatbuffers::GetRoot<WebSocketMessage>(data);
| |
| switch (message->payload_type()) { |
| case Payload::WebSocketSdp: { |
      const WebSocketSdp *sdp_message = message->payload_as_WebSocketSdp();
      if (sdp_message->type() != SdpType::ANSWER) {
        LOG(WARNING) << "Expected SDP message of type \"answer\"";
        break;
      }
      const flatbuffers::String *sdp_string = sdp_message->payload();
| |
| LOG(INFO) << "Received SDP:\n" << sdp_string->c_str(); |
| |
| GstSDPMessage *sdp; |
| GstSDPResult status = gst_sdp_message_new(&sdp); |
| if (status != GST_SDP_OK) { |
| LOG(WARNING) << "Could not create SDP message"; |
| break; |
| } |
| |
      status = gst_sdp_message_parse_buffer(
          reinterpret_cast<const guint8 *>(sdp_string->c_str()),
          sdp_string->size(), sdp);

      if (status != GST_SDP_OK) {
        LOG(WARNING) << "Could not parse SDP string";
        gst_sdp_message_free(sdp);
        break;
      }
| |
| std::unique_ptr<GstWebRTCSessionDescription, |
| decltype(&gst_webrtc_session_description_free)> |
| answer(gst_webrtc_session_description_new(GST_WEBRTC_SDP_TYPE_ANSWER, |
| sdp), |
| &gst_webrtc_session_description_free); |
| std::unique_ptr<GstPromise, decltype(&gst_promise_unref)> promise( |
| gst_promise_new(), &gst_promise_unref); |
| g_signal_emit_by_name(webrtcbin_, "set-remote-description", answer.get(), |
| promise.get()); |
| gst_promise_interrupt(promise.get()); |
| |
| break; |
| } |
| case Payload::WebSocketIce: { |
| const WebSocketIce *ice = message->payload_as_WebSocketIce(); |
| if (!ice->has_candidate() || ice->candidate()->size() == 0) { |
| LOG(WARNING) << "Received ICE message without candidate"; |
| break; |
| } |
| |
      const gchar *candidate = ice->candidate()->c_str();
| guint mline_index = ice->sdp_m_line_index(); |
| |
| LOG(INFO) << "Received ICE candidate with mline index " << mline_index |
| << "; candidate: " << candidate; |
| |
| g_signal_emit_by_name(webrtcbin_, "add-ice-candidate", mline_index, |
| candidate); |
| |
| break; |
| } |
| default: |
| break; |
| } |
| } |
| |
| int main(int argc, char **argv) { |
| aos::InitGoogle(&argc, &argv); |
| |
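  // Touch seasocks' embedded-content lookup so its compiled-in files (e.g.
  // the built-in favicon) are available.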
| findEmbeddedContent(""); |
| |
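  // Point OPENSSL_CONF at an empty path so OpenSSL doesn't try to load the
  // system-wide config file.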
| std::string openssl_env = "OPENSSL_CONF=\"\""; |
| putenv(const_cast<char *>(openssl_env.c_str())); |
| |
| gst_init(&argc, &argv); |
| |
| aos::FlatbufferDetachedBuffer<aos::Configuration> config = |
| aos::configuration::ReadConfig(absl::GetFlag(FLAGS_config)); |
| aos::ShmEventLoop event_loop(&config.message()); |
| |
| { |
| aos::GlibMainLoop main_loop(&event_loop); |
| |
| seasocks::Server server(::std::shared_ptr<seasocks::Logger>( |
| new ::aos::seasocks::SeasocksLogger(seasocks::Logger::Level::Info))); |
| |
| LOG(INFO) << "Serving from " << absl::GetFlag(FLAGS_data_dir); |
| |
| auto websocket_handler = |
| std::make_shared<WebsocketHandler>(&event_loop, &server); |
| server.addWebSocketHandler("/ws", websocket_handler); |
| |
| server.startListening(absl::GetFlag(FLAGS_streaming_port)); |
| server.setStaticPath(absl::GetFlag(FLAGS_data_dir).c_str()); |
| |
| aos::internal::EPoll *epoll = event_loop.epoll(); |
| |
    epoll->OnReadable(server.fd(), [&server] {
      CHECK(server.poll(0) == ::seasocks::Server::PollResult::Continue);
    });
| |
| event_loop.Run(); |
| |
| epoll->DeleteFd(server.fd()); |
| server.terminate(); |
| } |
| |
| gst_deinit(); |
| |
| return 0; |
| } |