blob: f08a5a098fcab5ab0e6ab9fe617f0af24f971f97 [file] [log] [blame]
James Kuszmaul82f6c042021-01-17 11:30:16 -08001/**
2 * @file rtmp/conn.c Real Time Messaging Protocol (RTMP) -- NetConnection
3 *
4 * Copyright (C) 2010 Creytiv.com
5 */
6#include <string.h>
7#include <re_types.h>
8#include <re_fmt.h>
9#include <re_mem.h>
10#include <re_mbuf.h>
11#include <re_net.h>
12#include <re_sa.h>
13#include <re_list.h>
14#include <re_tcp.h>
15#include <re_sys.h>
16#include <re_odict.h>
17#include <re_dns.h>
18#include <re_uri.h>
19#include <re_rtmp.h>
20#include "rtmp.h"
21
22
/** Connection-level defaults */
enum {
	/** Window Acknowledgement Size: initial local value, and the value
	 *  announced to the peer after a Set Peer Bandwidth message */
	WINDOW_ACK_SIZE = 2500000
};
26
27
/* Forward declaration: req_connect() is called by tcp_close_handler(),
 * which appears before its definition */
static int req_connect(struct rtmp_conn *conn);
29
30
31static void conn_destructor(void *data)
32{
33 struct rtmp_conn *conn = data;
34
35 list_flush(&conn->ctransl);
36 list_flush(&conn->streaml);
37
38 mem_deref(conn->dnsq6);
39 mem_deref(conn->dnsq4);
40 mem_deref(conn->dnsc);
41 mem_deref(conn->tc);
42 mem_deref(conn->mb);
43 mem_deref(conn->dechunk);
44 mem_deref(conn->uri);
45 mem_deref(conn->app);
46 mem_deref(conn->host);
47 mem_deref(conn->stream);
48}
49
50
51static int handle_amf_command(struct rtmp_conn *conn, uint32_t stream_id,
52 struct mbuf *mb)
53{
54 struct odict *msg = NULL;
55 const char *name;
56 int err;
57
58 err = rtmp_amf_decode(&msg, mb);
59 if (err)
60 return err;
61
62 name = odict_string(msg, "0");
63
64 if (conn->is_client &&
65 (0 == str_casecmp(name, "_result") ||
66 0 == str_casecmp(name, "_error"))) {
67
68 /* forward response to transaction layer */
69 rtmp_ctrans_response(&conn->ctransl, msg);
70 }
71 else {
72 struct rtmp_stream *strm;
73
74 if (stream_id == 0) {
75 if (conn->cmdh)
76 conn->cmdh(msg, conn->arg);
77 }
78 else {
79 strm = rtmp_stream_find(conn, stream_id);
80 if (strm) {
81 if (strm->cmdh)
82 strm->cmdh(msg, strm->arg);
83 }
84 }
85 }
86
87 mem_deref(msg);
88
89 return 0;
90}
91
92
93static int handle_user_control_msg(struct rtmp_conn *conn, struct mbuf *mb)
94{
95 struct rtmp_stream *strm;
96 enum rtmp_event_type event;
97 uint32_t value;
98 int err;
99
100 if (mbuf_get_left(mb) < 6)
101 return EBADMSG;
102
103 event = ntohs(mbuf_read_u16(mb));
104 value = ntohl(mbuf_read_u32(mb));
105
106 switch (event) {
107
108 case RTMP_EVENT_STREAM_BEGIN:
109 case RTMP_EVENT_STREAM_EOF:
110 case RTMP_EVENT_STREAM_DRY:
111 case RTMP_EVENT_STREAM_IS_RECORDED:
112 case RTMP_EVENT_SET_BUFFER_LENGTH:
113
114 if (value != RTMP_CONTROL_STREAM_ID) {
115
116 strm = rtmp_stream_find(conn, value);
117 if (strm && strm->ctrlh)
118 strm->ctrlh(event, mb, strm->arg);
119 }
120 break;
121
122 case RTMP_EVENT_PING_REQUEST:
123
124 err = rtmp_control(conn, RTMP_TYPE_USER_CONTROL_MSG,
125 RTMP_EVENT_PING_RESPONSE, value);
126 if (err)
127 return err;
128 break;
129
130 default:
131 break;
132 }
133
134 return 0;
135}
136
137
138static int handle_data_message(struct rtmp_conn *conn, uint32_t stream_id,
139 struct mbuf *mb)
140{
141 struct rtmp_stream *strm;
142 struct odict *msg;
143 int err;
144
145 err = rtmp_amf_decode(&msg, mb);
146 if (err)
147 return err;
148
149 strm = rtmp_stream_find(conn, stream_id);
150 if (strm && strm->datah)
151 strm->datah(msg, strm->arg);
152
153 mem_deref(msg);
154
155 return 0;
156}
157
158
159static int rtmp_dechunk_handler(const struct rtmp_header *hdr,
160 struct mbuf *mb, void *arg)
161{
162 struct rtmp_conn *conn = arg;
163 struct rtmp_stream *strm;
164 uint32_t val;
165 uint32_t was;
166 uint8_t limit;
167 int err = 0;
168
169 switch (hdr->type_id) {
170
171 case RTMP_TYPE_SET_CHUNK_SIZE:
172 if (mbuf_get_left(mb) < 4)
173 return EBADMSG;
174
175 val = ntohl(mbuf_read_u32(mb));
176
177 val = val & 0x7fffffff;
178
179 rtmp_dechunker_set_chunksize(conn->dechunk, val);
180 break;
181
182 case RTMP_TYPE_ACKNOWLEDGEMENT:
183 if (mbuf_get_left(mb) < 4)
184 return EBADMSG;
185
186 val = ntohl(mbuf_read_u32(mb));
187 (void)val;
188 break;
189
190 case RTMP_TYPE_AMF0:
191 err = handle_amf_command(conn, hdr->stream_id, mb);
192 break;
193
194 case RTMP_TYPE_WINDOW_ACK_SIZE:
195 if (mbuf_get_left(mb) < 4)
196 return EBADMSG;
197
198 was = ntohl(mbuf_read_u32(mb));
199 if (was != 0)
200 conn->window_ack_size = was;
201 break;
202
203 case RTMP_TYPE_SET_PEER_BANDWIDTH:
204 if (mbuf_get_left(mb) < 5)
205 return EBADMSG;
206
207 was = ntohl(mbuf_read_u32(mb));
208 limit = mbuf_read_u8(mb);
209 (void)limit;
210
211 if (was != 0)
212 conn->window_ack_size = was;
213
214 err = rtmp_control(conn, RTMP_TYPE_WINDOW_ACK_SIZE,
215 (uint32_t)WINDOW_ACK_SIZE);
216 break;
217
218 case RTMP_TYPE_USER_CONTROL_MSG:
219 err = handle_user_control_msg(conn, mb);
220 break;
221
222 /* XXX: common code for audio+video */
223 case RTMP_TYPE_AUDIO:
224 strm = rtmp_stream_find(conn, hdr->stream_id);
225 if (strm) {
226 if (strm->auh) {
227 strm->auh(hdr->timestamp,
228 mb->buf, mb->end,
229 strm->arg);
230 }
231 }
232 break;
233
234 case RTMP_TYPE_VIDEO:
235 strm = rtmp_stream_find(conn, hdr->stream_id);
236 if (strm) {
237 if (strm->vidh) {
238 strm->vidh(hdr->timestamp,
239 mb->buf, mb->end,
240 strm->arg);
241 }
242 }
243 break;
244
245 case RTMP_TYPE_DATA:
246 err = handle_data_message(conn, hdr->stream_id, mb);
247 break;
248
249 default:
250 break;
251 }
252
253 return err;
254}
255
256
257static struct rtmp_conn *rtmp_conn_alloc(bool is_client,
258 rtmp_estab_h *estabh,
259 rtmp_command_h *cmdh,
260 rtmp_close_h *closeh,
261 void *arg)
262{
263 struct rtmp_conn *conn;
264 int err;
265
266 conn = mem_zalloc(sizeof(*conn), conn_destructor);
267 if (!conn)
268 return NULL;
269
270 conn->is_client = is_client;
271 conn->state = RTMP_STATE_UNINITIALIZED;
272
273 conn->send_chunk_size = RTMP_DEFAULT_CHUNKSIZE;
274 conn->window_ack_size = WINDOW_ACK_SIZE;
275
276 err = rtmp_dechunker_alloc(&conn->dechunk, RTMP_DEFAULT_CHUNKSIZE,
277 rtmp_dechunk_handler, conn);
278 if (err)
279 goto out;
280
281 /* must be above 2 */
282 conn->chunk_id_counter = RTMP_CHUNK_ID_CONN + 1;
283
284 conn->estabh = estabh;
285 conn->cmdh = cmdh;
286 conn->closeh = closeh;
287 conn->arg = arg;
288
289 out:
290 if (err)
291 return mem_deref(conn);
292
293 return conn;
294}
295
296
297static inline void set_state(struct rtmp_conn *conn,
298 enum rtmp_handshake_state state)
299{
300 conn->state = state;
301}
302
303
304static int send_packet(struct rtmp_conn *conn, const uint8_t *pkt, size_t len)
305{
306 struct mbuf *mb;
307 int err;
308
309 if (!conn || !pkt || !len)
310 return EINVAL;
311
312 mb = mbuf_alloc(len);
313 if (!mb)
314 return ENOMEM;
315
316 (void)mbuf_write_mem(mb, pkt, len);
317
318 mb->pos = 0;
319
320 err = tcp_send(conn->tc, mb);
321 if (err)
322 goto out;
323
324 out:
325 mem_deref(mb);
326
327 return err;
328}
329
330
331static int handshake_start(struct rtmp_conn *conn)
332{
333 uint8_t sig[1+RTMP_HANDSHAKE_SIZE];
334 int err;
335
336 sig[0] = RTMP_PROTOCOL_VERSION;
337 sig[1] = 0;
338 sig[2] = 0;
339 sig[3] = 0;
340 sig[4] = 0;
341 sig[5] = VER_MAJOR;
342 sig[6] = VER_MINOR;
343 sig[7] = VER_PATCH;
344 sig[8] = 0;
345 rand_bytes(sig + 9, sizeof(sig) - 9);
346
347 err = send_packet(conn, sig, sizeof(sig));
348 if (err)
349 return err;
350
351 set_state(conn, RTMP_STATE_VERSION_SENT);
352
353 return 0;
354}
355
356
357static void conn_close(struct rtmp_conn *conn, int err)
358{
359 rtmp_close_h *closeh;
360
361 conn->tc = mem_deref(conn->tc);
362 conn->dnsq6 = mem_deref(conn->dnsq6);
363 conn->dnsq4 = mem_deref(conn->dnsq4);
364
365 closeh = conn->closeh;
366 if (closeh) {
367 conn->closeh = NULL;
368 closeh(err, conn->arg);
369 }
370}
371
372
373static void tcp_estab_handler(void *arg)
374{
375 struct rtmp_conn *conn = arg;
376 int err = 0;
377
378 if (conn->is_client) {
379
380 err = handshake_start(conn);
381 }
382
383 if (err)
384 conn_close(conn, err);
385}
386
387
388/* Send AMF0 Command or Data */
389int rtmp_send_amf_command(const struct rtmp_conn *conn,
390 unsigned format, uint32_t chunk_id,
391 uint8_t type_id,
392 uint32_t msg_stream_id,
393 const uint8_t *cmd, size_t len)
394{
395 if (!conn || !cmd || !len)
396 return EINVAL;
397
398 return rtmp_chunker(format, chunk_id, 0, 0, type_id, msg_stream_id,
399 cmd, len, conn->send_chunk_size,
400 conn->tc);
401}
402
403
404static void connect_resp_handler(bool success, const struct odict *msg,
405 void *arg)
406{
407 struct rtmp_conn *conn = arg;
408 rtmp_estab_h *estabh;
409 (void)msg;
410
411 if (!success) {
412 conn_close(conn, EPROTO);
413 return;
414 }
415
416 conn->connected = true;
417
418 estabh = conn->estabh;
419 if (estabh) {
420 conn->estabh = NULL;
421 estabh(conn->arg);
422 }
423}
424
425
426static int send_connect(struct rtmp_conn *conn)
427{
428 const int ac = 0x0400; /* AAC */
429 const int vc = 0x0080; /* H264 */
430
431 return rtmp_amf_request(conn, RTMP_CONTROL_STREAM_ID, "connect",
432 connect_resp_handler, conn,
433 1,
434 RTMP_AMF_TYPE_OBJECT, 8,
435 RTMP_AMF_TYPE_STRING, "app", conn->app,
436 RTMP_AMF_TYPE_STRING, "flashVer", "LNX 9,0,124,2",
437 RTMP_AMF_TYPE_STRING, "tcUrl", conn->uri,
438 RTMP_AMF_TYPE_BOOLEAN, "fpad", false,
439 RTMP_AMF_TYPE_NUMBER, "capabilities", 15.0,
440 RTMP_AMF_TYPE_NUMBER, "audioCodecs", (double)ac,
441 RTMP_AMF_TYPE_NUMBER, "videoCodecs", (double)vc,
442 RTMP_AMF_TYPE_NUMBER, "videoFunction", 1.0);
443}
444
445
446static int client_handle_packet(struct rtmp_conn *conn, struct mbuf *mb)
447{
448 uint8_t s0;
449 uint8_t s1[RTMP_HANDSHAKE_SIZE];
450 int err = 0;
451
452 switch (conn->state) {
453
454 case RTMP_STATE_VERSION_SENT:
455 if (mbuf_get_left(mb) < (1+RTMP_HANDSHAKE_SIZE))
456 return ENODATA;
457
458 s0 = mbuf_read_u8(mb);
459 if (s0 != RTMP_PROTOCOL_VERSION)
460 return EPROTO;
461
462 (void)mbuf_read_mem(mb, s1, sizeof(s1));
463
464 err = send_packet(conn, s1, sizeof(s1));
465 if (err)
466 return err;
467
468 set_state(conn, RTMP_STATE_ACK_SENT);
469 break;
470
471 case RTMP_STATE_ACK_SENT:
472 if (mbuf_get_left(mb) < RTMP_HANDSHAKE_SIZE)
473 return ENODATA;
474
475 /* S2 (ignored) */
476 mbuf_advance(mb, RTMP_HANDSHAKE_SIZE);
477
478 conn->send_chunk_size = 4096;
479 err = rtmp_control(conn, RTMP_TYPE_SET_CHUNK_SIZE,
480 conn->send_chunk_size);
481 if (err)
482 return err;
483
484 err = send_connect(conn);
485 if (err)
486 return err;
487
488 set_state(conn, RTMP_STATE_HANDSHAKE_DONE);
489 break;
490
491 case RTMP_STATE_HANDSHAKE_DONE:
492 err = rtmp_dechunker_receive(conn->dechunk, mb);
493 if (err)
494 return err;
495 break;
496
497 default:
498 return EPROTO;
499 }
500
501 return 0;
502}
503
504
505static int server_handle_packet(struct rtmp_conn *conn, struct mbuf *mb)
506{
507 uint8_t c0;
508 uint8_t c1[RTMP_HANDSHAKE_SIZE];
509 int err = 0;
510
511 switch (conn->state) {
512
513 case RTMP_STATE_UNINITIALIZED:
514 if (mbuf_get_left(mb) < 1)
515 return ENODATA;
516
517 c0 = mbuf_read_u8(mb);
518 if (c0 != RTMP_PROTOCOL_VERSION)
519 return EPROTO;
520
521 /* Send S0 + S1 */
522 err = handshake_start(conn);
523 if (err)
524 return err;
525 break;
526
527 case RTMP_STATE_VERSION_SENT:
528 if (mbuf_get_left(mb) < RTMP_HANDSHAKE_SIZE)
529 return ENODATA;
530
531 (void)mbuf_read_mem(mb, c1, sizeof(c1));
532
533 /* Copy C1 to S2 */
534 err = send_packet(conn, c1, sizeof(c1));
535 if (err)
536 return err;
537
538 set_state(conn, RTMP_STATE_ACK_SENT);
539 break;
540
541 case RTMP_STATE_ACK_SENT:
542 if (mbuf_get_left(mb) < RTMP_HANDSHAKE_SIZE)
543 return ENODATA;
544
545 /* C2 (ignored) */
546 mbuf_advance(mb, RTMP_HANDSHAKE_SIZE);
547
548 conn->send_chunk_size = 4096;
549 err = rtmp_control(conn, RTMP_TYPE_SET_CHUNK_SIZE,
550 conn->send_chunk_size);
551 if (err)
552 return err;
553
554 set_state(conn, RTMP_STATE_HANDSHAKE_DONE);
555 break;
556
557 case RTMP_STATE_HANDSHAKE_DONE:
558 err = rtmp_dechunker_receive(conn->dechunk, mb);
559 if (err)
560 return err;
561 break;
562
563 default:
564 return EPROTO;
565 }
566
567 return 0;
568}
569
570
571static void tcp_recv_handler(struct mbuf *mb_pkt, void *arg)
572{
573 struct rtmp_conn *conn = arg;
574 int err;
575
576 conn->total_bytes += mbuf_get_left(mb_pkt);
577
578 /* re-assembly of fragments */
579 if (conn->mb) {
580 const size_t len = mbuf_get_left(mb_pkt), pos = conn->mb->pos;
581
582 if ((mbuf_get_left(conn->mb) + len) > RTMP_MESSAGE_LEN_MAX) {
583 err = EOVERFLOW;
584 goto out;
585 }
586
587 conn->mb->pos = conn->mb->end;
588
589 err = mbuf_write_mem(conn->mb,
590 mbuf_buf(mb_pkt), mbuf_get_left(mb_pkt));
591 if (err)
592 goto out;
593
594 conn->mb->pos = pos;
595 }
596 else {
597 conn->mb = mem_ref(mb_pkt);
598 }
599
600 while (mbuf_get_left(conn->mb) > 0) {
601
602 size_t pos;
603 uint32_t nrefs;
604
605 pos = conn->mb->pos;
606
607 mem_ref(conn);
608
609 if (conn->is_client)
610 err = client_handle_packet(conn, conn->mb);
611 else
612 err = server_handle_packet(conn, conn->mb);
613
614 nrefs = mem_nrefs(conn);
615
616 mem_deref(conn);
617
618 if (nrefs == 1)
619 return;
620
621 if (!conn->tc)
622 return;
623
624 if (err) {
625
626 /* rewind */
627 conn->mb->pos = pos;
628
629 if (err == ENODATA)
630 err = 0;
631 break;
632 }
633
634
635 if (conn->mb->pos >= conn->mb->end) {
636 conn->mb = mem_deref(conn->mb);
637 break;
638 }
639 }
640
641 if (err)
642 goto out;
643
644 if (conn->total_bytes >= (conn->last_ack + conn->window_ack_size)) {
645
646 conn->last_ack = conn->total_bytes;
647
648 err = rtmp_control(conn, RTMP_TYPE_ACKNOWLEDGEMENT,
649 (uint32_t)conn->total_bytes);
650 if (err)
651 goto out;
652 }
653
654 out:
655 if (err)
656 conn_close(conn, err);
657}
658
659
660static void tcp_close_handler(int err, void *arg)
661{
662 struct rtmp_conn *conn = arg;
663
664 if (conn->is_client && !conn->connected && conn->srvc > 0) {
665 err = req_connect(conn);
666 if (!err)
667 return;
668 }
669
670 conn_close(conn, err);
671}
672
673
674static int req_connect(struct rtmp_conn *conn)
675{
676 const struct sa *addr;
677 int err = EINVAL;
678
679 while (conn->srvc > 0) {
680
681 --conn->srvc;
682
683 addr = &conn->srvv[conn->srvc];
684
685 conn->send_chunk_size = RTMP_DEFAULT_CHUNKSIZE;
686 conn->window_ack_size = WINDOW_ACK_SIZE;
687 conn->state = RTMP_STATE_UNINITIALIZED;
688 conn->last_ack = 0;
689 conn->total_bytes = 0;
690 conn->mb = mem_deref(conn->mb);
691 conn->tc = mem_deref(conn->tc);
692
693 rtmp_dechunker_set_chunksize(conn->dechunk,
694 RTMP_DEFAULT_CHUNKSIZE);
695
696 err = tcp_connect(&conn->tc, addr, tcp_estab_handler,
697 tcp_recv_handler, tcp_close_handler, conn);
698 if (!err)
699 break;
700 }
701
702 return err;
703}
704
705
706static bool rr_handler(struct dnsrr *rr, void *arg)
707{
708 struct rtmp_conn *conn = arg;
709
710 if (conn->srvc >= ARRAY_SIZE(conn->srvv))
711 return true;
712
713 switch (rr->type) {
714
715 case DNS_TYPE_A:
716 sa_set_in(&conn->srvv[conn->srvc++], rr->rdata.a.addr,
717 conn->port);
718 break;
719
720 case DNS_TYPE_AAAA:
721 sa_set_in6(&conn->srvv[conn->srvc++], rr->rdata.aaaa.addr,
722 conn->port);
723 break;
724 }
725
726 return false;
727}
728
729
730static void query_handler(int err, const struct dnshdr *hdr, struct list *ansl,
731 struct list *authl, struct list *addl, void *arg)
732{
733 struct rtmp_conn *conn = arg;
734 (void)hdr;
735 (void)authl;
736 (void)addl;
737
738 dns_rrlist_apply2(ansl, conn->host, DNS_TYPE_A, DNS_TYPE_AAAA,
739 DNS_CLASS_IN, true, rr_handler, conn);
740
741 /* wait for other (A/AAAA) query to complete */
742 if (conn->dnsq4 || conn->dnsq6)
743 return;
744
745 if (conn->srvc == 0) {
746 err = err ? err : EDESTADDRREQ;
747 goto out;
748 }
749
750 err = req_connect(conn);
751 if (err)
752 goto out;
753
754 return;
755
756 out:
757 conn_close(conn, err);
758}
759
760
761/**
762 * Connect to an RTMP server
763 *
764 * @param connp Pointer to allocated RTMP connection object
765 * @param dnsc DNS Client for resolving FQDN uris
766 * @param uri RTMP uri to connect to
767 * @param estabh Established handler
768 * @param cmdh Incoming command handler
769 * @param closeh Close handler
770 * @param arg Handler argument
771 *
772 * @return 0 if success, otherwise errorcode
773 *
774 * Example URIs:
775 *
776 * rtmp://a.rtmp.youtube.com/live2/my-stream
777 * rtmp://[::1]/vod/mp4:sample.mp4
778 */
779int rtmp_connect(struct rtmp_conn **connp, struct dnsc *dnsc, const char *uri,
780 rtmp_estab_h *estabh, rtmp_command_h *cmdh,
781 rtmp_close_h *closeh, void *arg)
782{
783 struct rtmp_conn *conn;
784 struct pl pl_hostport;
785 struct pl pl_host;
786 struct pl pl_port;
787 struct pl pl_app;
788 struct pl pl_stream;
789 int err;
790
791 if (!connp || !uri)
792 return EINVAL;
793
794 if (re_regex(uri, strlen(uri), "rtmp://[^/]+/[^/]+/[^]+",
795 &pl_hostport, &pl_app, &pl_stream))
796 return EINVAL;
797
798 if (uri_decode_hostport(&pl_hostport, &pl_host, &pl_port))
799 return EINVAL;
800
801 conn = rtmp_conn_alloc(true, estabh, cmdh, closeh, arg);
802 if (!conn)
803 return ENOMEM;
804
805 conn->port = pl_isset(&pl_port) ? pl_u32(&pl_port) : RTMP_PORT;
806
807 err = pl_strdup(&conn->app, &pl_app);
808 err |= pl_strdup(&conn->stream, &pl_stream);
809 err |= str_dup(&conn->uri, uri);
810 if (err)
811 goto out;
812
813 if (0 == sa_set(&conn->srvv[0], &pl_host, conn->port)) {
814
815 conn->srvc = 1;
816
817 err = req_connect(conn);
818 if (err)
819 goto out;
820 }
821 else {
822#ifdef HAVE_INET6
823 struct sa tmp;
824#endif
825
826 if (!dnsc) {
827 err = EINVAL;
828 goto out;
829 }
830
831 err = pl_strdup(&conn->host, &pl_host);
832 if (err)
833 goto out;
834
835 conn->dnsc = mem_ref(dnsc);
836
837 err = dnsc_query(&conn->dnsq4, dnsc, conn->host, DNS_TYPE_A,
838 DNS_CLASS_IN, true, query_handler, conn);
839 if (err)
840 goto out;
841
842#ifdef HAVE_INET6
843 if (0 == net_default_source_addr_get(AF_INET6, &tmp)) {
844
845 err = dnsc_query(&conn->dnsq6, dnsc, conn->host,
846 DNS_TYPE_AAAA, DNS_CLASS_IN,
847 true, query_handler, conn);
848 if (err)
849 goto out;
850 }
851#endif
852 }
853
854 out:
855 if (err)
856 mem_deref(conn);
857 else
858 *connp = conn;
859
860 return err;
861}
862
863
864/**
865 * Accept an incoming TCP connection creating an RTMP Server connection
866 *
867 * @param connp Pointer to allocated RTMP connection object
868 * @param ts TCP socket with pending connection
869 * @param cmdh Incoming command handler
870 * @param closeh Close handler
871 * @param arg Handler argument
872 *
873 * @return 0 if success, otherwise errorcode
874 */
875int rtmp_accept(struct rtmp_conn **connp, struct tcp_sock *ts,
876 rtmp_command_h *cmdh, rtmp_close_h *closeh, void *arg)
877{
878 struct rtmp_conn *conn;
879 int err;
880
881 if (!connp || !ts)
882 return EINVAL;
883
884 conn = rtmp_conn_alloc(false, NULL, cmdh, closeh, arg);
885 if (!conn)
886 return ENOMEM;
887
888 err = tcp_accept(&conn->tc, ts, tcp_estab_handler,
889 tcp_recv_handler, tcp_close_handler, conn);
890 if (err)
891 goto out;
892
893 out:
894 if (err)
895 mem_deref(conn);
896 else
897 *connp = conn;
898
899 return err;
900}
901
902
903int rtmp_conn_send_msg(const struct rtmp_conn *conn,
904 unsigned format, uint32_t chunk_id,
905 uint32_t timestamp, uint32_t timestamp_delta,
906 uint8_t msg_type_id, uint32_t msg_stream_id,
907 const uint8_t *payload, size_t payload_len)
908{
909 if (!conn || !payload || !payload_len)
910 return EINVAL;
911
912 return rtmp_chunker(format, chunk_id, timestamp, timestamp_delta,
913 msg_type_id, msg_stream_id, payload, payload_len,
914 conn->send_chunk_size,
915 conn->tc);
916}
917
918
919unsigned rtmp_conn_assign_chunkid(struct rtmp_conn *conn)
920{
921 if (!conn)
922 return 0;
923
924 return ++conn->chunk_id_counter;
925}
926
927
928uint64_t rtmp_conn_assign_tid(struct rtmp_conn *conn)
929{
930 if (!conn)
931 return 0;
932
933 return ++conn->tid_counter;
934}
935
936
937/**
938 * Get the underlying TCP connection from an RTMP connection
939 *
940 * @param conn RTMP Connection
941 *
942 * @return TCP-Connection
943 */
944struct tcp_conn *rtmp_conn_tcpconn(const struct rtmp_conn *conn)
945{
946 return conn ? conn->tc : NULL;
947}
948
949
950/**
951 * Get the RTMP connection stream name from rtmp_connect
952 *
953 * @param conn RTMP Connection
954 *
955 * @return RTMP Stream name or NULL
956 */
957const char *rtmp_conn_stream(const struct rtmp_conn *conn)
958{
959 return conn ? conn->stream : NULL;
960}
961
962
963/**
964 * Set callback handlers for the RTMP connection
965 *
966 * @param conn RTMP connection
967 * @param cmdh Incoming command handler
968 * @param closeh Close handler
969 * @param arg Handler argument
970 */
971void rtmp_set_handlers(struct rtmp_conn *conn, rtmp_command_h *cmdh,
972 rtmp_close_h *closeh, void *arg)
973{
974 if (!conn)
975 return;
976
977 conn->cmdh = cmdh;
978 conn->closeh = closeh;
979 conn->arg = arg;
980}
981
982
983static const char *rtmp_handshake_name(enum rtmp_handshake_state state)
984{
985 switch (state) {
986
987 case RTMP_STATE_UNINITIALIZED: return "UNINITIALIZED";
988 case RTMP_STATE_VERSION_SENT: return "VERSION_SENT";
989 case RTMP_STATE_ACK_SENT: return "ACK_SENT";
990 case RTMP_STATE_HANDSHAKE_DONE: return "HANDSHAKE_DONE";
991 default: return "?";
992 }
993}
994
995
996int rtmp_conn_debug(struct re_printf *pf, const struct rtmp_conn *conn)
997{
998 int err = 0;
999
1000 if (!conn)
1001 return 0;
1002
1003 err |= re_hprintf(pf, "role: %s\n",
1004 conn->is_client ? "Client" : "Server");
1005 err |= re_hprintf(pf, "state: %s\n",
1006 rtmp_handshake_name(conn->state));
1007 err |= re_hprintf(pf, "connected: %d\n", conn->connected);
1008 err |= re_hprintf(pf, "chunk_size: send=%u\n",
1009 conn->send_chunk_size);
1010 err |= re_hprintf(pf, "bytes: %zu\n", conn->total_bytes);
1011 err |= re_hprintf(pf, "streams: %u\n",
1012 list_count(&conn->streaml));
1013
1014 if (conn->is_client) {
1015 err |= re_hprintf(pf, "uri: %s\n", conn->uri);
1016 err |= re_hprintf(pf, "app: %s\n", conn->app);
1017 err |= re_hprintf(pf, "stream: %s\n", conn->stream);
1018 }
1019
1020 err |= re_hprintf(pf, "%H\n", rtmp_dechunker_debug, conn->dechunk);
1021
1022 return err;
1023}