Squashed 'third_party/rawrtc/re/' content from commit f3163ce8b
Change-Id: I6a235e6ac0f03269d951026f9d195da05c40fdab
git-subtree-dir: third_party/rawrtc/re
git-subtree-split: f3163ce8b526a13b35ef71ce4dd6f43585064d8a
diff --git a/src/rtmp/README.md b/src/rtmp/README.md
new file mode 100644
index 0000000..5f1a6ee
--- /dev/null
+++ b/src/rtmp/README.md
@@ -0,0 +1,126 @@
+RTMP module
+-----------
+
+This module implements Real Time Messaging Protocol (RTMP) [1].
+
+
+
+
+Functional overview:
+-------------------
+
+```
+RTMP Specification v1.0 .......... YES
+RTMP with TCP transport .......... YES
+
+RTMPS (RTMP over TLS) ............ NO
+RTMPE (RTMP over Adobe Encryption) NO
+RTMPT (RTMP over HTTP) ........... NO
+RTMFP (RTMP over UDP) ............ NO
+
+Transport:
+Client ........................... YES
+Server ........................... YES
+IPv4 ............................. YES
+IPv6 ............................. YES
+DNS Resolving A/AAAA ............. YES
+
+RTMP Components:
+RTMP Handshake ................... YES
+RTMP Header encoding and decoding. YES
+RTMP Chunking .................... YES
+RTMP Dechunking .................. YES
+AMF0 (Action Message Format) ..... YES
+AMF3 (Action Message Format) ..... NO
+Send and receive audio/video ..... YES
+Regular and extended timestamp ... YES
+Multiple streams ................. YES
+```
+
+
+
+
+TODO:
+----
+
+- [x] improve AMF encoding API
+- [x] implement AMF transaction matching
+- [x] add support for Data Message
+- [x] add support for AMF Strict Array (type 10)
+- [ ] add support for TLS encryption
+- [x] add support for extended timestamp
+
+
+
+
+Protocol stack:
+--------------
+
+ .-------. .-------. .-------.
+ | AMF | | Audio | | Video |
+ '-------' '-------' '-------'
+ | | |
+ +----------+----------'
+ |
+ .-------.
+ | RTMP |
+ '-------'
+ |
+ |
+ .-------.
+ | TCP |
+ '-------'
+
+
+
+
+Message Sequence:
+----------------
+
+
+```
+Client Server
+
+|----------------- TCP Connect -------------->|
+| |
+| |
+| |
+|<-------------- 3-way Handshake ------------>|
+| |
+| |
+| |
+|----------- Command Message(connect) ------->| chunkid=3, streamid=0, tid=1
+| |
+|<------- Window Acknowledgement Size --------| chunkid=2, streamid=0
+| |
+|<----------- Set Peer Bandwidth -------------| chunkid=2, streamid=0
+| |
+|-------- Window Acknowledgement Size ------->|
+| |
+|<------ User Control Message(StreamBegin) ---| chunkid=2, streamid=0
+| |
+|<------------ Command Message ---------------| chunkid=3, streamid=0, tid=1
+| (_result- connect response) |
+```
+
+
+Interop:
+-------
+
+- Wowza Streaming Engine 4.7.1
+- Youtube service
+- FFmpeg's RTMP module
+
+
+
+
+References:
+----------
+
+[1] http://wwwimages.adobe.com/www.adobe.com/content/dam/acom/en/devnet/rtmp/pdf/rtmp_specification_1.0.pdf
+
+[2] https://wwwimages2.adobe.com/content/dam/acom/en/devnet/flv/video_file_format_spec_v10_1.pdf
+
+[3] https://en.wikipedia.org/wiki/Action_Message_Format
+
+[4] https://wwwimages2.adobe.com/content/dam/acom/en/devnet/pdf/amf0-file-format-specification.pdf
diff --git a/src/rtmp/amf.c b/src/rtmp/amf.c
new file mode 100644
index 0000000..c95763e
--- /dev/null
+++ b/src/rtmp/amf.c
@@ -0,0 +1,163 @@
+/**
+ * @file rtmp/amf.c Real Time Messaging Protocol (RTMP) -- AMF Commands
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_list.h>
+#include <re_tcp.h>
+#include <re_sys.h>
+#include <re_odict.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+
+int rtmp_command_header_encode(struct mbuf *mb, const char *name, uint64_t tid)
+{
+ int err;
+
+ if (!mb || !name)
+ return EINVAL;
+
+ err = rtmp_amf_encode_string(mb, name);
+ err |= rtmp_amf_encode_number(mb, tid);
+
+ return err;
+}
+
+
+int rtmp_amf_command(const struct rtmp_conn *conn, uint32_t stream_id,
+ const char *command, unsigned body_propc, ...)
+{
+ struct mbuf *mb;
+ va_list ap;
+ int err;
+
+ if (!conn || !command)
+ return EINVAL;
+
+ mb = mbuf_alloc(512);
+ if (!mb)
+ return ENOMEM;
+
+ err = rtmp_amf_encode_string(mb, command);
+ if (err)
+ goto out;
+
+ if (body_propc) {
+ va_start(ap, body_propc);
+ err = rtmp_amf_vencode_object(mb, RTMP_AMF_TYPE_ROOT,
+ body_propc, &ap);
+ va_end(ap);
+ if (err)
+ goto out;
+ }
+
+ err = rtmp_send_amf_command(conn, 0, RTMP_CHUNK_ID_CONN,
+ RTMP_TYPE_AMF0,
+ stream_id, mb->buf, mb->end);
+
+ if (err)
+ goto out;
+
+ out:
+ mem_deref(mb);
+
+ return err;
+}
+
+
+int rtmp_amf_reply(struct rtmp_conn *conn, uint32_t stream_id, bool success,
+ const struct odict *req,
+ unsigned body_propc, ...)
+{
+ struct mbuf *mb;
+ va_list ap;
+ uint64_t tid;
+ int err;
+
+ if (!conn || !req)
+ return EINVAL;
+
+ if (!odict_get_number(req, &tid, "1"))
+ return EPROTO;
+ if (tid == 0)
+ return EPROTO;
+
+ mb = mbuf_alloc(512);
+ if (!mb)
+ return ENOMEM;
+
+ err = rtmp_command_header_encode(mb,
+ success ? "_result" : "_error", tid);
+ if (err)
+ goto out;
+
+ if (body_propc) {
+ va_start(ap, body_propc);
+ err = rtmp_amf_vencode_object(mb, RTMP_AMF_TYPE_ROOT,
+ body_propc, &ap);
+ va_end(ap);
+ if (err)
+ goto out;
+ }
+
+ err = rtmp_send_amf_command(conn, 0, RTMP_CHUNK_ID_CONN,
+ RTMP_TYPE_AMF0,
+ stream_id, mb->buf, mb->end);
+
+ if (err)
+ goto out;
+
+ out:
+ mem_deref(mb);
+
+ return err;
+}
+
+
+int rtmp_amf_data(const struct rtmp_conn *conn, uint32_t stream_id,
+ const char *command, unsigned body_propc, ...)
+{
+ struct mbuf *mb;
+ va_list ap;
+ int err;
+
+ if (!conn || !command)
+ return EINVAL;
+
+ mb = mbuf_alloc(512);
+ if (!mb)
+ return ENOMEM;
+
+ err = rtmp_amf_encode_string(mb, command);
+ if (err)
+ goto out;
+
+ if (body_propc) {
+ va_start(ap, body_propc);
+ err = rtmp_amf_vencode_object(mb, RTMP_AMF_TYPE_ROOT,
+ body_propc, &ap);
+ va_end(ap);
+ if (err)
+ goto out;
+ }
+
+ err = rtmp_send_amf_command(conn, 0, RTMP_CHUNK_ID_CONN,
+ RTMP_TYPE_DATA,
+ stream_id, mb->buf, mb->end);
+
+ if (err)
+ goto out;
+
+ out:
+ mem_deref(mb);
+
+ return err;
+}
diff --git a/src/rtmp/amf_dec.c b/src/rtmp/amf_dec.c
new file mode 100644
index 0000000..de35108
--- /dev/null
+++ b/src/rtmp/amf_dec.c
@@ -0,0 +1,235 @@
+/**
+ * @file rtmp/amf_dec.c Real Time Messaging Protocol (RTMP) -- AMF Decoding
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_list.h>
+#include <re_sys.h>
+#include <re_odict.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+
+enum {
+ AMF_HASH_SIZE = 32
+};
+
+
+static int amf_decode_value(struct odict *dict, const char *key,
+ struct mbuf *mb);
+
+
+static int amf_decode_object(struct odict *dict, struct mbuf *mb)
+{
+ char *key = NULL;
+ uint16_t len;
+ int err = 0;
+
+ while (mbuf_get_left(mb) > 0) {
+
+ if (mbuf_get_left(mb) < 2)
+ return ENODATA;
+
+ len = ntohs(mbuf_read_u16(mb));
+
+ if (len == 0) {
+ uint8_t val;
+
+ if (mbuf_get_left(mb) < 1)
+ return ENODATA;
+
+ val = mbuf_read_u8(mb);
+
+ if (val == RTMP_AMF_TYPE_OBJECT_END)
+ return 0;
+ else
+ return EBADMSG;
+ }
+
+ if (mbuf_get_left(mb) < len)
+ return ENODATA;
+
+ err = mbuf_strdup(mb, &key, len);
+ if (err)
+ return err;
+
+ err = amf_decode_value(dict, key, mb);
+
+ key = mem_deref(key);
+
+ if (err)
+ return err;
+ }
+
+ return 0;
+}
+
+
+static int amf_decode_value(struct odict *dict, const char *key,
+ struct mbuf *mb)
+{
+ union {
+ uint64_t i;
+ double f;
+ } num;
+ struct odict *object = NULL;
+ char *str = NULL;
+ uint32_t i, array_len;
+ uint8_t type;
+ uint16_t len;
+ bool boolean;
+ int err = 0;
+
+ if (mbuf_get_left(mb) < 1)
+ return ENODATA;
+
+ type = mbuf_read_u8(mb);
+
+ switch (type) {
+
+ case RTMP_AMF_TYPE_NUMBER:
+ if (mbuf_get_left(mb) < 8)
+ return ENODATA;
+
+ num.i = sys_ntohll(mbuf_read_u64(mb));
+
+ err = odict_entry_add(dict, key, ODICT_DOUBLE, num.f);
+ break;
+
+ case RTMP_AMF_TYPE_BOOLEAN:
+ if (mbuf_get_left(mb) < 1)
+ return ENODATA;
+
+ boolean = !!mbuf_read_u8(mb);
+
+ err = odict_entry_add(dict, key, ODICT_BOOL, boolean);
+ break;
+
+ case RTMP_AMF_TYPE_STRING:
+ if (mbuf_get_left(mb) < 2)
+ return ENODATA;
+
+ len = ntohs(mbuf_read_u16(mb));
+
+ if (mbuf_get_left(mb) < len)
+ return ENODATA;
+
+ err = mbuf_strdup(mb, &str, len);
+ if (err)
+ return err;
+
+ err = odict_entry_add(dict, key, ODICT_STRING, str);
+
+ mem_deref(str);
+ break;
+
+ case RTMP_AMF_TYPE_NULL:
+ err = odict_entry_add(dict, key, ODICT_NULL);
+ break;
+
+ case RTMP_AMF_TYPE_ECMA_ARRAY:
+ if (mbuf_get_left(mb) < 4)
+ return ENODATA;
+
+ array_len = ntohl(mbuf_read_u32(mb));
+
+ (void)array_len; /* ignore array length */
+
+ /* fallthrough */
+
+ case RTMP_AMF_TYPE_OBJECT:
+ err = odict_alloc(&object, 32);
+ if (err)
+ return err;
+
+ err = amf_decode_object(object, mb);
+ if (err) {
+ mem_deref(object);
+ return err;
+ }
+
+ err = odict_entry_add(dict, key, ODICT_OBJECT, object);
+
+ mem_deref(object);
+ break;
+
+ case RTMP_AMF_TYPE_STRICT_ARRAY:
+ if (mbuf_get_left(mb) < 4)
+ return ENODATA;
+
+ array_len = ntohl(mbuf_read_u32(mb));
+ if (!array_len)
+ return EPROTO;
+
+ err = odict_alloc(&object, 32);
+ if (err)
+ return err;
+
+ for (i=0; i<array_len; i++) {
+
+ char ix[32];
+
+ re_snprintf(ix, sizeof(ix), "%u", i);
+
+ err = amf_decode_value(object, ix, mb);
+ if (err) {
+ mem_deref(object);
+ return err;
+ }
+ }
+
+ err = odict_entry_add(dict, key, ODICT_ARRAY, object);
+
+ mem_deref(object);
+ break;
+
+ default:
+ err = EPROTO;
+ break;
+ }
+
+ return err;
+}
+
+
+int rtmp_amf_decode(struct odict **msgp, struct mbuf *mb)
+{
+ struct odict *msg;
+ unsigned ix = 0;
+ int err;
+
+ if (!msgp || !mb)
+ return EINVAL;
+
+ err = odict_alloc(&msg, AMF_HASH_SIZE);
+ if (err)
+ return err;
+
+ /* decode all entries on root-level */
+ while (mbuf_get_left(mb) > 0) {
+
+ char key[16];
+
+ re_snprintf(key, sizeof(key), "%u", ix++);
+
+ /* note: key is the numerical index */
+ err = amf_decode_value(msg, key, mb);
+ if (err)
+ goto out;
+ }
+
+ out:
+ if (err)
+ mem_deref(msg);
+ else
+ *msgp = msg;
+
+ return err;
+}
diff --git a/src/rtmp/amf_enc.c b/src/rtmp/amf_enc.c
new file mode 100644
index 0000000..0ba157d
--- /dev/null
+++ b/src/rtmp/amf_enc.c
@@ -0,0 +1,245 @@
+/**
+ * @file rtmp/amf_enc.c Real Time Messaging Protocol (RTMP) -- AMF Encoding
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_list.h>
+#include <re_sys.h>
+#include <re_odict.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+
+static int rtmp_amf_encode_key(struct mbuf *mb, const char *key)
+{
+ size_t len;
+ int err;
+
+ len = str_len(key);
+
+ if (len > 65535)
+ return EOVERFLOW;
+
+ err = mbuf_write_u16(mb, htons((uint16_t)len));
+ err |= mbuf_write_str(mb, key);
+
+ return err;
+}
+
+
+static int rtmp_amf_encode_object_start(struct mbuf *mb)
+{
+ return mbuf_write_u8(mb, RTMP_AMF_TYPE_OBJECT);
+}
+
+
+static int rtmp_amf_encode_array_start(struct mbuf *mb,
+ uint8_t type, uint32_t length)
+{
+ int err;
+
+ err = mbuf_write_u8(mb, type);
+ err |= mbuf_write_u32(mb, htonl(length));
+
+ return err;
+}
+
+
+static int rtmp_amf_encode_object_end(struct mbuf *mb)
+{
+ int err;
+
+ err = mbuf_write_u16(mb, 0);
+ err |= mbuf_write_u8(mb, RTMP_AMF_TYPE_OBJECT_END);
+
+ return err;
+}
+
+
+static bool container_has_key(enum rtmp_amf_type type)
+{
+ switch (type) {
+
+ case RTMP_AMF_TYPE_OBJECT: return true;
+ case RTMP_AMF_TYPE_ECMA_ARRAY: return true;
+ case RTMP_AMF_TYPE_STRICT_ARRAY: return false;
+ default: return false;
+ }
+}
+
+
+int rtmp_amf_encode_number(struct mbuf *mb, double val)
+{
+ const union {
+ uint64_t i;
+ double f;
+ } num = {
+ .f = val
+ };
+ int err;
+
+ if (!mb)
+ return EINVAL;
+
+ err = mbuf_write_u8(mb, RTMP_AMF_TYPE_NUMBER);
+ err |= mbuf_write_u64(mb, sys_htonll(num.i));
+
+ return err;
+}
+
+
+int rtmp_amf_encode_boolean(struct mbuf *mb, bool boolean)
+{
+ int err;
+
+ if (!mb)
+ return EINVAL;
+
+ err = mbuf_write_u8(mb, RTMP_AMF_TYPE_BOOLEAN);
+ err |= mbuf_write_u8(mb, !!boolean);
+
+ return err;
+}
+
+
+int rtmp_amf_encode_string(struct mbuf *mb, const char *str)
+{
+ size_t len;
+ int err;
+
+ if (!mb || !str)
+ return EINVAL;
+
+ len = str_len(str);
+
+ if (len > 65535)
+ return EOVERFLOW;
+
+ err = mbuf_write_u8(mb, RTMP_AMF_TYPE_STRING);
+ err |= mbuf_write_u16(mb, htons((uint16_t)len));
+ err |= mbuf_write_str(mb, str);
+
+ return err;
+}
+
+
+int rtmp_amf_encode_null(struct mbuf *mb)
+{
+ if (!mb)
+ return EINVAL;
+
+ return mbuf_write_u8(mb, RTMP_AMF_TYPE_NULL);
+}
+
+
+/*
+ * NUMBER double
+ * BOOLEAN bool
+ * STRING const char *
+ * OBJECT const char *key sub-count
+ * NULL NULL
+ * ARRAY const char *key sub-count
+ */
+int rtmp_amf_vencode_object(struct mbuf *mb, enum rtmp_amf_type container,
+ unsigned propc, va_list *ap)
+{
+ bool encode_key;
+ unsigned i;
+ int err = 0;
+
+ if (!mb || !propc || !ap)
+ return EINVAL;
+
+ encode_key = container_has_key(container);
+
+ switch (container) {
+
+ case RTMP_AMF_TYPE_OBJECT:
+ err = rtmp_amf_encode_object_start(mb);
+ break;
+
+ case RTMP_AMF_TYPE_ECMA_ARRAY:
+ case RTMP_AMF_TYPE_STRICT_ARRAY:
+ err = rtmp_amf_encode_array_start(mb, container, propc);
+ break;
+
+ case RTMP_AMF_TYPE_ROOT:
+ break;
+
+ default:
+ return ENOTSUP;
+ }
+
+ if (err)
+ return err;
+
+ for (i=0; i<propc; i++) {
+
+ int type = va_arg(*ap, int);
+ const char *str;
+ int subcount;
+ double dbl;
+ bool b;
+
+ /* add key if ARRAY or OBJECT container */
+ if (encode_key) {
+ const char *key;
+
+ key = va_arg(*ap, const char *);
+ if (!key)
+ return EINVAL;
+
+ err = rtmp_amf_encode_key(mb, key);
+ if (err)
+ return err;
+ }
+
+ switch (type) {
+
+ case RTMP_AMF_TYPE_NUMBER:
+ dbl = va_arg(*ap, double);
+ err = rtmp_amf_encode_number(mb, dbl);
+ break;
+
+ case RTMP_AMF_TYPE_BOOLEAN:
+ b = va_arg(*ap, int);
+ err = rtmp_amf_encode_boolean(mb, b);
+ break;
+
+ case RTMP_AMF_TYPE_STRING:
+ str = va_arg(*ap, const char *);
+ err = rtmp_amf_encode_string(mb, str);
+ break;
+
+ case RTMP_AMF_TYPE_NULL:
+ err = rtmp_amf_encode_null(mb);
+ break;
+
+ case RTMP_AMF_TYPE_OBJECT:
+ case RTMP_AMF_TYPE_ECMA_ARRAY:
+ case RTMP_AMF_TYPE_STRICT_ARRAY:
+ /* recursive */
+ subcount = va_arg(*ap, int);
+ err = rtmp_amf_vencode_object(mb, type, subcount, ap);
+ break;
+
+ default:
+ return ENOTSUP;
+ }
+
+ if (err)
+ return err;
+ }
+
+ if (encode_key)
+ err = rtmp_amf_encode_object_end(mb);
+
+ return err;
+}
diff --git a/src/rtmp/chunk.c b/src/rtmp/chunk.c
new file mode 100644
index 0000000..bab08bb
--- /dev/null
+++ b/src/rtmp/chunk.c
@@ -0,0 +1,87 @@
+/**
+ * @file rtmp/chunk.c Real Time Messaging Protocol (RTMP) -- Chunking
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_tcp.h>
+#include <re_list.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+
+/*
+ * Stateless RTMP chunker
+ */
+int rtmp_chunker(unsigned format, uint32_t chunk_id,
+ uint32_t timestamp, uint32_t timestamp_delta,
+ uint8_t msg_type_id, uint32_t msg_stream_id,
+ const uint8_t *payload, size_t payload_len,
+ size_t max_chunk_sz, struct tcp_conn *tc)
+{
+ const uint8_t *pend = payload + payload_len;
+ struct rtmp_header hdr;
+ struct mbuf *mb;
+ size_t chunk_sz;
+ int err;
+
+ if (!payload || !payload_len || !max_chunk_sz || !tc)
+ return EINVAL;
+
+ mb = mbuf_alloc(payload_len + 256);
+ if (!mb)
+ return ENOMEM;
+
+ memset(&hdr, 0, sizeof(hdr));
+
+ hdr.format = format;
+ hdr.chunk_id = chunk_id;
+
+ hdr.timestamp = timestamp;
+ hdr.timestamp_delta = timestamp_delta;
+ hdr.length = (uint32_t)payload_len;
+ hdr.type_id = msg_type_id;
+ hdr.stream_id = msg_stream_id;
+
+ chunk_sz = min(payload_len, max_chunk_sz);
+
+ err = rtmp_header_encode(mb, &hdr);
+ err |= mbuf_write_mem(mb, payload, chunk_sz);
+ if (err)
+ goto out;
+
+ payload += chunk_sz;
+
+ hdr.format = 3;
+
+ while (payload < pend) {
+
+ const size_t len = pend - payload;
+
+ chunk_sz = min(len, max_chunk_sz);
+
+ err = rtmp_header_encode(mb, &hdr);
+ err |= mbuf_write_mem(mb, payload, chunk_sz);
+ if (err)
+ goto out;
+
+ payload += chunk_sz;
+ }
+
+ mb->pos = 0;
+
+ err = tcp_send(tc, mb);
+ if (err)
+ goto out;
+
+ out:
+ mem_deref(mb);
+
+ return err;
+}
diff --git a/src/rtmp/conn.c b/src/rtmp/conn.c
new file mode 100644
index 0000000..f08a5a0
--- /dev/null
+++ b/src/rtmp/conn.c
@@ -0,0 +1,1023 @@
+/**
+ * @file rtmp/conn.c Real Time Messaging Protocol (RTMP) -- NetConnection
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_list.h>
+#include <re_tcp.h>
+#include <re_sys.h>
+#include <re_odict.h>
+#include <re_dns.h>
+#include <re_uri.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+
+enum {
+ WINDOW_ACK_SIZE = 2500000
+};
+
+
+static int req_connect(struct rtmp_conn *conn);
+
+
+static void conn_destructor(void *data)
+{
+ struct rtmp_conn *conn = data;
+
+ list_flush(&conn->ctransl);
+ list_flush(&conn->streaml);
+
+ mem_deref(conn->dnsq6);
+ mem_deref(conn->dnsq4);
+ mem_deref(conn->dnsc);
+ mem_deref(conn->tc);
+ mem_deref(conn->mb);
+ mem_deref(conn->dechunk);
+ mem_deref(conn->uri);
+ mem_deref(conn->app);
+ mem_deref(conn->host);
+ mem_deref(conn->stream);
+}
+
+
+static int handle_amf_command(struct rtmp_conn *conn, uint32_t stream_id,
+ struct mbuf *mb)
+{
+ struct odict *msg = NULL;
+ const char *name;
+ int err;
+
+ err = rtmp_amf_decode(&msg, mb);
+ if (err)
+ return err;
+
+ name = odict_string(msg, "0");
+
+ if (conn->is_client &&
+ (0 == str_casecmp(name, "_result") ||
+ 0 == str_casecmp(name, "_error"))) {
+
+ /* forward response to transaction layer */
+ rtmp_ctrans_response(&conn->ctransl, msg);
+ }
+ else {
+ struct rtmp_stream *strm;
+
+ if (stream_id == 0) {
+ if (conn->cmdh)
+ conn->cmdh(msg, conn->arg);
+ }
+ else {
+ strm = rtmp_stream_find(conn, stream_id);
+ if (strm) {
+ if (strm->cmdh)
+ strm->cmdh(msg, strm->arg);
+ }
+ }
+ }
+
+ mem_deref(msg);
+
+ return 0;
+}
+
+
+static int handle_user_control_msg(struct rtmp_conn *conn, struct mbuf *mb)
+{
+ struct rtmp_stream *strm;
+ enum rtmp_event_type event;
+ uint32_t value;
+ int err;
+
+ if (mbuf_get_left(mb) < 6)
+ return EBADMSG;
+
+ event = ntohs(mbuf_read_u16(mb));
+ value = ntohl(mbuf_read_u32(mb));
+
+ switch (event) {
+
+ case RTMP_EVENT_STREAM_BEGIN:
+ case RTMP_EVENT_STREAM_EOF:
+ case RTMP_EVENT_STREAM_DRY:
+ case RTMP_EVENT_STREAM_IS_RECORDED:
+ case RTMP_EVENT_SET_BUFFER_LENGTH:
+
+ if (value != RTMP_CONTROL_STREAM_ID) {
+
+ strm = rtmp_stream_find(conn, value);
+ if (strm && strm->ctrlh)
+ strm->ctrlh(event, mb, strm->arg);
+ }
+ break;
+
+ case RTMP_EVENT_PING_REQUEST:
+
+ err = rtmp_control(conn, RTMP_TYPE_USER_CONTROL_MSG,
+ RTMP_EVENT_PING_RESPONSE, value);
+ if (err)
+ return err;
+ break;
+
+ default:
+ break;
+ }
+
+ return 0;
+}
+
+
+static int handle_data_message(struct rtmp_conn *conn, uint32_t stream_id,
+ struct mbuf *mb)
+{
+ struct rtmp_stream *strm;
+ struct odict *msg;
+ int err;
+
+ err = rtmp_amf_decode(&msg, mb);
+ if (err)
+ return err;
+
+ strm = rtmp_stream_find(conn, stream_id);
+ if (strm && strm->datah)
+ strm->datah(msg, strm->arg);
+
+ mem_deref(msg);
+
+ return 0;
+}
+
+
+static int rtmp_dechunk_handler(const struct rtmp_header *hdr,
+ struct mbuf *mb, void *arg)
+{
+ struct rtmp_conn *conn = arg;
+ struct rtmp_stream *strm;
+ uint32_t val;
+ uint32_t was;
+ uint8_t limit;
+ int err = 0;
+
+ switch (hdr->type_id) {
+
+ case RTMP_TYPE_SET_CHUNK_SIZE:
+ if (mbuf_get_left(mb) < 4)
+ return EBADMSG;
+
+ val = ntohl(mbuf_read_u32(mb));
+
+ val = val & 0x7fffffff;
+
+ rtmp_dechunker_set_chunksize(conn->dechunk, val);
+ break;
+
+ case RTMP_TYPE_ACKNOWLEDGEMENT:
+ if (mbuf_get_left(mb) < 4)
+ return EBADMSG;
+
+ val = ntohl(mbuf_read_u32(mb));
+ (void)val;
+ break;
+
+ case RTMP_TYPE_AMF0:
+ err = handle_amf_command(conn, hdr->stream_id, mb);
+ break;
+
+ case RTMP_TYPE_WINDOW_ACK_SIZE:
+ if (mbuf_get_left(mb) < 4)
+ return EBADMSG;
+
+ was = ntohl(mbuf_read_u32(mb));
+ if (was != 0)
+ conn->window_ack_size = was;
+ break;
+
+ case RTMP_TYPE_SET_PEER_BANDWIDTH:
+ if (mbuf_get_left(mb) < 5)
+ return EBADMSG;
+
+ was = ntohl(mbuf_read_u32(mb));
+ limit = mbuf_read_u8(mb);
+ (void)limit;
+
+ if (was != 0)
+ conn->window_ack_size = was;
+
+ err = rtmp_control(conn, RTMP_TYPE_WINDOW_ACK_SIZE,
+ (uint32_t)WINDOW_ACK_SIZE);
+ break;
+
+ case RTMP_TYPE_USER_CONTROL_MSG:
+ err = handle_user_control_msg(conn, mb);
+ break;
+
+ /* XXX: common code for audio+video */
+ case RTMP_TYPE_AUDIO:
+ strm = rtmp_stream_find(conn, hdr->stream_id);
+ if (strm) {
+ if (strm->auh) {
+ strm->auh(hdr->timestamp,
+ mb->buf, mb->end,
+ strm->arg);
+ }
+ }
+ break;
+
+ case RTMP_TYPE_VIDEO:
+ strm = rtmp_stream_find(conn, hdr->stream_id);
+ if (strm) {
+ if (strm->vidh) {
+ strm->vidh(hdr->timestamp,
+ mb->buf, mb->end,
+ strm->arg);
+ }
+ }
+ break;
+
+ case RTMP_TYPE_DATA:
+ err = handle_data_message(conn, hdr->stream_id, mb);
+ break;
+
+ default:
+ break;
+ }
+
+ return err;
+}
+
+
+static struct rtmp_conn *rtmp_conn_alloc(bool is_client,
+ rtmp_estab_h *estabh,
+ rtmp_command_h *cmdh,
+ rtmp_close_h *closeh,
+ void *arg)
+{
+ struct rtmp_conn *conn;
+ int err;
+
+ conn = mem_zalloc(sizeof(*conn), conn_destructor);
+ if (!conn)
+ return NULL;
+
+ conn->is_client = is_client;
+ conn->state = RTMP_STATE_UNINITIALIZED;
+
+ conn->send_chunk_size = RTMP_DEFAULT_CHUNKSIZE;
+ conn->window_ack_size = WINDOW_ACK_SIZE;
+
+ err = rtmp_dechunker_alloc(&conn->dechunk, RTMP_DEFAULT_CHUNKSIZE,
+ rtmp_dechunk_handler, conn);
+ if (err)
+ goto out;
+
+ /* must be above 2 */
+ conn->chunk_id_counter = RTMP_CHUNK_ID_CONN + 1;
+
+ conn->estabh = estabh;
+ conn->cmdh = cmdh;
+ conn->closeh = closeh;
+ conn->arg = arg;
+
+ out:
+ if (err)
+ return mem_deref(conn);
+
+ return conn;
+}
+
+
+static inline void set_state(struct rtmp_conn *conn,
+ enum rtmp_handshake_state state)
+{
+ conn->state = state;
+}
+
+
+static int send_packet(struct rtmp_conn *conn, const uint8_t *pkt, size_t len)
+{
+ struct mbuf *mb;
+ int err;
+
+ if (!conn || !pkt || !len)
+ return EINVAL;
+
+ mb = mbuf_alloc(len);
+ if (!mb)
+ return ENOMEM;
+
+ (void)mbuf_write_mem(mb, pkt, len);
+
+ mb->pos = 0;
+
+ err = tcp_send(conn->tc, mb);
+ if (err)
+ goto out;
+
+ out:
+ mem_deref(mb);
+
+ return err;
+}
+
+
+static int handshake_start(struct rtmp_conn *conn)
+{
+ uint8_t sig[1+RTMP_HANDSHAKE_SIZE];
+ int err;
+
+ sig[0] = RTMP_PROTOCOL_VERSION;
+ sig[1] = 0;
+ sig[2] = 0;
+ sig[3] = 0;
+ sig[4] = 0;
+ sig[5] = VER_MAJOR;
+ sig[6] = VER_MINOR;
+ sig[7] = VER_PATCH;
+ sig[8] = 0;
+ rand_bytes(sig + 9, sizeof(sig) - 9);
+
+ err = send_packet(conn, sig, sizeof(sig));
+ if (err)
+ return err;
+
+ set_state(conn, RTMP_STATE_VERSION_SENT);
+
+ return 0;
+}
+
+
+static void conn_close(struct rtmp_conn *conn, int err)
+{
+ rtmp_close_h *closeh;
+
+ conn->tc = mem_deref(conn->tc);
+ conn->dnsq6 = mem_deref(conn->dnsq6);
+ conn->dnsq4 = mem_deref(conn->dnsq4);
+
+ closeh = conn->closeh;
+ if (closeh) {
+ conn->closeh = NULL;
+ closeh(err, conn->arg);
+ }
+}
+
+
+static void tcp_estab_handler(void *arg)
+{
+ struct rtmp_conn *conn = arg;
+ int err = 0;
+
+ if (conn->is_client) {
+
+ err = handshake_start(conn);
+ }
+
+ if (err)
+ conn_close(conn, err);
+}
+
+
+/* Send AMF0 Command or Data */
+int rtmp_send_amf_command(const struct rtmp_conn *conn,
+ unsigned format, uint32_t chunk_id,
+ uint8_t type_id,
+ uint32_t msg_stream_id,
+ const uint8_t *cmd, size_t len)
+{
+ if (!conn || !cmd || !len)
+ return EINVAL;
+
+ return rtmp_chunker(format, chunk_id, 0, 0, type_id, msg_stream_id,
+ cmd, len, conn->send_chunk_size,
+ conn->tc);
+}
+
+
+static void connect_resp_handler(bool success, const struct odict *msg,
+ void *arg)
+{
+ struct rtmp_conn *conn = arg;
+ rtmp_estab_h *estabh;
+ (void)msg;
+
+ if (!success) {
+ conn_close(conn, EPROTO);
+ return;
+ }
+
+ conn->connected = true;
+
+ estabh = conn->estabh;
+ if (estabh) {
+ conn->estabh = NULL;
+ estabh(conn->arg);
+ }
+}
+
+
+static int send_connect(struct rtmp_conn *conn)
+{
+ const int ac = 0x0400; /* AAC */
+ const int vc = 0x0080; /* H264 */
+
+ return rtmp_amf_request(conn, RTMP_CONTROL_STREAM_ID, "connect",
+ connect_resp_handler, conn,
+ 1,
+ RTMP_AMF_TYPE_OBJECT, 8,
+ RTMP_AMF_TYPE_STRING, "app", conn->app,
+ RTMP_AMF_TYPE_STRING, "flashVer", "LNX 9,0,124,2",
+ RTMP_AMF_TYPE_STRING, "tcUrl", conn->uri,
+ RTMP_AMF_TYPE_BOOLEAN, "fpad", false,
+ RTMP_AMF_TYPE_NUMBER, "capabilities", 15.0,
+ RTMP_AMF_TYPE_NUMBER, "audioCodecs", (double)ac,
+ RTMP_AMF_TYPE_NUMBER, "videoCodecs", (double)vc,
+ RTMP_AMF_TYPE_NUMBER, "videoFunction", 1.0);
+}
+
+
+static int client_handle_packet(struct rtmp_conn *conn, struct mbuf *mb)
+{
+ uint8_t s0;
+ uint8_t s1[RTMP_HANDSHAKE_SIZE];
+ int err = 0;
+
+ switch (conn->state) {
+
+ case RTMP_STATE_VERSION_SENT:
+ if (mbuf_get_left(mb) < (1+RTMP_HANDSHAKE_SIZE))
+ return ENODATA;
+
+ s0 = mbuf_read_u8(mb);
+ if (s0 != RTMP_PROTOCOL_VERSION)
+ return EPROTO;
+
+ (void)mbuf_read_mem(mb, s1, sizeof(s1));
+
+ err = send_packet(conn, s1, sizeof(s1));
+ if (err)
+ return err;
+
+ set_state(conn, RTMP_STATE_ACK_SENT);
+ break;
+
+ case RTMP_STATE_ACK_SENT:
+ if (mbuf_get_left(mb) < RTMP_HANDSHAKE_SIZE)
+ return ENODATA;
+
+ /* S2 (ignored) */
+ mbuf_advance(mb, RTMP_HANDSHAKE_SIZE);
+
+ conn->send_chunk_size = 4096;
+ err = rtmp_control(conn, RTMP_TYPE_SET_CHUNK_SIZE,
+ conn->send_chunk_size);
+ if (err)
+ return err;
+
+ err = send_connect(conn);
+ if (err)
+ return err;
+
+ set_state(conn, RTMP_STATE_HANDSHAKE_DONE);
+ break;
+
+ case RTMP_STATE_HANDSHAKE_DONE:
+ err = rtmp_dechunker_receive(conn->dechunk, mb);
+ if (err)
+ return err;
+ break;
+
+ default:
+ return EPROTO;
+ }
+
+ return 0;
+}
+
+
+static int server_handle_packet(struct rtmp_conn *conn, struct mbuf *mb)
+{
+ uint8_t c0;
+ uint8_t c1[RTMP_HANDSHAKE_SIZE];
+ int err = 0;
+
+ switch (conn->state) {
+
+ case RTMP_STATE_UNINITIALIZED:
+ if (mbuf_get_left(mb) < 1)
+ return ENODATA;
+
+ c0 = mbuf_read_u8(mb);
+ if (c0 != RTMP_PROTOCOL_VERSION)
+ return EPROTO;
+
+ /* Send S0 + S1 */
+ err = handshake_start(conn);
+ if (err)
+ return err;
+ break;
+
+ case RTMP_STATE_VERSION_SENT:
+ if (mbuf_get_left(mb) < RTMP_HANDSHAKE_SIZE)
+ return ENODATA;
+
+ (void)mbuf_read_mem(mb, c1, sizeof(c1));
+
+ /* Copy C1 to S2 */
+ err = send_packet(conn, c1, sizeof(c1));
+ if (err)
+ return err;
+
+ set_state(conn, RTMP_STATE_ACK_SENT);
+ break;
+
+ case RTMP_STATE_ACK_SENT:
+ if (mbuf_get_left(mb) < RTMP_HANDSHAKE_SIZE)
+ return ENODATA;
+
+ /* C2 (ignored) */
+ mbuf_advance(mb, RTMP_HANDSHAKE_SIZE);
+
+ conn->send_chunk_size = 4096;
+ err = rtmp_control(conn, RTMP_TYPE_SET_CHUNK_SIZE,
+ conn->send_chunk_size);
+ if (err)
+ return err;
+
+ set_state(conn, RTMP_STATE_HANDSHAKE_DONE);
+ break;
+
+ case RTMP_STATE_HANDSHAKE_DONE:
+ err = rtmp_dechunker_receive(conn->dechunk, mb);
+ if (err)
+ return err;
+ break;
+
+ default:
+ return EPROTO;
+ }
+
+ return 0;
+}
+
+
+static void tcp_recv_handler(struct mbuf *mb_pkt, void *arg)
+{
+ struct rtmp_conn *conn = arg;
+ int err;
+
+ conn->total_bytes += mbuf_get_left(mb_pkt);
+
+ /* re-assembly of fragments */
+ if (conn->mb) {
+ const size_t len = mbuf_get_left(mb_pkt), pos = conn->mb->pos;
+
+ if ((mbuf_get_left(conn->mb) + len) > RTMP_MESSAGE_LEN_MAX) {
+ err = EOVERFLOW;
+ goto out;
+ }
+
+ conn->mb->pos = conn->mb->end;
+
+ err = mbuf_write_mem(conn->mb,
+ mbuf_buf(mb_pkt), mbuf_get_left(mb_pkt));
+ if (err)
+ goto out;
+
+ conn->mb->pos = pos;
+ }
+ else {
+ conn->mb = mem_ref(mb_pkt);
+ }
+
+ while (mbuf_get_left(conn->mb) > 0) {
+
+ size_t pos;
+ uint32_t nrefs;
+
+ pos = conn->mb->pos;
+
+ mem_ref(conn);
+
+ if (conn->is_client)
+ err = client_handle_packet(conn, conn->mb);
+ else
+ err = server_handle_packet(conn, conn->mb);
+
+ nrefs = mem_nrefs(conn);
+
+ mem_deref(conn);
+
+ if (nrefs == 1)
+ return;
+
+ if (!conn->tc)
+ return;
+
+ if (err) {
+
+ /* rewind */
+ conn->mb->pos = pos;
+
+ if (err == ENODATA)
+ err = 0;
+ break;
+ }
+
+
+ if (conn->mb->pos >= conn->mb->end) {
+ conn->mb = mem_deref(conn->mb);
+ break;
+ }
+ }
+
+ if (err)
+ goto out;
+
+ if (conn->total_bytes >= (conn->last_ack + conn->window_ack_size)) {
+
+ conn->last_ack = conn->total_bytes;
+
+ err = rtmp_control(conn, RTMP_TYPE_ACKNOWLEDGEMENT,
+ (uint32_t)conn->total_bytes);
+ if (err)
+ goto out;
+ }
+
+ out:
+ if (err)
+ conn_close(conn, err);
+}
+
+
+static void tcp_close_handler(int err, void *arg)
+{
+ struct rtmp_conn *conn = arg;
+
+ if (conn->is_client && !conn->connected && conn->srvc > 0) {
+ err = req_connect(conn);
+ if (!err)
+ return;
+ }
+
+ conn_close(conn, err);
+}
+
+
+static int req_connect(struct rtmp_conn *conn)
+{
+ const struct sa *addr;
+ int err = EINVAL;
+
+ while (conn->srvc > 0) {
+
+ --conn->srvc;
+
+ addr = &conn->srvv[conn->srvc];
+
+ conn->send_chunk_size = RTMP_DEFAULT_CHUNKSIZE;
+ conn->window_ack_size = WINDOW_ACK_SIZE;
+ conn->state = RTMP_STATE_UNINITIALIZED;
+ conn->last_ack = 0;
+ conn->total_bytes = 0;
+ conn->mb = mem_deref(conn->mb);
+ conn->tc = mem_deref(conn->tc);
+
+ rtmp_dechunker_set_chunksize(conn->dechunk,
+ RTMP_DEFAULT_CHUNKSIZE);
+
+ err = tcp_connect(&conn->tc, addr, tcp_estab_handler,
+ tcp_recv_handler, tcp_close_handler, conn);
+ if (!err)
+ break;
+ }
+
+ return err;
+}
+
+
+static bool rr_handler(struct dnsrr *rr, void *arg)
+{
+ struct rtmp_conn *conn = arg;
+
+ if (conn->srvc >= ARRAY_SIZE(conn->srvv))
+ return true;
+
+ switch (rr->type) {
+
+ case DNS_TYPE_A:
+ sa_set_in(&conn->srvv[conn->srvc++], rr->rdata.a.addr,
+ conn->port);
+ break;
+
+ case DNS_TYPE_AAAA:
+ sa_set_in6(&conn->srvv[conn->srvc++], rr->rdata.aaaa.addr,
+ conn->port);
+ break;
+ }
+
+ return false;
+}
+
+
+static void query_handler(int err, const struct dnshdr *hdr, struct list *ansl,
+ struct list *authl, struct list *addl, void *arg)
+{
+ struct rtmp_conn *conn = arg;
+ (void)hdr;
+ (void)authl;
+ (void)addl;
+
+ dns_rrlist_apply2(ansl, conn->host, DNS_TYPE_A, DNS_TYPE_AAAA,
+ DNS_CLASS_IN, true, rr_handler, conn);
+
+ /* wait for other (A/AAAA) query to complete */
+ if (conn->dnsq4 || conn->dnsq6)
+ return;
+
+ if (conn->srvc == 0) {
+ err = err ? err : EDESTADDRREQ;
+ goto out;
+ }
+
+ err = req_connect(conn);
+ if (err)
+ goto out;
+
+ return;
+
+ out:
+ conn_close(conn, err);
+}
+
+
+/**
+ * Connect to an RTMP server
+ *
+ * @param connp Pointer to allocated RTMP connection object
+ * @param dnsc DNS Client for resolving FQDN uris
+ * @param uri RTMP uri to connect to
+ * @param estabh Established handler
+ * @param cmdh Incoming command handler
+ * @param closeh Close handler
+ * @param arg Handler argument
+ *
+ * @return 0 if success, otherwise errorcode
+ *
+ * Example URIs:
+ *
+ * rtmp://a.rtmp.youtube.com/live2/my-stream
+ * rtmp://[::1]/vod/mp4:sample.mp4
+ */
+int rtmp_connect(struct rtmp_conn **connp, struct dnsc *dnsc, const char *uri,
+ rtmp_estab_h *estabh, rtmp_command_h *cmdh,
+ rtmp_close_h *closeh, void *arg)
+{
+ struct rtmp_conn *conn;
+ struct pl pl_hostport;
+ struct pl pl_host;
+ struct pl pl_port;
+ struct pl pl_app;
+ struct pl pl_stream;
+ int err;
+
+ if (!connp || !uri)
+ return EINVAL;
+
+ if (re_regex(uri, strlen(uri), "rtmp://[^/]+/[^/]+/[^]+",
+ &pl_hostport, &pl_app, &pl_stream))
+ return EINVAL;
+
+ if (uri_decode_hostport(&pl_hostport, &pl_host, &pl_port))
+ return EINVAL;
+
+ conn = rtmp_conn_alloc(true, estabh, cmdh, closeh, arg);
+ if (!conn)
+ return ENOMEM;
+
+ conn->port = pl_isset(&pl_port) ? pl_u32(&pl_port) : RTMP_PORT;
+
+ err = pl_strdup(&conn->app, &pl_app);
+ err |= pl_strdup(&conn->stream, &pl_stream);
+ err |= str_dup(&conn->uri, uri);
+ if (err)
+ goto out;
+
+ if (0 == sa_set(&conn->srvv[0], &pl_host, conn->port)) {
+
+ conn->srvc = 1;
+
+ err = req_connect(conn);
+ if (err)
+ goto out;
+ }
+ else {
+#ifdef HAVE_INET6
+ struct sa tmp;
+#endif
+
+ if (!dnsc) {
+ err = EINVAL;
+ goto out;
+ }
+
+ err = pl_strdup(&conn->host, &pl_host);
+ if (err)
+ goto out;
+
+ conn->dnsc = mem_ref(dnsc);
+
+ err = dnsc_query(&conn->dnsq4, dnsc, conn->host, DNS_TYPE_A,
+ DNS_CLASS_IN, true, query_handler, conn);
+ if (err)
+ goto out;
+
+#ifdef HAVE_INET6
+ if (0 == net_default_source_addr_get(AF_INET6, &tmp)) {
+
+ err = dnsc_query(&conn->dnsq6, dnsc, conn->host,
+ DNS_TYPE_AAAA, DNS_CLASS_IN,
+ true, query_handler, conn);
+ if (err)
+ goto out;
+ }
+#endif
+ }
+
+ out:
+ if (err)
+ mem_deref(conn);
+ else
+ *connp = conn;
+
+ return err;
+}
+
+
+/**
+ * Accept an incoming TCP connection creating an RTMP Server connection
+ *
+ * @param connp Pointer to allocated RTMP connection object
+ * @param ts TCP socket with pending connection
+ * @param cmdh Incoming command handler
+ * @param closeh Close handler
+ * @param arg Handler argument
+ *
+ * @return 0 if success, otherwise errorcode
+ */
+int rtmp_accept(struct rtmp_conn **connp, struct tcp_sock *ts,
+ rtmp_command_h *cmdh, rtmp_close_h *closeh, void *arg)
+{
+ struct rtmp_conn *conn;
+ int err;
+
+ if (!connp || !ts)
+ return EINVAL;
+
+ conn = rtmp_conn_alloc(false, NULL, cmdh, closeh, arg);
+ if (!conn)
+ return ENOMEM;
+
+ err = tcp_accept(&conn->tc, ts, tcp_estab_handler,
+ tcp_recv_handler, tcp_close_handler, conn);
+ if (err)
+ goto out;
+
+ out:
+ if (err)
+ mem_deref(conn);
+ else
+ *connp = conn;
+
+ return err;
+}
+
+
+int rtmp_conn_send_msg(const struct rtmp_conn *conn,
+ unsigned format, uint32_t chunk_id,
+ uint32_t timestamp, uint32_t timestamp_delta,
+ uint8_t msg_type_id, uint32_t msg_stream_id,
+ const uint8_t *payload, size_t payload_len)
+{
+ if (!conn || !payload || !payload_len)
+ return EINVAL;
+
+ return rtmp_chunker(format, chunk_id, timestamp, timestamp_delta,
+ msg_type_id, msg_stream_id, payload, payload_len,
+ conn->send_chunk_size,
+ conn->tc);
+}
+
+
+unsigned rtmp_conn_assign_chunkid(struct rtmp_conn *conn)
+{
+ if (!conn)
+ return 0;
+
+ return ++conn->chunk_id_counter;
+}
+
+
+uint64_t rtmp_conn_assign_tid(struct rtmp_conn *conn)
+{
+ if (!conn)
+ return 0;
+
+ return ++conn->tid_counter;
+}
+
+
+/**
+ * Get the underlying TCP connection from an RTMP connection
+ *
+ * @param conn RTMP Connection
+ *
+ * @return TCP-Connection
+ */
+struct tcp_conn *rtmp_conn_tcpconn(const struct rtmp_conn *conn)
+{
+ return conn ? conn->tc : NULL;
+}
+
+
+/**
+ * Get the RTMP connection stream name from rtmp_connect
+ *
+ * @param conn RTMP Connection
+ *
+ * @return RTMP Stream name or NULL
+ */
+const char *rtmp_conn_stream(const struct rtmp_conn *conn)
+{
+ return conn ? conn->stream : NULL;
+}
+
+
+/**
+ * Set callback handlers for the RTMP connection
+ *
+ * @param conn RTMP connection
+ * @param cmdh Incoming command handler
+ * @param closeh Close handler
+ * @param arg Handler argument
+ */
+void rtmp_set_handlers(struct rtmp_conn *conn, rtmp_command_h *cmdh,
+ rtmp_close_h *closeh, void *arg)
+{
+ if (!conn)
+ return;
+
+ conn->cmdh = cmdh;
+ conn->closeh = closeh;
+ conn->arg = arg;
+}
+
+
+static const char *rtmp_handshake_name(enum rtmp_handshake_state state)
+{
+ switch (state) {
+
+ case RTMP_STATE_UNINITIALIZED: return "UNINITIALIZED";
+ case RTMP_STATE_VERSION_SENT: return "VERSION_SENT";
+ case RTMP_STATE_ACK_SENT: return "ACK_SENT";
+ case RTMP_STATE_HANDSHAKE_DONE: return "HANDSHAKE_DONE";
+ default: return "?";
+ }
+}
+
+
+int rtmp_conn_debug(struct re_printf *pf, const struct rtmp_conn *conn)
+{
+ int err = 0;
+
+ if (!conn)
+ return 0;
+
+ err |= re_hprintf(pf, "role: %s\n",
+ conn->is_client ? "Client" : "Server");
+ err |= re_hprintf(pf, "state: %s\n",
+ rtmp_handshake_name(conn->state));
+ err |= re_hprintf(pf, "connected: %d\n", conn->connected);
+ err |= re_hprintf(pf, "chunk_size: send=%u\n",
+ conn->send_chunk_size);
+ err |= re_hprintf(pf, "bytes: %zu\n", conn->total_bytes);
+ err |= re_hprintf(pf, "streams: %u\n",
+ list_count(&conn->streaml));
+
+ if (conn->is_client) {
+ err |= re_hprintf(pf, "uri: %s\n", conn->uri);
+ err |= re_hprintf(pf, "app: %s\n", conn->app);
+ err |= re_hprintf(pf, "stream: %s\n", conn->stream);
+ }
+
+ err |= re_hprintf(pf, "%H\n", rtmp_dechunker_debug, conn->dechunk);
+
+ return err;
+}
diff --git a/src/rtmp/control.c b/src/rtmp/control.c
new file mode 100644
index 0000000..f479a13
--- /dev/null
+++ b/src/rtmp/control.c
@@ -0,0 +1,106 @@
+/**
+ * @file rtmp/control.c Real Time Messaging Protocol (RTMP) -- Control
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_list.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+
+/**
+ * Send an RTMP control message
+ *
+ * @param conn RTMP connection
+ * @param type RTMP Packet type
+ * @param ... Optional packet arguments
+ *
+ * @return 0 if success, otherwise errorcode
+ */
+int rtmp_control(const struct rtmp_conn *conn, enum rtmp_packet_type type, ...)
+{
+ struct mbuf *mb;
+ uint32_t u32;
+ uint16_t event;
+ va_list ap;
+ int err = 0;
+
+ if (!conn)
+ return EINVAL;
+
+ mb = mbuf_alloc(8);
+ if (!mb)
+ return ENOMEM;
+
+ va_start(ap, type);
+
+ switch (type) {
+
+ case RTMP_TYPE_SET_CHUNK_SIZE:
+ case RTMP_TYPE_WINDOW_ACK_SIZE:
+ case RTMP_TYPE_ACKNOWLEDGEMENT:
+ u32 = va_arg(ap, uint32_t);
+ err = mbuf_write_u32(mb, htonl(u32));
+ break;
+
+ case RTMP_TYPE_USER_CONTROL_MSG:
+ event = va_arg(ap, unsigned);
+ err = mbuf_write_u16(mb, htons(event));
+ err |= mbuf_write_u32(mb, htonl(va_arg(ap, uint32_t)));
+ break;
+
+ case RTMP_TYPE_SET_PEER_BANDWIDTH:
+ err = mbuf_write_u32(mb, htonl(va_arg(ap, uint32_t)));
+ err |= mbuf_write_u8(mb, va_arg(ap, unsigned));
+ break;
+
+ default:
+ err = ENOTSUP;
+ break;
+ }
+
+ va_end(ap);
+
+ if (err)
+ goto out;
+
+ err = rtmp_conn_send_msg(conn, 0, RTMP_CHUNK_ID_CONTROL, 0, 0, type,
+ RTMP_CONTROL_STREAM_ID, mb->buf, mb->end);
+ if (err)
+ goto out;
+
+ out:
+ mem_deref(mb);
+
+ return err;
+}
+
+
+/**
+ * Get the event name as a string
+ *
+ * @param event RTMP Event type
+ *
+ * @return Name of the event as a string
+ */
+const char *rtmp_event_name(enum rtmp_event_type event)
+{
+ switch (event) {
+
+ case RTMP_EVENT_STREAM_BEGIN: return "StreamBegin";
+ case RTMP_EVENT_STREAM_EOF: return "StreamEOF";
+ case RTMP_EVENT_STREAM_DRY: return "StreamDry";
+ case RTMP_EVENT_SET_BUFFER_LENGTH: return "SetBufferLength";
+ case RTMP_EVENT_STREAM_IS_RECORDED: return "StreamIsRecorded";
+ case RTMP_EVENT_PING_REQUEST: return "PingRequest";
+ case RTMP_EVENT_PING_RESPONSE: return "PingResponse";
+ default: return "?";
+ }
+}
diff --git a/src/rtmp/ctrans.c b/src/rtmp/ctrans.c
new file mode 100644
index 0000000..6e56f84
--- /dev/null
+++ b/src/rtmp/ctrans.c
@@ -0,0 +1,138 @@
+/**
+ * @file rtmp/ctrans.c Real Time Messaging Protocol -- AMF Client Transactions
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_list.h>
+#include <re_tcp.h>
+#include <re_sys.h>
+#include <re_odict.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+
+struct rtmp_ctrans {
+ struct le le;
+ uint64_t tid;
+ rtmp_resp_h *resph;
+ void *arg;
+};
+
+
+static void ctrans_destructor(void *data)
+{
+ struct rtmp_ctrans *ct = data;
+
+ list_unlink(&ct->le);
+}
+
+
+static struct rtmp_ctrans *rtmp_ctrans_find(const struct list *ctransl,
+ uint64_t tid)
+{
+ struct le *le;
+
+ for (le = list_head(ctransl); le; le = le->next) {
+ struct rtmp_ctrans *ct = le->data;
+
+ if (tid == ct->tid)
+ return ct;
+ }
+
+ return NULL;
+}
+
+
+int rtmp_amf_request(struct rtmp_conn *conn, uint32_t stream_id,
+ const char *command,
+ rtmp_resp_h *resph, void *arg, unsigned body_propc, ...)
+{
+ struct rtmp_ctrans *ct = NULL;
+ struct mbuf *mb;
+ va_list ap;
+ int err;
+
+ if (!conn || !command || !resph)
+ return EINVAL;
+
+ mb = mbuf_alloc(512);
+ if (!mb)
+ return ENOMEM;
+
+ ct = mem_zalloc(sizeof(*ct), ctrans_destructor);
+ if (!ct) {
+ err = ENOMEM;
+ goto out;
+ }
+
+ ct->tid = rtmp_conn_assign_tid(conn);
+ ct->resph = resph;
+ ct->arg = arg;
+
+ err = rtmp_command_header_encode(mb, command, ct->tid);
+ if (err)
+ goto out;
+
+ if (body_propc) {
+ va_start(ap, body_propc);
+ err = rtmp_amf_vencode_object(mb, RTMP_AMF_TYPE_ROOT,
+ body_propc, &ap);
+ va_end(ap);
+ if (err)
+ goto out;
+ }
+
+ err = rtmp_send_amf_command(conn, 0, RTMP_CHUNK_ID_CONN,
+ RTMP_TYPE_AMF0,
+ stream_id, mb->buf, mb->end);
+ if (err)
+ goto out;
+
+ list_append(&conn->ctransl, &ct->le, ct);
+
+ out:
+ mem_deref(mb);
+ if (err)
+ mem_deref(ct);
+
+ return err;
+}
+
+
+int rtmp_ctrans_response(const struct list *ctransl,
+ const struct odict *msg)
+{
+ struct rtmp_ctrans *ct;
+ uint64_t tid;
+ bool success;
+ rtmp_resp_h *resph;
+ void *arg;
+
+ if (!ctransl || !msg)
+ return EINVAL;
+
+ success = (0 == str_casecmp(odict_string(msg, "0"), "_result"));
+
+ if (!odict_get_number(msg, &tid, "1"))
+ return EPROTO;
+
+ ct = rtmp_ctrans_find(ctransl, tid);
+ if (!ct)
+ return ENOENT;
+
+ resph = ct->resph;
+ arg = ct->arg;
+
+ mem_deref(ct);
+
+ resph(success, msg, arg);
+
+ return 0;
+}
diff --git a/src/rtmp/dechunk.c b/src/rtmp/dechunk.c
new file mode 100644
index 0000000..df82d78
--- /dev/null
+++ b/src/rtmp/dechunk.c
@@ -0,0 +1,292 @@
+/**
+ * @file rtmp/dechunk.c Real Time Messaging Protocol (RTMP) -- Dechunking
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_list.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+
+enum {
+ MAX_CHUNKS = 64,
+};
+
+
+struct rtmp_chunk {
+ struct le le;
+ struct rtmp_header hdr;
+ struct mbuf *mb;
+};
+
+/** Defines the RTMP Dechunker */
+struct rtmp_dechunker {
+ struct list chunkl; /* struct rtmp_chunk */
+ size_t chunk_sz;
+ rtmp_dechunk_h *chunkh;
+ void *arg;
+};
+
+
+static void destructor(void *data)
+{
+ struct rtmp_dechunker *rd = data;
+
+ list_flush(&rd->chunkl);
+}
+
+
+static void chunk_destructor(void *data)
+{
+ struct rtmp_chunk *chunk = data;
+
+ list_unlink(&chunk->le);
+ mem_deref(chunk->mb);
+}
+
+
+static struct rtmp_chunk *create_chunk(struct list *chunkl,
+ const struct rtmp_header *hdr)
+{
+ struct rtmp_chunk *chunk;
+
+ chunk = mem_zalloc(sizeof(*chunk), chunk_destructor);
+ if (!chunk)
+ return NULL;
+
+ chunk->hdr = *hdr;
+
+ list_append(chunkl, &chunk->le, chunk);
+
+ return chunk;
+}
+
+
+static struct rtmp_chunk *find_chunk(const struct list *chunkl,
+ uint32_t chunk_id)
+{
+ struct le *le;
+
+ for (le = list_head(chunkl); le; le = le->next) {
+
+ struct rtmp_chunk *chunk = le->data;
+
+ if (chunk_id == chunk->hdr.chunk_id)
+ return chunk;
+ }
+
+ return NULL;
+}
+
+
+/*
+ * Stateful RTMP de-chunker for receiving complete messages
+ */
+int rtmp_dechunker_alloc(struct rtmp_dechunker **rdp, size_t chunk_sz,
+ rtmp_dechunk_h *chunkh, void *arg)
+{
+ struct rtmp_dechunker *rd;
+
+ if (!rdp || !chunk_sz || !chunkh)
+ return EINVAL;
+
+ rd = mem_zalloc(sizeof(*rd), destructor);
+ if (!rd)
+ return ENOMEM;
+
+ rd->chunk_sz = chunk_sz;
+
+ rd->chunkh = chunkh;
+ rd->arg = arg;
+
+ *rdp = rd;
+
+ return 0;
+}
+
+
+int rtmp_dechunker_receive(struct rtmp_dechunker *rd, struct mbuf *mb)
+{
+ struct rtmp_header hdr;
+ struct rtmp_chunk *chunk;
+ size_t chunk_sz, left, msg_len;
+ int err;
+
+ if (!rd || !mb)
+ return EINVAL;
+
+ err = rtmp_header_decode(&hdr, mb);
+ if (err)
+ return err;
+
+ /* find preceding chunk, from chunk id */
+ chunk = find_chunk(&rd->chunkl, hdr.chunk_id);
+ if (!chunk) {
+
+ /* only type 0 can create a new chunk stream */
+ if (hdr.format == 0) {
+ if (list_count(&rd->chunkl) > MAX_CHUNKS)
+ return EOVERFLOW;
+
+ chunk = create_chunk(&rd->chunkl, &hdr);
+ if (!chunk)
+ return ENOMEM;
+ }
+ else
+ return ENOENT;
+ }
+
+ switch (hdr.format) {
+
+ case 0:
+ case 1:
+ case 2:
+ if (hdr.format == 0) {
+
+ /* copy the whole header */
+ chunk->hdr = hdr;
+ }
+ else if (hdr.format == 1) {
+
+ chunk->hdr.timestamp_delta = hdr.timestamp_delta;
+ chunk->hdr.length = hdr.length;
+ chunk->hdr.type_id = hdr.type_id;
+ }
+ else if (hdr.format == 2) {
+
+ chunk->hdr.timestamp_delta = hdr.timestamp_delta;
+ }
+
+ msg_len = chunk->hdr.length;
+
+ chunk_sz = min(msg_len, rd->chunk_sz);
+
+ if (mbuf_get_left(mb) < chunk_sz)
+ return ENODATA;
+
+ mem_deref(chunk->mb);
+ chunk->mb = mbuf_alloc(msg_len);
+ if (!chunk->mb)
+ return ENOMEM;
+
+ err = mbuf_read_mem(mb, chunk->mb->buf, chunk_sz);
+ if (err)
+ return err;
+
+ chunk->mb->pos = chunk_sz;
+ chunk->mb->end = chunk_sz;
+
+ chunk->hdr.format = hdr.format;
+ chunk->hdr.ext_ts = hdr.ext_ts;
+
+ if (hdr.format == 1 || hdr.format == 2)
+ chunk->hdr.timestamp += hdr.timestamp_delta;
+ break;
+
+ case 3:
+ if (chunk->hdr.ext_ts) {
+
+ uint32_t ext_ts;
+
+ if (mbuf_get_left(mb) < 4)
+ return ENODATA;
+
+ ext_ts = ntohl(mbuf_read_u32(mb));
+
+ if (chunk->hdr.format == 0)
+ chunk->hdr.timestamp = ext_ts;
+ else
+ chunk->hdr.timestamp_delta = ext_ts;
+ }
+
+ if (!chunk->mb) {
+
+ chunk->mb = mbuf_alloc(chunk->hdr.length);
+ if (!chunk->mb)
+ return ENOMEM;
+
+ if (chunk->hdr.format == 0) {
+ chunk->hdr.timestamp_delta =
+ chunk->hdr.timestamp;
+ }
+
+ chunk->hdr.timestamp += chunk->hdr.timestamp_delta;
+ }
+
+ left = mbuf_get_space(chunk->mb);
+
+ chunk_sz = min(left, rd->chunk_sz);
+
+ if (mbuf_get_left(mb) < chunk_sz)
+ return ENODATA;
+
+ err = mbuf_read_mem(mb, mbuf_buf(chunk->mb), chunk_sz);
+ if (err)
+ return err;
+
+ chunk->mb->pos += chunk_sz;
+ chunk->mb->end += chunk_sz;
+ break;
+
+ default:
+ return EPROTO;
+ }
+
+ if (chunk->mb->pos >= chunk->mb->size) {
+
+ struct mbuf *buf;
+
+ chunk->mb->pos = 0;
+
+ buf = chunk->mb;
+ chunk->mb = NULL;
+
+ err = rd->chunkh(&chunk->hdr, buf, rd->arg);
+
+ mem_deref(buf);
+ }
+
+ return err;
+}
+
+
+void rtmp_dechunker_set_chunksize(struct rtmp_dechunker *rd, size_t chunk_sz)
+{
+ if (!rd || !chunk_sz)
+ return;
+
+ rd->chunk_sz = chunk_sz;
+}
+
+
+int rtmp_dechunker_debug(struct re_printf *pf, const struct rtmp_dechunker *rd)
+{
+ struct le *le;
+ int err;
+
+ if (!rd)
+ return 0;
+
+ err = re_hprintf(pf, "Dechunker Debug:\n");
+
+ err |= re_hprintf(pf, "chunk list: (%u)\n", list_count(&rd->chunkl));
+
+ for (le = rd->chunkl.head; le; le = le->next) {
+
+ const struct rtmp_chunk *msg = le->data;
+
+ err |= re_hprintf(pf, ".. %H\n",
+ rtmp_header_print, &msg->hdr);
+ }
+
+ err |= re_hprintf(pf, "\n");
+
+ return err;
+}
diff --git a/src/rtmp/hdr.c b/src/rtmp/hdr.c
new file mode 100644
index 0000000..38bd370
--- /dev/null
+++ b/src/rtmp/hdr.c
@@ -0,0 +1,279 @@
+/**
+ * @file rtmp/hdr.c Real Time Messaging Protocol (RTMP) -- Headers
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_list.h>
+#include <re_sys.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+
+enum {
+ RTMP_CHUNK_ID_MIN = 3,
+ RTMP_CHUNK_ID_MAX = 65599, /* 65535 + 64 */
+
+ RTMP_CHUNK_OFFSET = 64,
+ TIMESTAMP_24MAX = 0x00ffffff,
+};
+
+
+static int mbuf_write_u24_hton(struct mbuf *mb, uint32_t u24)
+{
+ int err = 0;
+
+ err |= mbuf_write_u8(mb, u24 >> 16);
+ err |= mbuf_write_u8(mb, u24 >> 8);
+ err |= mbuf_write_u8(mb, u24 >> 0);
+
+ return err;
+}
+
+
+static uint32_t mbuf_read_u24_ntoh(struct mbuf *mb)
+{
+ uint32_t u24;
+
+ u24 = (uint32_t)mbuf_read_u8(mb) << 16;
+ u24 |= (uint32_t)mbuf_read_u8(mb) << 8;
+ u24 |= (uint32_t)mbuf_read_u8(mb) << 0;
+
+ return u24;
+}
+
+
+static int encode_basic_hdr(struct mbuf *mb, unsigned fmt,
+ uint32_t chunk_id)
+{
+ uint8_t v;
+ int err = 0;
+
+ if (chunk_id >= 320) {
+
+ const uint16_t cs_id = chunk_id - RTMP_CHUNK_OFFSET;
+
+ v = fmt<<6 | 1;
+
+ err |= mbuf_write_u8(mb, v);
+ err |= mbuf_write_u16(mb, htons(cs_id));
+ }
+ else if (chunk_id >= RTMP_CHUNK_OFFSET) {
+
+ const uint8_t cs_id = chunk_id - RTMP_CHUNK_OFFSET;
+
+ v = fmt<<6 | 0;
+
+ err |= mbuf_write_u8(mb, v);
+ err |= mbuf_write_u8(mb, cs_id);
+ }
+ else {
+ v = fmt<<6 | chunk_id;
+
+ err |= mbuf_write_u8(mb, v);
+ }
+
+ return err;
+}
+
+
+static int decode_basic_hdr(struct rtmp_header *hdr, struct mbuf *mb)
+{
+ uint8_t cs_id;
+ uint8_t v;
+
+ if (mbuf_get_left(mb) < 1)
+ return ENODATA;
+
+ v = mbuf_read_u8(mb);
+
+ hdr->format = v>>6;
+
+ cs_id = v & 0x3f;
+
+ switch (cs_id) {
+
+ case 0:
+ if (mbuf_get_left(mb) < 1)
+ return ENODATA;
+
+ hdr->chunk_id = mbuf_read_u8(mb) + RTMP_CHUNK_OFFSET;
+ break;
+
+ case 1:
+ if (mbuf_get_left(mb) < 2)
+ return ENODATA;
+
+ hdr->chunk_id = ntohs(mbuf_read_u16(mb)) + RTMP_CHUNK_OFFSET;
+ break;
+
+ default:
+ hdr->chunk_id = cs_id;
+ break;
+ }
+
+ return 0;
+}
+
+
+static uint32_t ts_24(uint32_t ts)
+{
+ return ts >= TIMESTAMP_24MAX ? TIMESTAMP_24MAX : ts;
+}
+
+
+static uint32_t ts_ext(uint32_t ts)
+{
+ return ts >= TIMESTAMP_24MAX ? ts : 0;
+}
+
+
+int rtmp_header_encode(struct mbuf *mb, struct rtmp_header *hdr)
+{
+ int err = 0;
+
+ if (!mb || !hdr)
+ return EINVAL;
+
+ err = encode_basic_hdr(mb, hdr->format, hdr->chunk_id);
+ if (err)
+ return err;
+
+ switch (hdr->format) {
+
+ case 0:
+ hdr->timestamp_ext = ts_ext(hdr->timestamp);
+
+ err |= mbuf_write_u24_hton(mb, ts_24(hdr->timestamp));
+ err |= mbuf_write_u24_hton(mb, hdr->length);
+ err |= mbuf_write_u8(mb, hdr->type_id);
+ err |= mbuf_write_u32(mb, sys_htoll(hdr->stream_id));
+ break;
+
+ case 1:
+ hdr->timestamp_ext = ts_ext(hdr->timestamp_delta);
+
+ err |= mbuf_write_u24_hton(mb, ts_24(hdr->timestamp_delta));
+ err |= mbuf_write_u24_hton(mb, hdr->length);
+ err |= mbuf_write_u8(mb, hdr->type_id);
+ break;
+
+ case 2:
+ hdr->timestamp_ext = ts_ext(hdr->timestamp_delta);
+
+ err |= mbuf_write_u24_hton(mb, ts_24(hdr->timestamp_delta));
+ break;
+
+ case 3:
+ break;
+ }
+
+ if (hdr->timestamp_ext) {
+ err |= mbuf_write_u32(mb, htonl(hdr->timestamp_ext));
+ }
+
+ return err;
+}
+
+
+int rtmp_header_decode(struct rtmp_header *hdr, struct mbuf *mb)
+{
+ uint32_t *timestamp_ext = NULL;
+ int err;
+
+ if (!hdr || !mb)
+ return EINVAL;
+
+ memset(hdr, 0, sizeof(*hdr));
+
+ err = decode_basic_hdr(hdr, mb);
+ if (err)
+ return err;
+
+ switch (hdr->format) {
+
+ case 0:
+ if (mbuf_get_left(mb) < 11)
+ return ENODATA;
+
+ hdr->timestamp = mbuf_read_u24_ntoh(mb);
+ hdr->length = mbuf_read_u24_ntoh(mb);
+ hdr->type_id = mbuf_read_u8(mb);
+ hdr->stream_id = sys_ltohl(mbuf_read_u32(mb));
+ break;
+
+ case 1:
+ if (mbuf_get_left(mb) < 7)
+ return ENODATA;
+
+ hdr->timestamp_delta = mbuf_read_u24_ntoh(mb);
+ hdr->length = mbuf_read_u24_ntoh(mb);
+ hdr->type_id = mbuf_read_u8(mb);
+ break;
+
+ case 2:
+ if (mbuf_get_left(mb) < 3)
+ return ENODATA;
+
+ hdr->timestamp_delta = mbuf_read_u24_ntoh(mb);
+ break;
+
+ case 3:
+ /* no payload */
+ break;
+ }
+
+ if (hdr->timestamp == TIMESTAMP_24MAX)
+ timestamp_ext = &hdr->timestamp;
+ else if (hdr->timestamp_delta == TIMESTAMP_24MAX)
+ timestamp_ext = &hdr->timestamp_delta;
+
+ if (timestamp_ext) {
+ if (mbuf_get_left(mb) < 4)
+ return ENODATA;
+
+ *timestamp_ext = ntohl(mbuf_read_u32(mb));
+ hdr->ext_ts = true;
+ }
+
+ return 0;
+}
+
+
+int rtmp_header_print(struct re_printf *pf, const struct rtmp_header *hdr)
+{
+ if (!hdr)
+ return 0;
+
+ return re_hprintf(pf,
+ "fmt %u, chunk %u, "
+ "timestamp %5u, ts_delta %2u,"
+ " len %3u, type %2u (%-14s) stream_id %u",
+ hdr->format, hdr->chunk_id, hdr->timestamp,
+ hdr->timestamp_delta, hdr->length, hdr->type_id,
+ rtmp_packet_type_name(hdr->type_id), hdr->stream_id);
+}
+
+
+const char *rtmp_packet_type_name(enum rtmp_packet_type type)
+{
+ switch (type) {
+
+ case RTMP_TYPE_SET_CHUNK_SIZE: return "Set Chunk Size";
+ case RTMP_TYPE_ACKNOWLEDGEMENT: return "Acknowledgement";
+ case RTMP_TYPE_USER_CONTROL_MSG: return "User Control Message";
+ case RTMP_TYPE_WINDOW_ACK_SIZE: return "Window Acknowledgement Size";
+ case RTMP_TYPE_SET_PEER_BANDWIDTH:return "Set Peer Bandwidth";
+ case RTMP_TYPE_AUDIO: return "Audio Message";
+ case RTMP_TYPE_VIDEO: return "Video Message";
+ case RTMP_TYPE_DATA: return "Data Message";
+ case RTMP_TYPE_AMF0: return "AMF";
+ default: return "?";
+ }
+}
diff --git a/src/rtmp/mod.mk b/src/rtmp/mod.mk
new file mode 100644
index 0000000..c1b73df
--- /dev/null
+++ b/src/rtmp/mod.mk
@@ -0,0 +1,16 @@
+#
+# mod.mk
+#
+# Copyright (C) 2010 Creytiv.com
+#
+
+SRCS += rtmp/amf.c
+SRCS += rtmp/amf_dec.c
+SRCS += rtmp/amf_enc.c
+SRCS += rtmp/chunk.c
+SRCS += rtmp/conn.c
+SRCS += rtmp/control.c
+SRCS += rtmp/ctrans.c
+SRCS += rtmp/dechunk.c
+SRCS += rtmp/hdr.c
+SRCS += rtmp/stream.c
diff --git a/src/rtmp/rtmp.h b/src/rtmp/rtmp.h
new file mode 100644
index 0000000..90085e0
--- /dev/null
+++ b/src/rtmp/rtmp.h
@@ -0,0 +1,178 @@
+/**
+ * @file rtmp.h Real Time Messaging Protocol (RTMP) -- Internal API
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+
+
+enum {
+ RTMP_PROTOCOL_VERSION = 3,
+ RTMP_DEFAULT_CHUNKSIZE = 128,
+ RTMP_HANDSHAKE_SIZE = 1536,
+ RTMP_MESSAGE_LEN_MAX = 524288,
+};
+
+/* Chunk IDs */
+enum {
+ RTMP_CHUNK_ID_CONTROL = 2,
+ RTMP_CHUNK_ID_CONN = 3,
+};
+
+/** Defines the RTMP Handshake State */
+enum rtmp_handshake_state {
+ RTMP_STATE_UNINITIALIZED = 0,
+ RTMP_STATE_VERSION_SENT,
+ RTMP_STATE_ACK_SENT,
+ RTMP_STATE_HANDSHAKE_DONE
+};
+
+/**
+ * Defines an RTMP Connection
+ */
+struct rtmp_conn {
+ struct list streaml;
+ struct rtmp_dechunker *dechunk;
+ struct tcp_conn *tc;
+ struct mbuf *mb; /* TCP reassembly buffer */
+ enum rtmp_handshake_state state;
+ size_t total_bytes;
+ size_t last_ack;
+ uint32_t window_ack_size;
+ uint32_t send_chunk_size;
+ unsigned chunk_id_counter;
+ bool is_client;
+ bool connected;
+ rtmp_estab_h *estabh;
+ rtmp_command_h *cmdh;
+ rtmp_close_h *closeh;
+ void *arg;
+
+ /* client specific: */
+ struct dnsc *dnsc;
+ struct dns_query *dnsq4;
+ struct dns_query *dnsq6;
+ struct list ctransl;
+ struct sa srvv[16];
+ unsigned srvc;
+ uint64_t tid_counter;
+ uint16_t port;
+ char *app;
+ char *uri;
+ char *stream;
+ char *host;
+};
+
+/**
+ * Defines an RTMP Stream
+ */
+struct rtmp_stream {
+ struct le le;
+ const struct rtmp_conn *conn; /**< Pointer to parent connection */
+ bool created;
+ uint32_t stream_id;
+ unsigned chunk_id_audio;
+ unsigned chunk_id_video;
+ unsigned chunk_id_data;
+ rtmp_audio_h *auh;
+ rtmp_video_h *vidh;
+ rtmp_command_h *datah;
+ rtmp_command_h *cmdh;
+ rtmp_resp_h *resph;
+ rtmp_control_h *ctrlh;
+ void *arg;
+};
+
+struct rtmp_header {
+ unsigned format:2; /* type 0-3 */
+ uint32_t chunk_id; /* from 3-65599 */
+
+ uint32_t timestamp; /* 24-bit or 32-bit */
+ uint32_t timestamp_delta; /* 24-bit */
+ uint32_t timestamp_ext;
+ uint32_t length; /* 24-bit */
+ uint8_t type_id; /* enum rtmp_packet_type */
+ uint32_t stream_id;
+ bool ext_ts;
+};
+
+
+/* Command */
+
+int rtmp_command_header_encode(struct mbuf *mb, const char *name,
+ uint64_t tid);
+
+/* Connection */
+
+int rtmp_conn_send_msg(const struct rtmp_conn *conn, unsigned format,
+ uint32_t chunk_id, uint32_t timestamp,
+ uint32_t timestamp_delta, uint8_t msg_type_id,
+ uint32_t msg_stream_id,
+ const uint8_t *payload, size_t payload_len);
+int rtmp_send_amf_command(const struct rtmp_conn *conn,
+ unsigned format, uint32_t chunk_id,
+ uint8_t type_id,
+ uint32_t msg_stream_id,
+ const uint8_t *cmd, size_t len);
+unsigned rtmp_conn_assign_chunkid(struct rtmp_conn *conn);
+uint64_t rtmp_conn_assign_tid(struct rtmp_conn *conn);
+
+
+/* Client Transaction */
+
+
+struct rtmp_ctrans;
+
+int rtmp_ctrans_response(const struct list *ctransl,
+ const struct odict *msg);
+
+
+/*
+ * RTMP Chunk
+ */
+
+int rtmp_chunker(unsigned format, uint32_t chunk_id,
+ uint32_t timestamp, uint32_t timestamp_delta,
+ uint8_t msg_type_id, uint32_t msg_stream_id,
+ const uint8_t *payload, size_t payload_len,
+ size_t max_chunk_sz, struct tcp_conn *tc);
+
+
+/*
+ * RTMP Header
+ */
+
+int rtmp_header_encode(struct mbuf *mb, struct rtmp_header *hdr);
+int rtmp_header_decode(struct rtmp_header *hdr, struct mbuf *mb);
+int rtmp_header_print(struct re_printf *pf, const struct rtmp_header *hdr);
+const char *rtmp_packet_type_name(enum rtmp_packet_type type);
+
+
+/*
+ * RTMP De-chunker
+ */
+
+struct rtmp_dechunker;
+
+typedef int (rtmp_dechunk_h)(const struct rtmp_header *hdr,
+ struct mbuf *mb, void *arg);
+
+int rtmp_dechunker_alloc(struct rtmp_dechunker **rdp, size_t chunk_sz,
+ rtmp_dechunk_h *chunkh, void *arg);
+int rtmp_dechunker_receive(struct rtmp_dechunker *rd, struct mbuf *mb);
+void rtmp_dechunker_set_chunksize(struct rtmp_dechunker *rd, size_t chunk_sz);
+int rtmp_dechunker_debug(struct re_printf *pf,
+ const struct rtmp_dechunker *rd);
+
+
+/*
+ * AMF (Action Message Format)
+ */
+
+int rtmp_amf_encode_number(struct mbuf *mb, double val);
+int rtmp_amf_encode_boolean(struct mbuf *mb, bool boolean);
+int rtmp_amf_encode_string(struct mbuf *mb, const char *str);
+int rtmp_amf_encode_null(struct mbuf *mb);
+int rtmp_amf_vencode_object(struct mbuf *mb, enum rtmp_amf_type container,
+ unsigned propc, va_list *ap);
+
+int rtmp_amf_decode(struct odict **msgp, struct mbuf *mb);
diff --git a/src/rtmp/stream.c b/src/rtmp/stream.c
new file mode 100644
index 0000000..fe8748d
--- /dev/null
+++ b/src/rtmp/stream.c
@@ -0,0 +1,309 @@
+/**
+ * @file rtmp/stream.c Real Time Messaging Protocol (RTMP) -- NetStream
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_list.h>
+#include <re_tcp.h>
+#include <re_sys.h>
+#include <re_odict.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+
+static void destructor(void *data)
+{
+ struct rtmp_stream *strm = data;
+
+ list_unlink(&strm->le);
+
+ if (strm->created) {
+
+ rtmp_amf_command(strm->conn, 0, "deleteStream",
+ 3,
+ RTMP_AMF_TYPE_NUMBER, 0.0,
+ RTMP_AMF_TYPE_NULL,
+ RTMP_AMF_TYPE_NUMBER, (double)strm->stream_id);
+ }
+}
+
+
+/**
+ * Allocate a new RTMP Stream object
+ *
+ * @param strmp Pointer to allocated RTMP Stream
+ * @param conn RTMP Connection
+ * @param stream_id Stream id
+ * @param cmdh Command handler
+ * @param ctrlh Control handler
+ * @param auh Audio handler
+ * @param vidh Video handler
+ * @param datah Data handler
+ * @param arg Handler argument
+ *
+ * @return 0 if success, otherwise errorcode
+ */
+int rtmp_stream_alloc(struct rtmp_stream **strmp, struct rtmp_conn *conn,
+ uint32_t stream_id, rtmp_command_h *cmdh,
+ rtmp_control_h *ctrlh, rtmp_audio_h *auh,
+ rtmp_video_h *vidh, rtmp_command_h *datah,
+ void *arg)
+{
+ struct rtmp_stream *strm;
+
+ if (!strmp || !conn)
+ return EINVAL;
+
+ strm = mem_zalloc(sizeof(*strm), destructor);
+ if (!strm)
+ return ENOMEM;
+
+ strm->conn = conn;
+ strm->stream_id = stream_id;
+
+ strm->cmdh = cmdh;
+ strm->ctrlh = ctrlh;
+ strm->auh = auh;
+ strm->vidh = vidh;
+ strm->datah = datah;
+ strm->arg = arg;
+
+ strm->chunk_id_audio = rtmp_conn_assign_chunkid(conn);
+ strm->chunk_id_video = rtmp_conn_assign_chunkid(conn);
+ strm->chunk_id_data = rtmp_conn_assign_chunkid(conn);
+
+ list_append(&conn->streaml, &strm->le, strm);
+
+ *strmp = strm;
+
+ return 0;
+}
+
+
+static void createstream_handler(bool success, const struct odict *msg,
+ void *arg)
+{
+ struct rtmp_stream *strm = arg;
+ uint64_t num;
+
+ if (!success)
+ goto out;
+
+ if (!odict_get_number(msg, &num, "3")) {
+ success = false;
+ goto out;
+ }
+
+ strm->stream_id = (uint32_t)num;
+ if (strm->stream_id == 0) {
+ success = false;
+ goto out;
+ }
+
+ strm->created = true;
+
+ out:
+ if (strm->resph)
+ strm->resph(success, msg, strm->arg);
+}
+
+
+/**
+ * Create a new RTMP Stream by sending "createStream" to the RTMP Server.
+ *
+ * @param strmp Pointer to allocated RTMP Stream
+ * @param conn RTMP Connection
+ * @param resph RTMP Response handler
+ * @param cmdh Command handler
+ * @param ctrlh Control handler
+ * @param auh Audio handler
+ * @param vidh Video handler
+ * @param datah Data handler
+ * @param arg Handler argument
+ *
+ * @return 0 if success, otherwise errorcode
+ */
+int rtmp_stream_create(struct rtmp_stream **strmp, struct rtmp_conn *conn,
+ rtmp_resp_h *resph, rtmp_command_h *cmdh,
+ rtmp_control_h *ctrlh, rtmp_audio_h *auh,
+ rtmp_video_h *vidh, rtmp_command_h *datah,
+ void *arg)
+{
+ struct rtmp_stream *strm;
+ int err;
+
+ if (!strmp || !conn)
+ return EINVAL;
+
+ err = rtmp_stream_alloc(&strm, conn, (uint32_t)-1,
+ cmdh, ctrlh, auh, vidh, datah, arg);
+ if (err)
+ return err;
+
+ strm->resph = resph;
+
+ err = rtmp_amf_request(conn, 0,
+ "createStream", createstream_handler, strm,
+ 1,
+ RTMP_AMF_TYPE_NULL);
+ if (err)
+ goto out;
+
+ out:
+ if (err)
+ mem_deref(strm);
+ else
+ *strmp = strm;
+
+ return err;
+}
+
+
+/**
+ * Start playing an RTMP Stream by sending "play" to the RTMP Server
+ *
+ * @param strm RTMP Stream
+ * @param name Stream name
+ *
+ * @return 0 if success, otherwise errorcode
+ */
+int rtmp_play(struct rtmp_stream *strm, const char *name)
+{
+ if (!strm || !name)
+ return EINVAL;
+
+ return rtmp_amf_command(strm->conn, strm->stream_id, "play",
+ 4,
+ RTMP_AMF_TYPE_NUMBER, 0.0,
+ RTMP_AMF_TYPE_NULL,
+ RTMP_AMF_TYPE_STRING, name,
+ RTMP_AMF_TYPE_NUMBER, -2000.0);
+}
+
+
+/**
+ * Start publishing an RTMP Stream by sending "publish" to the RTMP Server
+ *
+ * @param strm RTMP Stream
+ * @param name Stream name
+ *
+ * @return 0 if success, otherwise errorcode
+ */
+int rtmp_publish(struct rtmp_stream *strm, const char *name)
+{
+ if (!strm || !name)
+ return EINVAL;
+
+ return rtmp_amf_command(strm->conn, strm->stream_id, "publish",
+ 4,
+ RTMP_AMF_TYPE_NUMBER, 0.0,
+ RTMP_AMF_TYPE_NULL,
+ RTMP_AMF_TYPE_STRING, name,
+ RTMP_AMF_TYPE_STRING, "live");
+}
+
+
+/**
+ * Send metadata on the stream to the RTMP Server
+ *
+ * @param strm RTMP Stream
+ *
+ * @return 0 if success, otherwise errorcode
+ */
+int rtmp_meta(struct rtmp_stream *strm)
+{
+ if (!strm)
+ return EINVAL;
+
+ return rtmp_amf_data(strm->conn, strm->stream_id, "@setDataFrame",
+ 2,
+ RTMP_AMF_TYPE_STRING, "onMetaData",
+ RTMP_AMF_TYPE_ECMA_ARRAY, 2,
+ RTMP_AMF_TYPE_NUMBER, "audiocodecid", 10.0,
+ RTMP_AMF_TYPE_NUMBER, "videocodecid", 7.0);
+}
+
+
+/**
+ * Send audio packet on the RTMP Stream
+ *
+ * @param strm RTMP Stream
+ * @param timestamp Timestamp in [milliseconds]
+ * @param pld Audio payload
+ * @param len Payload length
+ *
+ * @return 0 if success, otherwise errorcode
+ */
+int rtmp_send_audio(struct rtmp_stream *strm, uint32_t timestamp,
+ const uint8_t *pld, size_t len)
+{
+ uint32_t chunk_id;
+
+ if (!strm || !pld || !len)
+ return EINVAL;
+
+ chunk_id = strm->chunk_id_audio;
+
+ return rtmp_conn_send_msg(strm->conn, 0, chunk_id, timestamp, 0,
+ RTMP_TYPE_AUDIO, strm->stream_id, pld, len);
+}
+
+
+/**
+ * Send video packet on the RTMP Stream
+ *
+ * @param strm RTMP Stream
+ * @param timestamp Timestamp in [milliseconds]
+ * @param pld Video payload
+ * @param len Payload length
+ *
+ * @return 0 if success, otherwise errorcode
+ */
+int rtmp_send_video(struct rtmp_stream *strm, uint32_t timestamp,
+ const uint8_t *pld, size_t len)
+{
+ uint32_t chunk_id;
+
+ if (!strm || !pld || !len)
+ return EINVAL;
+
+ chunk_id = strm->chunk_id_video;
+
+ return rtmp_conn_send_msg(strm->conn, 0, chunk_id, timestamp, 0,
+ RTMP_TYPE_VIDEO, strm->stream_id, pld, len);
+}
+
+
+/**
+ * Find an RTMP Stream by stream id
+ *
+ * @param conn RTMP Connection
+ * @param stream_id Stream id
+ *
+ * @return RTMP Stream if found, or NULL if not found
+ */
+struct rtmp_stream *rtmp_stream_find(const struct rtmp_conn *conn,
+ uint32_t stream_id)
+{
+ struct le *le;
+
+ if (!conn)
+ return NULL;
+
+ for (le = list_head(&conn->streaml); le; le = le->next) {
+
+ struct rtmp_stream *strm = le->data;
+
+ if (stream_id == strm->stream_id)
+ return strm;
+ }
+
+ return NULL;
+}