Squashed 'third_party/rawrtc/re/' content from commit f3163ce8b

Change-Id: I6a235e6ac0f03269d951026f9d195da05c40fdab
git-subtree-dir: third_party/rawrtc/re
git-subtree-split: f3163ce8b526a13b35ef71ce4dd6f43585064d8a
diff --git a/src/rtmp/README.md b/src/rtmp/README.md
new file mode 100644
index 0000000..5f1a6ee
--- /dev/null
+++ b/src/rtmp/README.md
@@ -0,0 +1,126 @@
+RTMP module
+-----------
+
+This module implements Real Time Messaging Protocol (RTMP) [1].
+
+
+
+
+Functional overview:
+-------------------
+
+```
+RTMP Specification v1.0 .......... YES
+RTMP with TCP transport .......... YES
+
+RTMPS (RTMP over TLS) ............ NO
+RTMPE (RTMP over Adobe Encryption) NO
+RTMPT (RTMP over HTTP) ........... NO
+RTMFP (RTMP over UDP) ............ NO
+
+Transport:
+Client ........................... YES
+Server ........................... YES
+IPv4 ............................. YES
+IPv6 ............................. YES
+DNS Resolving A/AAAA ............. YES
+
+RTMP Components:
+RTMP Handshake ................... YES
+RTMP Header encoding and decoding. YES
+RTMP Chunking .................... YES
+RTMP Dechunking .................. YES
+AMF0 (Action Message Format) ..... YES
+AMF3 (Action Message Format) ..... NO
+Send and receive audio/video ..... YES
+Regular and extended timestamp ... YES
+Multiple streams ................. YES
+```
+
+
+
+
+TODO:
+----
+
+- [x] improve AMF encoding API
+- [x] implement AMF transaction matching
+- [x] add support for Data Message
+- [x] add support for AMF Strict Array (type 10)
+- [ ] add support for TLS encryption
+- [x] add support for extended timestamp
+
+
+
+
+Protocol stack:
+--------------
+
+    .-------.  .-------.  .-------.
+    |  AMF  |  | Audio |  | Video |
+    '-------'  '-------'  '-------'
+        |          |          |
+        +----------+----------'
+                   |
+               .-------.
+               |  RTMP |
+               '-------'
+                   |
+                   |
+               .-------.
+               |  TCP  |
+               '-------'
+
+
+
+
+Message Sequence:
+----------------
+
+
+```
+Client                                      Server
+
+|----------------- TCP Connect -------------->|
+|                                             |
+|                                             |
+|                                             |
+|<-------------- 3-way Handshake ------------>|
+|                                             |
+|                                             |
+|                                             |
+|----------- Command Message(connect) ------->| chunkid=3, streamid=0, tid=1
+|                                             |
+|<------- Window Acknowledgement Size --------| chunkid=2, streamid=0
+|                                             |
+|<----------- Set Peer Bandwidth -------------| chunkid=2, streamid=0
+|                                             |
+|-------- Window Acknowledgement Size ------->|
+|                                             |
+|<------ User Control Message(StreamBegin) ---| chunkid=2, streamid=0
+|                                             |
+|<------------ Command Message ---------------| chunkid=3, streamid=0, tid=1
+|        (_result - connect response)         |
+```
+
+
+Interop:
+-------
+
+- Wowza Streaming Engine 4.7.1
+- YouTube service
+- FFmpeg's RTMP module
+
+
+
+
+References:
+----------
+
+[1] http://wwwimages.adobe.com/www.adobe.com/content/dam/acom/en/devnet/rtmp/pdf/rtmp_specification_1.0.pdf
+
+[2] https://wwwimages2.adobe.com/content/dam/acom/en/devnet/flv/video_file_format_spec_v10_1.pdf
+
+[3] https://en.wikipedia.org/wiki/Action_Message_Format
+
+[4] https://wwwimages2.adobe.com/content/dam/acom/en/devnet/pdf/amf0-file-format-specification.pdf
diff --git a/src/rtmp/amf.c b/src/rtmp/amf.c
new file mode 100644
index 0000000..c95763e
--- /dev/null
+++ b/src/rtmp/amf.c
@@ -0,0 +1,163 @@
+/**
+ * @file rtmp/amf.c  Real Time Messaging Protocol (RTMP) -- AMF Commands
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_list.h>
+#include <re_tcp.h>
+#include <re_sys.h>
+#include <re_odict.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+
+int rtmp_command_header_encode(struct mbuf *mb, const char *name, uint64_t tid)
+{
+	int err;
+
+	if (!mb || !name)
+		return EINVAL;
+
+	err  = rtmp_amf_encode_string(mb, name);
+	err |= rtmp_amf_encode_number(mb, tid);
+
+	return err;
+}
+
+
+int rtmp_amf_command(const struct rtmp_conn *conn, uint32_t stream_id,
+		     const char *command, unsigned body_propc, ...)
+{
+	struct mbuf *mb;
+	va_list ap;
+	int err;
+
+	if (!conn || !command)
+		return EINVAL;
+
+	mb = mbuf_alloc(512);
+	if (!mb)
+		return ENOMEM;
+
+	err = rtmp_amf_encode_string(mb, command);
+	if (err)
+		goto out;
+
+	if (body_propc) {
+		va_start(ap, body_propc);
+		err = rtmp_amf_vencode_object(mb, RTMP_AMF_TYPE_ROOT,
+					      body_propc, &ap);
+		va_end(ap);
+		if (err)
+			goto out;
+	}
+
+	err = rtmp_send_amf_command(conn, 0, RTMP_CHUNK_ID_CONN,
+				    RTMP_TYPE_AMF0,
+				    stream_id, mb->buf, mb->end);
+
+	if (err)
+		goto out;
+
+ out:
+	mem_deref(mb);
+
+	return err;
+}
+
+
+int rtmp_amf_reply(struct rtmp_conn *conn, uint32_t stream_id, bool success,
+		   const struct odict *req,
+		   unsigned body_propc, ...)
+{
+	struct mbuf *mb;
+	va_list ap;
+	uint64_t tid;
+	int err;
+
+	if (!conn || !req)
+		return EINVAL;
+
+	if (!odict_get_number(req, &tid, "1"))
+		return EPROTO;
+	if (tid == 0)
+		return EPROTO;
+
+	mb = mbuf_alloc(512);
+	if (!mb)
+		return ENOMEM;
+
+	err = rtmp_command_header_encode(mb,
+					 success ? "_result" : "_error", tid);
+	if (err)
+		goto out;
+
+	if (body_propc) {
+		va_start(ap, body_propc);
+		err = rtmp_amf_vencode_object(mb, RTMP_AMF_TYPE_ROOT,
+					      body_propc, &ap);
+		va_end(ap);
+		if (err)
+			goto out;
+	}
+
+	err = rtmp_send_amf_command(conn, 0, RTMP_CHUNK_ID_CONN,
+				    RTMP_TYPE_AMF0,
+				    stream_id, mb->buf, mb->end);
+
+	if (err)
+		goto out;
+
+ out:
+	mem_deref(mb);
+
+	return err;
+}
+
+
+int rtmp_amf_data(const struct rtmp_conn *conn, uint32_t stream_id,
+		  const char *command, unsigned body_propc, ...)
+{
+	struct mbuf *mb;
+	va_list ap;
+	int err;
+
+	if (!conn || !command)
+		return EINVAL;
+
+	mb = mbuf_alloc(512);
+	if (!mb)
+		return ENOMEM;
+
+	err = rtmp_amf_encode_string(mb, command);
+	if (err)
+		goto out;
+
+	if (body_propc) {
+		va_start(ap, body_propc);
+		err = rtmp_amf_vencode_object(mb, RTMP_AMF_TYPE_ROOT,
+					      body_propc, &ap);
+		va_end(ap);
+		if (err)
+			goto out;
+	}
+
+	err = rtmp_send_amf_command(conn, 0, RTMP_CHUNK_ID_CONN,
+				    RTMP_TYPE_DATA,
+				    stream_id, mb->buf, mb->end);
+
+	if (err)
+		goto out;
+
+ out:
+	mem_deref(mb);
+
+	return err;
+}
diff --git a/src/rtmp/amf_dec.c b/src/rtmp/amf_dec.c
new file mode 100644
index 0000000..de35108
--- /dev/null
+++ b/src/rtmp/amf_dec.c
@@ -0,0 +1,235 @@
+/**
+ * @file rtmp/amf_dec.c  Real Time Messaging Protocol (RTMP) -- AMF Decoding
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_list.h>
+#include <re_sys.h>
+#include <re_odict.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+
+enum {
+	AMF_HASH_SIZE = 32,  /* hash size of each decoded dictionary */
+	AMF_MAX_DEPTH = 32   /* maximum nesting of objects and arrays */
+};
+
+
+static int amf_decode_value(struct odict *dict, const char *key,
+			    struct mbuf *mb, unsigned depth);
+
+
+/* Decode key/value pairs into dict until object-end or end of buffer */
+static int amf_decode_object(struct odict *dict, struct mbuf *mb,
+			     unsigned depth)
+{
+	char *key = NULL;
+	uint16_t len;
+	int err = 0;
+
+	while (mbuf_get_left(mb) > 0) {
+		if (mbuf_get_left(mb) < 2)
+			return ENODATA;
+
+		len = ntohs(mbuf_read_u16(mb));
+
+		if (len == 0) {
+			if (mbuf_get_left(mb) < 1)
+				return ENODATA;
+
+			if (mbuf_read_u8(mb) == RTMP_AMF_TYPE_OBJECT_END)
+				return 0;
+
+			return EBADMSG;
+		}
+
+		if (mbuf_get_left(mb) < len)
+			return ENODATA;
+
+		err = mbuf_strdup(mb, &key, len);
+		if (err)
+			return err;
+
+		err = amf_decode_value(dict, key, mb, depth);
+
+		key = mem_deref(key);
+
+		if (err)
+			return err;
+	}
+
+	return 0;
+}
+
+
+/*
+ * Decode one AMF0 value into dict under key. Containers recurse, bounded
+ * by AMF_MAX_DEPTH so that nested input cannot exhaust the stack (EPROTO).
+ */
+static int amf_decode_value(struct odict *dict, const char *key,
+			    struct mbuf *mb, unsigned depth)
+{
+	union {
+		uint64_t i;
+		double f;
+	} num;
+	struct odict *object = NULL;
+	char *str = NULL;
+	uint32_t i, array_len;
+	uint8_t type;
+	uint16_t len;
+	bool boolean;
+	int err = 0;
+
+	if (depth > AMF_MAX_DEPTH)
+		return EPROTO;
+
+	if (mbuf_get_left(mb) < 1)
+		return ENODATA;
+
+	type = mbuf_read_u8(mb);
+
+	switch (type) {
+
+	case RTMP_AMF_TYPE_NUMBER:
+		if (mbuf_get_left(mb) < 8)
+			return ENODATA;
+
+		num.i = sys_ntohll(mbuf_read_u64(mb));
+
+		err = odict_entry_add(dict, key, ODICT_DOUBLE, num.f);
+		break;
+
+	case RTMP_AMF_TYPE_BOOLEAN:
+		if (mbuf_get_left(mb) < 1)
+			return ENODATA;
+
+		boolean = !!mbuf_read_u8(mb);
+
+		err = odict_entry_add(dict, key, ODICT_BOOL, boolean);
+		break;
+
+	case RTMP_AMF_TYPE_STRING:
+		if (mbuf_get_left(mb) < 2)
+			return ENODATA;
+
+		len = ntohs(mbuf_read_u16(mb));
+
+		if (mbuf_get_left(mb) < len)
+			return ENODATA;
+
+		err = mbuf_strdup(mb, &str, len);
+		if (err)
+			return err;
+
+		err = odict_entry_add(dict, key, ODICT_STRING, str);
+
+		mem_deref(str);
+		break;
+
+	case RTMP_AMF_TYPE_NULL:
+		err = odict_entry_add(dict, key, ODICT_NULL);
+		break;
+
+	case RTMP_AMF_TYPE_ECMA_ARRAY:
+		if (mbuf_get_left(mb) < 4)
+			return ENODATA;
+
+		(void)mbuf_read_u32(mb);  /* array length is ignored */
+
+		/* fallthrough */
+
+	case RTMP_AMF_TYPE_OBJECT:
+		err = odict_alloc(&object, AMF_HASH_SIZE);
+		if (err)
+			return err;
+
+		err = amf_decode_object(object, mb, depth + 1);
+		if (err) {
+			mem_deref(object);
+			return err;
+		}
+
+		err = odict_entry_add(dict, key, ODICT_OBJECT, object);
+
+		mem_deref(object);
+		break;
+
+	case RTMP_AMF_TYPE_STRICT_ARRAY:
+		if (mbuf_get_left(mb) < 4)
+			return ENODATA;
+
+		array_len = ntohl(mbuf_read_u32(mb));
+		if (!array_len)
+			return EPROTO;
+
+		err = odict_alloc(&object, AMF_HASH_SIZE);
+		if (err)
+			return err;
+
+		for (i=0; i<array_len; i++) {
+			char ix[32];
+
+			re_snprintf(ix, sizeof(ix), "%u", i);
+
+			err = amf_decode_value(object, ix, mb, depth + 1);
+			if (err) {
+				mem_deref(object);
+				return err;
+			}
+		}
+
+		err = odict_entry_add(dict, key, ODICT_ARRAY, object);
+
+		mem_deref(object);
+		break;
+
+	default:
+		err = EPROTO;
+		break;
+	}
+
+	return err;
+}
+
+
+int rtmp_amf_decode(struct odict **msgp, struct mbuf *mb)
+{
+	struct odict *msg;
+	unsigned ix = 0;
+	int err;
+
+	if (!msgp || !mb)
+		return EINVAL;
+
+	err = odict_alloc(&msg, AMF_HASH_SIZE);
+	if (err)
+		return err;
+
+	/* decode all entries on root-level */
+	while (mbuf_get_left(mb) > 0) {
+		char key[16];
+
+		re_snprintf(key, sizeof(key), "%u", ix++);
+
+		/* note: key is the numerical index */
+		err = amf_decode_value(msg, key, mb, 0);
+		if (err)
+			break;
+	}
+
+	if (err)
+		mem_deref(msg);
+	else
+		*msgp = msg;
+
+	return err;
+}
diff --git a/src/rtmp/amf_enc.c b/src/rtmp/amf_enc.c
new file mode 100644
index 0000000..0ba157d
--- /dev/null
+++ b/src/rtmp/amf_enc.c
@@ -0,0 +1,245 @@
+/**
+ * @file rtmp/amf_enc.c  Real Time Messaging Protocol (RTMP) -- AMF Encoding
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_list.h>
+#include <re_sys.h>
+#include <re_odict.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+
+static int rtmp_amf_encode_key(struct mbuf *mb, const char *key)
+{
+	size_t len;
+	int err;
+
+	len = str_len(key);
+
+	if (len > 65535)
+		return EOVERFLOW;
+
+	err  = mbuf_write_u16(mb, htons((uint16_t)len));
+	err |= mbuf_write_str(mb, key);
+
+	return err;
+}
+
+
+static int rtmp_amf_encode_object_start(struct mbuf *mb)
+{
+	return mbuf_write_u8(mb, RTMP_AMF_TYPE_OBJECT);
+}
+
+
+static int rtmp_amf_encode_array_start(struct mbuf *mb,
+				       uint8_t type, uint32_t length)
+{
+	int err;
+
+	err  = mbuf_write_u8(mb, type);
+	err |= mbuf_write_u32(mb, htonl(length));
+
+	return err;
+}
+
+
+static int rtmp_amf_encode_object_end(struct mbuf *mb)
+{
+	int err;
+
+	err  = mbuf_write_u16(mb, 0);
+	err |= mbuf_write_u8(mb, RTMP_AMF_TYPE_OBJECT_END);
+
+	return err;
+}
+
+
+/* Tell whether members of a container of this type are written with a key */
+static bool container_has_key(enum rtmp_amf_type type)
+{
+	bool keyed;
+
+	/* objects and ECMA arrays are keyed; strict arrays and others are not */
+	keyed  = (type == RTMP_AMF_TYPE_OBJECT);
+	keyed |= (type == RTMP_AMF_TYPE_ECMA_ARRAY);
+	return keyed;
+}
+
+
+int rtmp_amf_encode_number(struct mbuf *mb, double val)
+{
+	const union {
+		uint64_t i;
+		double f;
+	} num = {
+		.f = val
+	};
+	int err;
+
+	if (!mb)
+		return EINVAL;
+
+	err  = mbuf_write_u8(mb, RTMP_AMF_TYPE_NUMBER);
+	err |= mbuf_write_u64(mb, sys_htonll(num.i));
+
+	return err;
+}
+
+
+int rtmp_amf_encode_boolean(struct mbuf *mb, bool boolean)
+{
+	int err;
+
+	if (!mb)
+		return EINVAL;
+
+	err  = mbuf_write_u8(mb, RTMP_AMF_TYPE_BOOLEAN);
+	err |= mbuf_write_u8(mb, !!boolean);
+
+	return err;
+}
+
+
+int rtmp_amf_encode_string(struct mbuf *mb, const char *str)
+{
+	size_t len;
+	int err;
+
+	if (!mb || !str)
+		return EINVAL;
+
+	len = str_len(str);
+
+	if (len > 65535)
+		return EOVERFLOW;
+
+	err  = mbuf_write_u8(mb, RTMP_AMF_TYPE_STRING);
+	err |= mbuf_write_u16(mb, htons((uint16_t)len));
+	err |= mbuf_write_str(mb, str);
+
+	return err;
+}
+
+
+int rtmp_amf_encode_null(struct mbuf *mb)
+{
+	if (!mb)
+		return EINVAL;
+
+	return mbuf_write_u8(mb, RTMP_AMF_TYPE_NULL);
+}
+
+
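+/*
+ * Each property is: int type, [const char *key], then the value argument(s).
+ * The key is given only inside an OBJECT or ECMA_ARRAY container.
+ *
+ * NUMBER: double          BOOLEAN: bool (passed as int)
+ * STRING: const char *    NULL:    no value argument
+ * OBJECT/ECMA_ARRAY/STRICT_ARRAY: int sub-count, then that many properties
+ */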
+int rtmp_amf_vencode_object(struct mbuf *mb, enum rtmp_amf_type container,
+			    unsigned propc, va_list *ap)
+{
+	bool encode_key;
+	unsigned i;
+	int err = 0;
+
+	if (!mb || !propc || !ap)
+		return EINVAL;
+
+	encode_key = container_has_key(container);
+
+	switch (container) {
+
+	case RTMP_AMF_TYPE_OBJECT:
+		err = rtmp_amf_encode_object_start(mb);
+		break;
+
+	case RTMP_AMF_TYPE_ECMA_ARRAY:
+	case RTMP_AMF_TYPE_STRICT_ARRAY:
+		err = rtmp_amf_encode_array_start(mb, container, propc);
+		break;
+
+	case RTMP_AMF_TYPE_ROOT:
+		break;
+
+	default:
+		return ENOTSUP;
+	}
+
+	if (err)
+		return err;
+
+	for (i=0; i<propc; i++) {
+
+		int type = va_arg(*ap, int);
+		const char *str;
+		int subcount;
+		double dbl;
+		bool b;
+
+		/* add key if ARRAY or OBJECT container */
+		if (encode_key) {
+			const char *key;
+
+			key = va_arg(*ap, const char *);
+			if (!key)
+				return EINVAL;
+
+			err = rtmp_amf_encode_key(mb, key);
+			if (err)
+				return err;
+		}
+
+		switch (type) {
+
+		case RTMP_AMF_TYPE_NUMBER:
+			dbl = va_arg(*ap, double);
+			err = rtmp_amf_encode_number(mb, dbl);
+			break;
+
+		case RTMP_AMF_TYPE_BOOLEAN:
+			b = va_arg(*ap, int);
+			err = rtmp_amf_encode_boolean(mb, b);
+			break;
+
+		case RTMP_AMF_TYPE_STRING:
+			str = va_arg(*ap, const char *);
+			err = rtmp_amf_encode_string(mb, str);
+			break;
+
+		case RTMP_AMF_TYPE_NULL:
+			err = rtmp_amf_encode_null(mb);
+			break;
+
+		case RTMP_AMF_TYPE_OBJECT:
+		case RTMP_AMF_TYPE_ECMA_ARRAY:
+		case RTMP_AMF_TYPE_STRICT_ARRAY:
+			/* recursive */
+			subcount = va_arg(*ap, int);
+			err = rtmp_amf_vencode_object(mb, type, subcount, ap);
+			break;
+
+		default:
+			return ENOTSUP;
+		}
+
+		if (err)
+			return err;
+	}
+
+	if (encode_key)
+		err = rtmp_amf_encode_object_end(mb);
+
+	return err;
+}
diff --git a/src/rtmp/chunk.c b/src/rtmp/chunk.c
new file mode 100644
index 0000000..bab08bb
--- /dev/null
+++ b/src/rtmp/chunk.c
@@ -0,0 +1,87 @@
+/**
+ * @file rtmp/chunk.c  Real Time Messaging Protocol (RTMP) -- Chunking
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_tcp.h>
+#include <re_list.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+
+/*
+ * Stateless RTMP chunker: the first chunk carries the header in the given
+ * format, continuation chunks use format 3; all are sent with one tcp_send()
+ */
+int rtmp_chunker(unsigned format, uint32_t chunk_id,
+		 uint32_t timestamp, uint32_t timestamp_delta,
+		 uint8_t msg_type_id, uint32_t msg_stream_id,
+		 const uint8_t *payload, size_t payload_len,
+		 size_t max_chunk_sz, struct tcp_conn *tc)
+{
+	const uint8_t *pend;
+	struct rtmp_header hdr;
+	struct mbuf *mb;
+	size_t chunk_sz;
+	int err;
+
+	if (!payload || !payload_len || !max_chunk_sz || !tc)
+		return EINVAL;
+
+	/* form the end pointer only once payload is known to be non-NULL */
+	pend = payload + payload_len;
+
+	mb = mbuf_alloc(payload_len + 256);
+	if (!mb)
+		return ENOMEM;
+
+	memset(&hdr, 0, sizeof(hdr));
+
+	hdr.format          = format;
+	hdr.chunk_id        = chunk_id;
+	hdr.timestamp       = timestamp;
+	hdr.timestamp_delta = timestamp_delta;
+	hdr.length          = (uint32_t)payload_len;
+	hdr.type_id         = msg_type_id;
+	hdr.stream_id       = msg_stream_id;
+
+	chunk_sz = min(payload_len, max_chunk_sz);
+
+	err  = rtmp_header_encode(mb, &hdr);
+	err |= mbuf_write_mem(mb, payload, chunk_sz);
+	if (err)
+		goto out;
+
+	payload += chunk_sz;
+	hdr.format = 3;
+
+	while (payload < pend) {
+
+		const size_t len = pend - payload;
+
+		chunk_sz = min(len, max_chunk_sz);
+
+		err  = rtmp_header_encode(mb, &hdr);
+		err |= mbuf_write_mem(mb, payload, chunk_sz);
+		if (err)
+			goto out;
+
+		payload += chunk_sz;
+	}
+
+	mb->pos = 0;
+
+	err = tcp_send(tc, mb);
+
+ out:
+	mem_deref(mb);
+
+	return err;
+}
diff --git a/src/rtmp/conn.c b/src/rtmp/conn.c
new file mode 100644
index 0000000..f08a5a0
--- /dev/null
+++ b/src/rtmp/conn.c
@@ -0,0 +1,1023 @@
+/**
+ * @file rtmp/conn.c  Real Time Messaging Protocol (RTMP) -- NetConnection
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_list.h>
+#include <re_tcp.h>
+#include <re_sys.h>
+#include <re_odict.h>
+#include <re_dns.h>
+#include <re_uri.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+
+enum {
+	WINDOW_ACK_SIZE = 2500000
+};
+
+
+static int req_connect(struct rtmp_conn *conn);
+
+
+static void conn_destructor(void *data)
+{
+	struct rtmp_conn *conn = data;
+
+	list_flush(&conn->ctransl);
+	list_flush(&conn->streaml);
+
+	mem_deref(conn->dnsq6);
+	mem_deref(conn->dnsq4);
+	mem_deref(conn->dnsc);
+	mem_deref(conn->tc);
+	mem_deref(conn->mb);
+	mem_deref(conn->dechunk);
+	mem_deref(conn->uri);
+	mem_deref(conn->app);
+	mem_deref(conn->host);
+	mem_deref(conn->stream);
+}
+
+
+static int handle_amf_command(struct rtmp_conn *conn, uint32_t stream_id,
+			      struct mbuf *mb)
+{
+	struct odict *msg = NULL;
+	const char *name;
+	int err;
+
+	err = rtmp_amf_decode(&msg, mb);
+	if (err)
+		return err;
+
+	name = odict_string(msg, "0");
+
+	if (conn->is_client &&
+	    (0 == str_casecmp(name, "_result") ||
+	     0 == str_casecmp(name, "_error"))) {
+
+		/* forward response to transaction layer */
+		rtmp_ctrans_response(&conn->ctransl, msg);
+	}
+	else {
+		struct rtmp_stream *strm;
+
+		if (stream_id == 0) {
+			if (conn->cmdh)
+				conn->cmdh(msg, conn->arg);
+		}
+		else {
+			strm = rtmp_stream_find(conn, stream_id);
+			if (strm) {
+				if (strm->cmdh)
+					strm->cmdh(msg, strm->arg);
+			}
+		}
+	}
+
+	mem_deref(msg);
+
+	return 0;
+}
+
+
+static int handle_user_control_msg(struct rtmp_conn *conn, struct mbuf *mb)
+{
+	struct rtmp_stream *strm;
+	enum rtmp_event_type event;
+	uint32_t value;
+	int err;
+
+	if (mbuf_get_left(mb) < 6)
+		return EBADMSG;
+
+	event = ntohs(mbuf_read_u16(mb));
+	value = ntohl(mbuf_read_u32(mb));
+
+	switch (event) {
+
+	case RTMP_EVENT_STREAM_BEGIN:
+	case RTMP_EVENT_STREAM_EOF:
+	case RTMP_EVENT_STREAM_DRY:
+	case RTMP_EVENT_STREAM_IS_RECORDED:
+	case RTMP_EVENT_SET_BUFFER_LENGTH:
+
+		if (value != RTMP_CONTROL_STREAM_ID) {
+
+			strm = rtmp_stream_find(conn, value);
+			if (strm && strm->ctrlh)
+				strm->ctrlh(event, mb, strm->arg);
+		}
+		break;
+
+	case RTMP_EVENT_PING_REQUEST:
+
+		err = rtmp_control(conn, RTMP_TYPE_USER_CONTROL_MSG,
+				   RTMP_EVENT_PING_RESPONSE, value);
+		if (err)
+			return err;
+		break;
+
+	default:
+		break;
+	}
+
+	return 0;
+}
+
+
+static int handle_data_message(struct rtmp_conn *conn, uint32_t stream_id,
+			       struct mbuf *mb)
+{
+	struct rtmp_stream *strm;
+	struct odict *msg;
+	int err;
+
+	err = rtmp_amf_decode(&msg, mb);
+	if (err)
+		return err;
+
+	strm = rtmp_stream_find(conn, stream_id);
+	if (strm && strm->datah)
+		strm->datah(msg, strm->arg);
+
+	mem_deref(msg);
+
+	return 0;
+}
+
+
+static int rtmp_dechunk_handler(const struct rtmp_header *hdr,
+				struct mbuf *mb, void *arg)
+{
+	struct rtmp_conn *conn = arg;
+	struct rtmp_stream *strm;
+	uint32_t val;
+	uint32_t was;
+	uint8_t limit;
+	int err = 0;
+
+	switch (hdr->type_id) {
+
+	case RTMP_TYPE_SET_CHUNK_SIZE:
+		if (mbuf_get_left(mb) < 4)
+			return EBADMSG;
+
+		val = ntohl(mbuf_read_u32(mb));
+
+		val = val & 0x7fffffff;
+
+		rtmp_dechunker_set_chunksize(conn->dechunk, val);
+		break;
+
+	case RTMP_TYPE_ACKNOWLEDGEMENT:
+		if (mbuf_get_left(mb) < 4)
+			return EBADMSG;
+
+		val = ntohl(mbuf_read_u32(mb));
+		(void)val;
+		break;
+
+	case RTMP_TYPE_AMF0:
+		err = handle_amf_command(conn, hdr->stream_id, mb);
+		break;
+
+	case RTMP_TYPE_WINDOW_ACK_SIZE:
+		if (mbuf_get_left(mb) < 4)
+			return EBADMSG;
+
+		was = ntohl(mbuf_read_u32(mb));
+		if (was != 0)
+			conn->window_ack_size = was;
+		break;
+
+	case RTMP_TYPE_SET_PEER_BANDWIDTH:
+		if (mbuf_get_left(mb) < 5)
+			return EBADMSG;
+
+		was = ntohl(mbuf_read_u32(mb));
+		limit = mbuf_read_u8(mb);
+		(void)limit;
+
+		if (was != 0)
+			conn->window_ack_size = was;
+
+		err = rtmp_control(conn, RTMP_TYPE_WINDOW_ACK_SIZE,
+				   (uint32_t)WINDOW_ACK_SIZE);
+		break;
+
+	case RTMP_TYPE_USER_CONTROL_MSG:
+		err = handle_user_control_msg(conn, mb);
+		break;
+
+		/* XXX: common code for audio+video */
+	case RTMP_TYPE_AUDIO:
+		strm = rtmp_stream_find(conn, hdr->stream_id);
+		if (strm) {
+			if (strm->auh) {
+				strm->auh(hdr->timestamp,
+					  mb->buf, mb->end,
+					  strm->arg);
+			}
+		}
+		break;
+
+	case RTMP_TYPE_VIDEO:
+		strm = rtmp_stream_find(conn, hdr->stream_id);
+		if (strm) {
+			if (strm->vidh) {
+				strm->vidh(hdr->timestamp,
+					   mb->buf, mb->end,
+					   strm->arg);
+			}
+		}
+		break;
+
+	case RTMP_TYPE_DATA:
+		err = handle_data_message(conn, hdr->stream_id, mb);
+		break;
+
+	default:
+		break;
+	}
+
+	return err;
+}
+
+
+static struct rtmp_conn *rtmp_conn_alloc(bool is_client,
+					 rtmp_estab_h *estabh,
+					 rtmp_command_h *cmdh,
+					 rtmp_close_h *closeh,
+					 void *arg)
+{
+	struct rtmp_conn *conn;
+	int err;
+
+	conn = mem_zalloc(sizeof(*conn), conn_destructor);
+	if (!conn)
+		return NULL;
+
+	conn->is_client = is_client;
+	conn->state = RTMP_STATE_UNINITIALIZED;
+
+	conn->send_chunk_size = RTMP_DEFAULT_CHUNKSIZE;
+	conn->window_ack_size = WINDOW_ACK_SIZE;
+
+	err = rtmp_dechunker_alloc(&conn->dechunk, RTMP_DEFAULT_CHUNKSIZE,
+				   rtmp_dechunk_handler, conn);
+	if (err)
+		goto out;
+
+	/* must be above 2 */
+	conn->chunk_id_counter = RTMP_CHUNK_ID_CONN + 1;
+
+	conn->estabh = estabh;
+	conn->cmdh   = cmdh;
+	conn->closeh = closeh;
+	conn->arg    = arg;
+
+ out:
+	if (err)
+		return mem_deref(conn);
+
+	return conn;
+}
+
+
+static inline void set_state(struct rtmp_conn *conn,
+			     enum rtmp_handshake_state state)
+{
+	conn->state = state;
+}
+
+
+/* Send a raw packet (such as handshake data) on the TCP connection */
+static int send_packet(struct rtmp_conn *conn, const uint8_t *pkt, size_t len)
+{
+	struct mbuf *mb;
+	int err;
+
+	if (!conn || !pkt || !len)
+		return EINVAL;
+
+	mb = mbuf_alloc(len);
+	if (!mb)
+		return ENOMEM;
+
+	err = mbuf_write_mem(mb, pkt, len);
+	if (err)
+		goto out;
+
+	mb->pos = 0;
+	err = tcp_send(conn->tc, mb);
+
+ out:
+	mem_deref(mb);
+
+	return err;
+}
+
+
+static int handshake_start(struct rtmp_conn *conn)
+{
+	uint8_t sig[1+RTMP_HANDSHAKE_SIZE];
+	int err;
+
+	sig[0] = RTMP_PROTOCOL_VERSION;
+	sig[1] = 0;
+	sig[2] = 0;
+	sig[3] = 0;
+	sig[4] = 0;
+	sig[5] = VER_MAJOR;
+	sig[6] = VER_MINOR;
+	sig[7] = VER_PATCH;
+	sig[8] = 0;
+	rand_bytes(sig + 9, sizeof(sig) - 9);
+
+	err = send_packet(conn, sig, sizeof(sig));
+	if (err)
+		return err;
+
+	set_state(conn, RTMP_STATE_VERSION_SENT);
+
+	return 0;
+}
+
+
+static void conn_close(struct rtmp_conn *conn, int err)
+{
+	rtmp_close_h *closeh;
+
+	conn->tc = mem_deref(conn->tc);
+	conn->dnsq6 = mem_deref(conn->dnsq6);
+	conn->dnsq4 = mem_deref(conn->dnsq4);
+
+	closeh = conn->closeh;
+	if (closeh) {
+		conn->closeh = NULL;
+		closeh(err, conn->arg);
+	}
+}
+
+
+static void tcp_estab_handler(void *arg)
+{
+	struct rtmp_conn *conn = arg;
+	int err = 0;
+
+	if (conn->is_client) {
+
+		err = handshake_start(conn);
+	}
+
+	if (err)
+		conn_close(conn, err);
+}
+
+
+/* Send AMF0 Command or Data */
+int rtmp_send_amf_command(const struct rtmp_conn *conn,
+			  unsigned format, uint32_t chunk_id,
+			  uint8_t type_id,
+			  uint32_t msg_stream_id,
+			  const uint8_t *cmd, size_t len)
+{
+	if (!conn || !cmd || !len)
+		return EINVAL;
+
+	return rtmp_chunker(format, chunk_id, 0, 0, type_id, msg_stream_id,
+			    cmd, len, conn->send_chunk_size,
+			    conn->tc);
+}
+
+
+static void connect_resp_handler(bool success, const struct odict *msg,
+				 void *arg)
+{
+	struct rtmp_conn *conn = arg;
+	rtmp_estab_h *estabh;
+	(void)msg;
+
+	if (!success) {
+		conn_close(conn, EPROTO);
+		return;
+	}
+
+	conn->connected = true;
+
+	estabh = conn->estabh;
+	if (estabh) {
+		conn->estabh = NULL;
+		estabh(conn->arg);
+	}
+}
+
+
+static int send_connect(struct rtmp_conn *conn)
+{
+	const int ac  = 0x0400;  /* AAC  */
+	const int vc  = 0x0080;  /* H264 */
+
+	return rtmp_amf_request(conn, RTMP_CONTROL_STREAM_ID, "connect",
+				connect_resp_handler, conn,
+				1,
+			RTMP_AMF_TYPE_OBJECT, 8,
+		          RTMP_AMF_TYPE_STRING, "app", conn->app,
+		          RTMP_AMF_TYPE_STRING, "flashVer", "LNX 9,0,124,2",
+		          RTMP_AMF_TYPE_STRING, "tcUrl", conn->uri,
+		          RTMP_AMF_TYPE_BOOLEAN, "fpad", false,
+		          RTMP_AMF_TYPE_NUMBER, "capabilities", 15.0,
+		          RTMP_AMF_TYPE_NUMBER, "audioCodecs", (double)ac,
+		          RTMP_AMF_TYPE_NUMBER, "videoCodecs", (double)vc,
+		          RTMP_AMF_TYPE_NUMBER, "videoFunction", 1.0);
+}
+
+
+static int client_handle_packet(struct rtmp_conn *conn, struct mbuf *mb)
+{
+	uint8_t s0;
+	uint8_t s1[RTMP_HANDSHAKE_SIZE];
+	int err = 0;
+
+	switch (conn->state) {
+
+	case RTMP_STATE_VERSION_SENT:
+		if (mbuf_get_left(mb) < (1+RTMP_HANDSHAKE_SIZE))
+			return ENODATA;
+
+		s0 = mbuf_read_u8(mb);
+		if (s0 != RTMP_PROTOCOL_VERSION)
+			return EPROTO;
+
+		(void)mbuf_read_mem(mb, s1, sizeof(s1));
+
+		err = send_packet(conn, s1, sizeof(s1));
+		if (err)
+			return err;
+
+		set_state(conn, RTMP_STATE_ACK_SENT);
+		break;
+
+	case RTMP_STATE_ACK_SENT:
+		if (mbuf_get_left(mb) < RTMP_HANDSHAKE_SIZE)
+			return ENODATA;
+
+		/* S2 (ignored) */
+		mbuf_advance(mb, RTMP_HANDSHAKE_SIZE);
+
+		conn->send_chunk_size = 4096;
+		err = rtmp_control(conn, RTMP_TYPE_SET_CHUNK_SIZE,
+				   conn->send_chunk_size);
+		if (err)
+			return err;
+
+		err = send_connect(conn);
+		if (err)
+			return err;
+
+		set_state(conn, RTMP_STATE_HANDSHAKE_DONE);
+		break;
+
+	case RTMP_STATE_HANDSHAKE_DONE:
+		err = rtmp_dechunker_receive(conn->dechunk, mb);
+		if (err)
+			return err;
+		break;
+
+	default:
+		return EPROTO;
+	}
+
+	return 0;
+}
+
+
+static int server_handle_packet(struct rtmp_conn *conn, struct mbuf *mb)
+{
+	uint8_t c0;
+	uint8_t c1[RTMP_HANDSHAKE_SIZE];
+	int err = 0;
+
+	switch (conn->state) {
+
+	case RTMP_STATE_UNINITIALIZED:
+		if (mbuf_get_left(mb) < 1)
+			return ENODATA;
+
+		c0 = mbuf_read_u8(mb);
+		if (c0 != RTMP_PROTOCOL_VERSION)
+			return EPROTO;
+
+		/* Send S0 + S1 */
+		err = handshake_start(conn);
+		if (err)
+			return err;
+		break;
+
+	case RTMP_STATE_VERSION_SENT:
+		if (mbuf_get_left(mb) < RTMP_HANDSHAKE_SIZE)
+			return ENODATA;
+
+		(void)mbuf_read_mem(mb, c1, sizeof(c1));
+
+		/* Copy C1 to S2 */
+		err = send_packet(conn, c1, sizeof(c1));
+		if (err)
+			return err;
+
+		set_state(conn, RTMP_STATE_ACK_SENT);
+		break;
+
+	case RTMP_STATE_ACK_SENT:
+		if (mbuf_get_left(mb) < RTMP_HANDSHAKE_SIZE)
+			return ENODATA;
+
+		/* C2 (ignored) */
+		mbuf_advance(mb, RTMP_HANDSHAKE_SIZE);
+
+		conn->send_chunk_size = 4096;
+		err = rtmp_control(conn, RTMP_TYPE_SET_CHUNK_SIZE,
+				   conn->send_chunk_size);
+		if (err)
+			return err;
+
+		set_state(conn, RTMP_STATE_HANDSHAKE_DONE);
+		break;
+
+	case RTMP_STATE_HANDSHAKE_DONE:
+		err = rtmp_dechunker_receive(conn->dechunk, mb);
+		if (err)
+			return err;
+		break;
+
+	default:
+		return EPROTO;
+	}
+
+	return 0;
+}
+
+
+/* TCP receive handler: re-assemble incoming data and feed it to the parser */
+static void tcp_recv_handler(struct mbuf *mb_pkt, void *arg)
+{
+	struct rtmp_conn *conn = arg;
+	int err = 0;
+
+	conn->total_bytes += mbuf_get_left(mb_pkt);
+
+	/* re-assembly of fragments */
+	if (conn->mb) {
+		const size_t len = mbuf_get_left(mb_pkt), pos = conn->mb->pos;
+
+		if ((mbuf_get_left(conn->mb) + len) > RTMP_MESSAGE_LEN_MAX) {
+			err = EOVERFLOW;
+			goto out;
+		}
+
+		conn->mb->pos = conn->mb->end;
+
+		err = mbuf_write_mem(conn->mb,
+				     mbuf_buf(mb_pkt), mbuf_get_left(mb_pkt));
+		if (err)
+			goto out;
+
+		conn->mb->pos = pos;
+	}
+	else {
+		conn->mb = mem_ref(mb_pkt);
+	}
+
+	while (mbuf_get_left(conn->mb) > 0) {
+
+		size_t pos;
+		uint32_t nrefs;
+
+		pos = conn->mb->pos;
+		/* hold a reference while the packet handlers run */
+		mem_ref(conn);
+
+		if (conn->is_client)
+			err = client_handle_packet(conn, conn->mb);
+		else
+			err = server_handle_packet(conn, conn->mb);
+
+		nrefs = mem_nrefs(conn);
+
+		mem_deref(conn);
+		/* nrefs was sampled before the deref: 1 means conn is gone */
+		if (nrefs == 1)
+			return;
+		/* conn_close() resets conn->tc: connection already closed */
+		if (!conn->tc)
+			return;
+
+		if (err) {
+
+			/* rewind */
+			conn->mb->pos = pos;
+			/* ENODATA: packet incomplete, keep data and wait */
+			if (err == ENODATA)
+				err = 0;
+			break;
+		}
+
+		if (conn->mb->pos >= conn->mb->end) {
+			conn->mb = mem_deref(conn->mb);
+			break;
+		}
+	}
+
+	if (err)
+		goto out;
+
+	if (conn->total_bytes >= (conn->last_ack + conn->window_ack_size)) {
+
+		conn->last_ack = conn->total_bytes;
+
+		err = rtmp_control(conn, RTMP_TYPE_ACKNOWLEDGEMENT,
+				   (uint32_t)conn->total_bytes);
+		if (err)
+			goto out;
+	}
+
+ out:
+	if (err)
+		conn_close(conn, err);
+}
+
+
+static void tcp_close_handler(int err, void *arg)
+{
+	struct rtmp_conn *conn = arg;
+
+	if (conn->is_client && !conn->connected && conn->srvc > 0) {
+		err = req_connect(conn);
+		if (!err)
+			return;
+	}
+
+	conn_close(conn, err);
+}
+
+
+static int req_connect(struct rtmp_conn *conn)
+{
+	const struct sa *addr;
+	int err = EINVAL;
+
+	while (conn->srvc > 0) {
+
+		--conn->srvc;
+
+		addr = &conn->srvv[conn->srvc];
+
+		conn->send_chunk_size = RTMP_DEFAULT_CHUNKSIZE;
+		conn->window_ack_size = WINDOW_ACK_SIZE;
+		conn->state = RTMP_STATE_UNINITIALIZED;
+		conn->last_ack = 0;
+		conn->total_bytes = 0;
+		conn->mb = mem_deref(conn->mb);
+		conn->tc = mem_deref(conn->tc);
+
+		rtmp_dechunker_set_chunksize(conn->dechunk,
+					     RTMP_DEFAULT_CHUNKSIZE);
+
+		err = tcp_connect(&conn->tc, addr, tcp_estab_handler,
+				  tcp_recv_handler, tcp_close_handler, conn);
+		if (!err)
+			break;
+	}
+
+	return err;
+}
+
+
+static bool rr_handler(struct dnsrr *rr, void *arg)
+{
+	struct rtmp_conn *conn = arg;
+
+	if (conn->srvc >= ARRAY_SIZE(conn->srvv))
+		return true;
+
+	switch (rr->type) {
+
+	case DNS_TYPE_A:
+		sa_set_in(&conn->srvv[conn->srvc++], rr->rdata.a.addr,
+                          conn->port);
+		break;
+
+	case DNS_TYPE_AAAA:
+		sa_set_in6(&conn->srvv[conn->srvc++], rr->rdata.aaaa.addr,
+			   conn->port);
+		break;
+	}
+
+	return false;
+}
+
+
+static void query_handler(int err, const struct dnshdr *hdr, struct list *ansl,
+			  struct list *authl, struct list *addl, void *arg)
+{
+	struct rtmp_conn *conn = arg;
+	(void)hdr;
+	(void)authl;
+	(void)addl;
+
+	/* collect the A/AAAA answers for our host into conn->srvv */
+	dns_rrlist_apply2(ansl, conn->host, DNS_TYPE_A, DNS_TYPE_AAAA,
+			  DNS_CLASS_IN, true, rr_handler, conn);
+
+	/* the other (A/AAAA) query is still pending -- let it finish first */
+	if (conn->dnsq4 || conn->dnsq6)
+		return;
+
+	/* all lookups are done: start connecting, or find out why not */
+	if (conn->srvc > 0) {
+		err = req_connect(conn);
+	}
+	else if (!err) {
+
+		/* the query itself reported no error, but gave no address */
+		err = EDESTADDRREQ;
+	}
+
+	if (err)
+		conn_close(conn, err);
+}
+
+
+/**
+ * Connect to an RTMP server
+ *
+ * @param connp  Pointer to allocated RTMP connection object
+ * @param dnsc   DNS Client for resolving FQDN uris, unused for IP literals
+ * @param uri    RTMP uri to connect to, "rtmp://host[:port]/app/stream"
+ * @param estabh Established handler
+ * @param cmdh   Incoming command handler
+ * @param closeh Close handler
+ * @param arg    Handler argument
+ *
+ * @return 0 if success, EINVAL for a malformed uri or a port above 65535,
+ *         otherwise errorcode
+ *
+ * Example URIs:
+ *     rtmp://a.rtmp.youtube.com/live2/my-stream
+ *     rtmp://[::1]/vod/mp4:sample.mp4
+ */
+int rtmp_connect(struct rtmp_conn **connp, struct dnsc *dnsc, const char *uri,
+		 rtmp_estab_h *estabh, rtmp_command_h *cmdh,
+		 rtmp_close_h *closeh, void *arg)
+{
+	struct pl pl_hostport, pl_host, pl_port, pl_app, pl_stream;
+	struct rtmp_conn *conn;
+	uint32_t port;
+	int err;
+
+	if (!connp || !uri)
+		return EINVAL;
+
+	if (re_regex(uri, strlen(uri), "rtmp://[^/]+/[^/]+/[^]+",
+		     &pl_hostport, &pl_app, &pl_stream))
+		return EINVAL;
+
+	if (uri_decode_hostport(&pl_hostport, &pl_host, &pl_port))
+		return EINVAL;
+
+	/* conn->port is 16 bits wide: refuse ports that would wrap around */
+	port = pl_isset(&pl_port) ? pl_u32(&pl_port) : RTMP_PORT;
+	if (port > 65535)
+		return EINVAL;
+
+	conn = rtmp_conn_alloc(true, estabh, cmdh, closeh, arg);
+	if (!conn)
+		return ENOMEM;
+
+	conn->port = (uint16_t)port;
+
+	err  = pl_strdup(&conn->app, &pl_app);
+	err |= pl_strdup(&conn->stream, &pl_stream);
+	err |= str_dup(&conn->uri, uri);
+	if (err)
+		goto out;
+
+	if (0 == sa_set(&conn->srvv[0], &pl_host, conn->port)) {
+		/* the host parsed as an IP address: connect right away */
+		conn->srvc = 1;
+		err = req_connect(conn);
+		if (err)
+			goto out;
+	}
+	else {
+#ifdef HAVE_INET6
+		struct sa tmp;
+#endif
+		/* not an IP address: the host must be resolved via DNS */
+		if (!dnsc) {
+			err = EINVAL;
+			goto out;
+		}
+
+		err = pl_strdup(&conn->host, &pl_host);
+		if (err)
+			goto out;
+
+		conn->dnsc = mem_ref(dnsc);
+		err = dnsc_query(&conn->dnsq4, dnsc, conn->host, DNS_TYPE_A,
+				 DNS_CLASS_IN, true, query_handler, conn);
+		if (err)
+			goto out;
+
+#ifdef HAVE_INET6
+		if (0 == net_default_source_addr_get(AF_INET6, &tmp)) {
+			/* IPv6 source address available: query AAAA too */
+			err = dnsc_query(&conn->dnsq6, dnsc, conn->host,
+					 DNS_TYPE_AAAA, DNS_CLASS_IN,
+					 true, query_handler, conn);
+			if (err)
+				goto out;
+		}
+#endif
+	}
+
+ out:
+	if (err)
+		mem_deref(conn);
+	else
+		*connp = conn;
+
+	return err;
+}
+
+
+/**
+ * Create a server-side RTMP connection by accepting a pending TCP connection
+ *
+ * @param connp  Pointer to allocated RTMP connection object
+ * @param ts     TCP socket with pending connection
+ * @param cmdh   Incoming command handler
+ * @param closeh Close handler
+ * @param arg    Handler argument
+ *
+ * @return 0 if success, otherwise errorcode
+ */
+int rtmp_accept(struct rtmp_conn **connp, struct tcp_sock *ts,
+		rtmp_command_h *cmdh, rtmp_close_h *closeh, void *arg)
+{
+	struct rtmp_conn *srv;
+	int err;
+
+	if (!connp || !ts)
+		return EINVAL;
+
+	/* allocated without an established handler */
+	srv = rtmp_conn_alloc(false, NULL, cmdh, closeh, arg);
+	if (!srv)
+		return ENOMEM;
+
+	err = tcp_accept(&srv->tc, ts, tcp_estab_handler,
+			 tcp_recv_handler, tcp_close_handler, srv);
+	if (err) {
+		/* on failure *connp is left untouched */
+		mem_deref(srv);
+		return err;
+	}
+
+	*connp = srv;
+
+	return 0;
+}
+
+
+int rtmp_conn_send_msg(const struct rtmp_conn *conn,
+		       unsigned format, uint32_t chunk_id,
+		       uint32_t timestamp, uint32_t timestamp_delta,
+		       uint8_t msg_type_id, uint32_t msg_stream_id,
+		       const uint8_t *payload, size_t payload_len)
+{
+	if (NULL == conn || NULL == payload || 0 == payload_len)
+		return EINVAL;
+
+	/* hand over to the chunker, using the current send chunk size */
+	return rtmp_chunker(format, chunk_id, timestamp, timestamp_delta,
+			    msg_type_id, msg_stream_id, payload, payload_len,
+			    conn->send_chunk_size, conn->tc);
+}
+
+
+unsigned rtmp_conn_assign_chunkid(struct rtmp_conn *conn)
+{
+	/* the counter always holds the most recently assigned chunk id */
+	if (conn)
+		return ++conn->chunk_id_counter;
+	return 0;
+}
+
+
+uint64_t rtmp_conn_assign_tid(struct rtmp_conn *conn)
+{
+	/* the counter always holds the most recently assigned transaction id */
+	if (conn)
+		return ++conn->tid_counter;
+	return 0;
+}
+
+
+/**
+ * Get the TCP connection that an RTMP connection runs on
+ *
+ * @param conn RTMP Connection
+ *
+ * @return TCP-Connection, or NULL if no RTMP connection was given
+ */
+struct tcp_conn *rtmp_conn_tcpconn(const struct rtmp_conn *conn)
+{
+	return (NULL == conn) ? NULL : conn->tc;
+}
+
+
+/**
+ * Get the stream name of an RTMP connection, as stored by rtmp_connect
+ *
+ * @param conn RTMP Connection
+ *
+ * @return RTMP Stream name or NULL
+ */
+const char *rtmp_conn_stream(const struct rtmp_conn *conn)
+{
+	return (NULL == conn) ? NULL : conn->stream;
+}
+
+
+/**
+ * Replace the command handler, the close handler and the handler argument
+ * of an RTMP connection; a NULL connection is ignored
+ *
+ * @param conn   RTMP connection
+ * @param cmdh   Incoming command handler
+ * @param closeh Close handler
+ * @param arg    Handler argument
+ */
+void rtmp_set_handlers(struct rtmp_conn *conn, rtmp_command_h *cmdh,
+		       rtmp_close_h *closeh, void *arg)
+{
+	if (conn) {
+		conn->arg    = arg;
+		conn->closeh = closeh;
+		conn->cmdh   = cmdh;
+	}
+}
+
+
+static const char *rtmp_handshake_name(enum rtmp_handshake_state state)
+{
+	/* printable name of a handshake state, "?" for any other value */
+
+	if (state == RTMP_STATE_UNINITIALIZED)  return "UNINITIALIZED";
+	if (state == RTMP_STATE_VERSION_SENT)   return "VERSION_SENT";
+	if (state == RTMP_STATE_ACK_SENT)       return "ACK_SENT";
+	if (state == RTMP_STATE_HANDSHAKE_DONE) return "HANDSHAKE_DONE";
+
+	return "?";
+}
+
+/* Print a human-readable summary of an RTMP connection to pf */
+int rtmp_conn_debug(struct re_printf *pf, const struct rtmp_conn *conn)
+{
+	const char *role = (conn && conn->is_client) ? "Client" : "Server";
+	int err;
+
+	if (!conn)
+		return 0;
+
+	err  = re_hprintf(pf, "role:          %s\n", role);
+	err |= re_hprintf(pf, "state:         %s\n",
+			  rtmp_handshake_name(conn->state));
+	err |= re_hprintf(pf, "connected:     %d\n", conn->connected);
+	err |= re_hprintf(pf, "chunk_size:    send=%u\n",
+			  conn->send_chunk_size);
+	err |= re_hprintf(pf, "bytes:         %zu\n", conn->total_bytes);
+	err |= re_hprintf(pf, "streams:       %u\n",
+			  list_count(&conn->streaml));
+
+	if (conn->is_client) {
+		err |= re_hprintf(pf, "uri:           %s\n", conn->uri);
+		err |= re_hprintf(pf, "app:           %s\n", conn->app);
+		err |= re_hprintf(pf, "stream:        %s\n", conn->stream);
+	}
+
+	err |= re_hprintf(pf, "%H\n", rtmp_dechunker_debug, conn->dechunk);
+
+	return err;
+}
diff --git a/src/rtmp/control.c b/src/rtmp/control.c
new file mode 100644
index 0000000..f479a13
--- /dev/null
+++ b/src/rtmp/control.c
@@ -0,0 +1,106 @@
+/**
+ * @file rtmp/control.c  Real Time Messaging Protocol (RTMP) -- Control
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_list.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+
+/**
+ * Encode and send an RTMP control message
+ *
+ * @param conn RTMP connection
+ * @param type RTMP Packet type
+ * @param ...  Packet arguments, their number and types depend on type
+ *
+ * @return 0 if success, ENOTSUP for an unhandled type, otherwise errorcode
+ */
+int rtmp_control(const struct rtmp_conn *conn, enum rtmp_packet_type type, ...)
+{
+	struct mbuf *body;
+	va_list args;
+	uint32_t value;
+	uint16_t event;
+	uint8_t limit;
+	int err;
+
+	if (!conn)
+		return EINVAL;
+
+	/* the largest payload built below (user control) takes 6 bytes */
+	body = mbuf_alloc(8);
+	if (!body)
+		return ENOMEM;
+
+	va_start(args, type);
+
+	if (type == RTMP_TYPE_SET_CHUNK_SIZE ||
+	    type == RTMP_TYPE_WINDOW_ACK_SIZE ||
+	    type == RTMP_TYPE_ACKNOWLEDGEMENT) {
+		/* payload: one 32-bit value */
+		value = va_arg(args, uint32_t);
+		err = mbuf_write_u32(body, htonl(value));
+	}
+	else if (type == RTMP_TYPE_USER_CONTROL_MSG) {
+		/* payload: 16-bit event type, then one 32-bit value */
+		event = va_arg(args, unsigned);
+		value = va_arg(args, uint32_t);
+		err  = mbuf_write_u16(body, htons(event));
+		err |= mbuf_write_u32(body, htonl(value));
+	}
+	else if (type == RTMP_TYPE_SET_PEER_BANDWIDTH) {
+		/* payload: one 32-bit value, then a single byte */
+		value = va_arg(args, uint32_t);
+		limit = va_arg(args, unsigned);
+		err  = mbuf_write_u32(body, htonl(value));
+		err |= mbuf_write_u8(body, limit);
+	}
+	else {
+		err = ENOTSUP;
+	}
+
+	va_end(args);
+
+	/* sent as format 0 on the control chunk id and control stream id */
+	if (!err) {
+		err = rtmp_conn_send_msg(conn, 0, RTMP_CHUNK_ID_CONTROL, 0, 0,
+					 type, RTMP_CONTROL_STREAM_ID,
+					 body->buf, body->end);
+	}
+
+	mem_deref(body);
+
+	return err;
+}
+
+
+/**
+ * Get the printable name of an RTMP user control event
+ *
+ * @param event RTMP Event type
+ *
+ * @return Name of the event as a string, "?" for an unknown event
+ */
+const char *rtmp_event_name(enum rtmp_event_type event)
+{
+	/* values without a name of their own are shown as "?" */
+
+	if (event == RTMP_EVENT_STREAM_BEGIN) return "StreamBegin";
+	if (event == RTMP_EVENT_STREAM_EOF) return "StreamEOF";
+	if (event == RTMP_EVENT_STREAM_DRY) return "StreamDry";
+	if (event == RTMP_EVENT_SET_BUFFER_LENGTH) return "SetBufferLength";
+	if (event == RTMP_EVENT_STREAM_IS_RECORDED) return "StreamIsRecorded";
+	if (event == RTMP_EVENT_PING_REQUEST) return "PingRequest";
+	if (event == RTMP_EVENT_PING_RESPONSE) return "PingResponse";
+
+	return "?";
+}
diff --git a/src/rtmp/ctrans.c b/src/rtmp/ctrans.c
new file mode 100644
index 0000000..6e56f84
--- /dev/null
+++ b/src/rtmp/ctrans.c
@@ -0,0 +1,138 @@
+/**
+ * @file rtmp/ctrans.c  Real Time Messaging Protocol -- AMF Client Transactions
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_list.h>
+#include <re_tcp.h>
+#include <re_sys.h>
+#include <re_odict.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+/* An AMF request that was sent and is waiting for its response */
+struct rtmp_ctrans {
+	struct le le;         /* element of conn->ctransl */
+	uint64_t tid;         /* transaction id, matched against responses */
+	rtmp_resp_h *resph;   /* response handler */
+	void *arg;            /* handler argument */
+};
+
+/* Destructor: take the transaction off the list it may be linked into */
+static void ctrans_destructor(void *data)
+{
+	struct rtmp_ctrans *trans = data;
+
+	list_unlink(&trans->le);
+}
+
+
+static struct rtmp_ctrans *rtmp_ctrans_find(const struct list *ctransl,
+					    uint64_t tid)
+{
+	struct rtmp_ctrans *ct = NULL;
+	struct le *le;
+
+	/* linear scan; ct stays NULL if no transaction carries this id */
+	for (le = list_head(ctransl); le && !ct; le = le->next) {
+		if (((struct rtmp_ctrans *)le->data)->tid == tid)
+			ct = le->data;
+	}
+
+	return ct;
+}
+
+/* Send an AMF0 command and register resph for the matching response */
+int rtmp_amf_request(struct rtmp_conn *conn, uint32_t stream_id,
+		     const char *command,
+		     rtmp_resp_h *resph, void *arg, unsigned body_propc, ...)
+{
+	struct rtmp_ctrans *trans = NULL;
+	struct mbuf *body;
+	va_list args;
+	int err;
+
+	if (!conn || !command || !resph)
+		return EINVAL;
+
+	body = mbuf_alloc(512);
+	if (!body)
+		return ENOMEM;
+
+	trans = mem_zalloc(sizeof(*trans), ctrans_destructor);
+	if (!trans) {
+		err = ENOMEM;
+		goto out;
+	}
+
+	trans->tid   = rtmp_conn_assign_tid(conn);
+	trans->resph = resph;
+	trans->arg   = arg;
+
+	err = rtmp_command_header_encode(body, command, trans->tid);
+	if (err)
+		goto out;
+
+	if (body_propc) {
+		va_start(args, body_propc);
+		err = rtmp_amf_vencode_object(body, RTMP_AMF_TYPE_ROOT,
+					      body_propc, &args);
+		va_end(args);
+		if (err)
+			goto out;
+	}
+
+	err = rtmp_send_amf_command(conn, 0, RTMP_CHUNK_ID_CONN,
+				    RTMP_TYPE_AMF0,
+				    stream_id, body->buf, body->end);
+
+	/* only a request that was sent is left waiting for its response */
+	if (!err)
+		list_append(&conn->ctransl, &trans->le, trans);
+
+ out:
+	mem_deref(body);
+	if (err)
+		mem_deref(trans);
+
+	return err;
+}
+
+
+int rtmp_ctrans_response(const struct list *ctransl,
+			 const struct odict *msg)
+{
+	struct rtmp_ctrans *trans;
+	rtmp_resp_h *handler;
+	void *handler_arg;
+	const char *name;
+	uint64_t tid;
+
+	if (!ctransl || !msg)
+		return EINVAL;
+
+	name = odict_string(msg, "0");
+	if (!odict_get_number(msg, &tid, "1"))
+		return EPROTO;
+
+	trans = rtmp_ctrans_find(ctransl, tid);
+	if (!trans)
+		return ENOENT;
+
+	/* the transaction is released before its handler gets called */
+	handler     = trans->resph;
+	handler_arg = trans->arg;
+	mem_deref(trans);
+
+	/* a command name other than "_result" reports a failed request */
+	handler(0 == str_casecmp(name, "_result"), msg, handler_arg);
+
+	return 0;
+}
diff --git a/src/rtmp/dechunk.c b/src/rtmp/dechunk.c
new file mode 100644
index 0000000..df82d78
--- /dev/null
+++ b/src/rtmp/dechunk.c
@@ -0,0 +1,292 @@
+/**
+ * @file rtmp/dechunk.c  Real Time Messaging Protocol (RTMP) -- Dechunking
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_list.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+
+enum {
+	MAX_CHUNKS = 64,  /* limit on the chunk streams tracked at a time */
+};
+
+/* Reassembly state of one chunk stream */
+struct rtmp_chunk {
+	struct le le;            /* element of rtmp_dechunker.chunkl */
+	struct rtmp_header hdr;  /* header state kept between chunks */
+	struct mbuf *mb;         /* message being reassembled, or NULL */
+};
+
+/** Defines the RTMP Dechunker */
+struct rtmp_dechunker {
+	struct list chunkl;      /* struct rtmp_chunk */
+	size_t chunk_sz;         /* upper bound for the payload of one chunk */
+	rtmp_dechunk_h *chunkh;  /* called with each completed message */
+	void *arg;               /* handler argument */
+};
+
+/* Destructor: flush the list of chunk streams that are still tracked */
+static void destructor(void *data)
+{
+	struct rtmp_dechunker *dechunker = data;
+
+	list_flush(&dechunker->chunkl);
+}
+
+/* Destructor: unlink the chunk stream and drop its reassembly buffer */
+static void chunk_destructor(void *data)
+{
+	struct rtmp_chunk *stream = data;
+
+	list_unlink(&stream->le);
+	mem_deref(stream->mb);
+}
+
+
+static struct rtmp_chunk *create_chunk(struct list *chunkl,
+				       const struct rtmp_header *hdr)
+{
+	struct rtmp_chunk *chunk = mem_zalloc(sizeof(*chunk),
+					      chunk_destructor);
+
+	/* on success: seed with the given header and start tracking it */
+	if (chunk) {
+		chunk->hdr = *hdr;
+		list_append(chunkl, &chunk->le, chunk);
+	}
+
+	/* NULL if the allocation failed */
+	return chunk;
+}
+
+
+static struct rtmp_chunk *find_chunk(const struct list *chunkl,
+				     uint32_t chunk_id)
+{
+	struct le *le = list_head(chunkl);
+
+	/* linear search for the chunk stream with this id */
+	while (le) {
+		struct rtmp_chunk *chunk = le->data;
+		if (chunk->hdr.chunk_id == chunk_id)
+			return chunk;
+		le = le->next;
+	}
+
+	return NULL;
+}
+
+
+/*
+ * Allocate a stateful RTMP de-chunker; chunkh receives complete messages
+ */
+int rtmp_dechunker_alloc(struct rtmp_dechunker **rdp, size_t chunk_sz,
+			 rtmp_dechunk_h *chunkh, void *arg)
+{
+	struct rtmp_dechunker *dechunker;
+
+	if (NULL == rdp || 0 == chunk_sz || NULL == chunkh)
+		return EINVAL;
+
+	dechunker = mem_zalloc(sizeof(*dechunker), destructor);
+	if (!dechunker)
+		return ENOMEM;
+
+	/* members not set here keep their state from mem_zalloc() */
+	dechunker->arg      = arg;
+	dechunker->chunkh   = chunkh;
+	dechunker->chunk_sz = chunk_sz;
+
+	*rdp = dechunker;
+
+	return 0;
+}
+/* Decode one chunk from mb; a completed message is passed to rd->chunkh. */
+/* ENODATA is returned while mb does not hold the complete chunk yet. */
+int rtmp_dechunker_receive(struct rtmp_dechunker *rd, struct mbuf *mb)
+{
+	struct rtmp_header hdr;
+	struct rtmp_chunk *chunk;
+	size_t chunk_sz, left, msg_len;
+	int err;
+
+	if (!rd || !mb)
+		return EINVAL;
+
+	err = rtmp_header_decode(&hdr, mb);
+	if (err)
+		return err;
+
+	/* find preceding chunk, from chunk id */
+	chunk = find_chunk(&rd->chunkl, hdr.chunk_id);
+	if (!chunk) {
+
+		/* only type 0 can create a new chunk stream (MAX_CHUNKS cap) */
+		if (hdr.format == 0) {
+			if (list_count(&rd->chunkl) >= MAX_CHUNKS)
+				return EOVERFLOW;
+
+			chunk = create_chunk(&rd->chunkl, &hdr);
+			if (!chunk)
+				return ENOMEM;
+		}
+		else
+			return ENOENT;
+	}
+
+	switch (hdr.format) {
+
+	case 0:
+	case 1:
+	case 2:
+		if (hdr.format == 0) {
+
+			/* copy the whole header */
+			chunk->hdr = hdr;
+		}
+		else if (hdr.format == 1) {
+
+			chunk->hdr.timestamp_delta = hdr.timestamp_delta;
+			chunk->hdr.length          = hdr.length;
+			chunk->hdr.type_id         = hdr.type_id;
+		}
+		else if (hdr.format == 2) {
+
+			chunk->hdr.timestamp_delta = hdr.timestamp_delta;
+		}
+		/* types 0-2 always start a new message on the chunk stream */
+		msg_len = chunk->hdr.length;
+
+		chunk_sz = min(msg_len, rd->chunk_sz);
+
+		if (mbuf_get_left(mb) < chunk_sz)
+			return ENODATA;
+		/* an unfinished message is dropped, the new one replaces it */
+		mem_deref(chunk->mb);
+		chunk->mb = mbuf_alloc(msg_len);
+		if (!chunk->mb)
+			return ENOMEM;
+
+		err = mbuf_read_mem(mb, chunk->mb->buf, chunk_sz);
+		if (err)
+			return err;
+
+		chunk->mb->pos = chunk_sz;
+		chunk->mb->end = chunk_sz;
+
+		chunk->hdr.format = hdr.format;
+		chunk->hdr.ext_ts = hdr.ext_ts;
+		/* types 1 and 2 carry a delta to the previous timestamp */
+		if (hdr.format == 1 || hdr.format == 2)
+			chunk->hdr.timestamp += hdr.timestamp_delta;
+		break;
+
+	case 3:
+		if (chunk->hdr.ext_ts) {
+
+			uint32_t ext_ts;
+
+			if (mbuf_get_left(mb) < 4)
+				return ENODATA;
+
+			ext_ts = ntohl(mbuf_read_u32(mb));
+
+			if (chunk->hdr.format == 0)
+				chunk->hdr.timestamp = ext_ts;
+			else
+				chunk->hdr.timestamp_delta = ext_ts;
+		}
+		/* no buffer: this chunk starts a message with the kept header */
+		if (!chunk->mb) {
+
+			chunk->mb = mbuf_alloc(chunk->hdr.length);
+			if (!chunk->mb)
+				return ENOMEM;
+
+			if (chunk->hdr.format == 0) {
+				chunk->hdr.timestamp_delta =
+					chunk->hdr.timestamp;
+			}
+
+			chunk->hdr.timestamp += chunk->hdr.timestamp_delta;
+		}
+		/* read the lesser of the buffer space left and the chunk size */
+		left = mbuf_get_space(chunk->mb);
+
+		chunk_sz = min(left, rd->chunk_sz);
+
+		if (mbuf_get_left(mb) < chunk_sz)
+			return ENODATA;
+
+		err = mbuf_read_mem(mb, mbuf_buf(chunk->mb), chunk_sz);
+		if (err)
+			return err;
+
+		chunk->mb->pos += chunk_sz;
+		chunk->mb->end += chunk_sz;
+		break;
+
+	default:
+		return EPROTO;
+	}
+	/* buffer filled up: the message is complete and is handed over */
+	if (chunk->mb->pos >= chunk->mb->size) {
+
+		struct mbuf *buf;
+
+		chunk->mb->pos = 0;
+
+		buf = chunk->mb;
+		chunk->mb = NULL;
+
+		err = rd->chunkh(&chunk->hdr, buf, rd->arg);
+
+		mem_deref(buf);
+	}
+
+	return err;
+}
+
+
+void rtmp_dechunker_set_chunksize(struct rtmp_dechunker *rd, size_t chunk_sz)
+{
+	/* a missing dechunker or a chunk size of zero is ignored, the
+	   size that was set before then stays in effect */
+	if (rd && chunk_sz)
+		rd->chunk_sz = chunk_sz;
+}
+
+
+int rtmp_dechunker_debug(struct re_printf *pf, const struct rtmp_dechunker *rd)
+{
+	struct le *le;
+	int err;
+
+	if (!rd)
+		return 0;
+
+	err  = re_hprintf(pf, "Dechunker Debug:\n");
+	err |= re_hprintf(pf, "chunk list: (%u)\n", list_count(&rd->chunkl));
+
+	/* one line per chunk stream, showing the header state kept for it */
+	le = rd->chunkl.head;
+	while (le) {
+		const struct rtmp_chunk *chunk = le->data;
+		err |= re_hprintf(pf, ".. %H\n",
+				  rtmp_header_print, &chunk->hdr);
+		le = le->next;
+	}
+
+	err |= re_hprintf(pf, "\n");
+
+	return err;
+}
diff --git a/src/rtmp/hdr.c b/src/rtmp/hdr.c
new file mode 100644
index 0000000..38bd370
--- /dev/null
+++ b/src/rtmp/hdr.c
@@ -0,0 +1,279 @@
+/**
+ * @file rtmp/hdr.c  Real Time Messaging Protocol (RTMP) -- Headers
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_list.h>
+#include <re_sys.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+
+enum {
+	RTMP_CHUNK_ID_MIN     = 3,
+	RTMP_CHUNK_ID_MAX     = 65599,  /* 65535 + 64 */
+
+	RTMP_CHUNK_OFFSET     = 64,     /* bias of the 2- and 3-byte id forms */
+	TIMESTAMP_24MAX       = 0x00ffffff,  /* marks an extended timestamp */
+};
+
+
+static int mbuf_write_u24_hton(struct mbuf *mb, uint32_t u24)
+{
+	/* network byte order: most significant of the three bytes first */
+	int err = mbuf_write_u8(mb, u24 >> 16);
+
+	err |= mbuf_write_u8(mb, u24 >> 8);
+	err |= mbuf_write_u8(mb, u24);
+
+	return err;
+}
+
+
+static uint32_t mbuf_read_u24_ntoh(struct mbuf *mb)
+{
+	/* network byte order: the first byte read is the most significant;
+	   separate initialisers keep the three reads in a defined order */
+	const uint32_t hi  = mbuf_read_u8(mb);
+	const uint32_t mid = mbuf_read_u8(mb);
+	const uint32_t lo  = mbuf_read_u8(mb);
+
+	return hi << 16 | mid << 8 | lo;
+}
+/* Encode the basic header: format bits plus chunk stream id in 1-3 bytes. */
+/* NOTE(review): RTMP 1.0 puts the low byte of a 16-bit id first -- verify. */
+static int encode_basic_hdr(struct mbuf *mb, unsigned fmt,
+			    uint32_t chunk_id)
+{
+	uint8_t v;
+	int err = 0;
+
+	/* a larger id would silently wrap in the 16-bit field below */
+	if (chunk_id > RTMP_CHUNK_ID_MAX)
+		return ERANGE;
+
+	if (chunk_id >= RTMP_CHUNK_OFFSET + 256) {
+		/* 3-byte form: marker 1, then 16 bits holding (id - 64) */
+		const uint16_t cs_id = chunk_id - RTMP_CHUNK_OFFSET;
+		v = fmt<<6 | 1;
+		err |= mbuf_write_u8(mb, v);
+		err |= mbuf_write_u16(mb, htons(cs_id));
+	}
+	else if (chunk_id >= RTMP_CHUNK_OFFSET) {
+		/* 2-byte form: marker 0, then 8 bits holding (id - 64) */
+		const uint8_t cs_id = chunk_id - RTMP_CHUNK_OFFSET;
+		v = fmt<<6 | 0;
+		err |= mbuf_write_u8(mb, v);
+		err |= mbuf_write_u8(mb, cs_id);
+	}
+	else {
+		/* 1-byte form: the id shares the byte with the format */
+		v = fmt<<6 | chunk_id;
+		err |= mbuf_write_u8(mb, v);
+	}
+
+	return err;
+}
+
+/* NOTE(review): RTMP 1.0 puts the low byte of a 16-bit id first -- verify. */
+static int decode_basic_hdr(struct rtmp_header *hdr, struct mbuf *mb)
+{
+	uint8_t first;
+	uint8_t marker;
+
+	if (mbuf_get_left(mb) < 1)
+		return ENODATA;
+
+	/* top two bits: chunk format, low six bits: id or form marker */
+	first  = mbuf_read_u8(mb);
+	marker = first & 0x3f;
+
+	hdr->format = first>>6;
+
+	if (marker == 0) {
+
+		/* 2-byte form: one more byte holding (id - 64) */
+		if (mbuf_get_left(mb) < 1)
+			return ENODATA;
+
+		hdr->chunk_id = mbuf_read_u8(mb) + RTMP_CHUNK_OFFSET;
+	}
+	else if (marker == 1) {
+
+		/* 3-byte form: two more bytes holding (id - 64) */
+		if (mbuf_get_left(mb) < 2)
+			return ENODATA;
+
+		hdr->chunk_id = ntohs(mbuf_read_u16(mb)) + RTMP_CHUNK_OFFSET;
+	}
+	else {
+		/* 1-byte form: the marker is the id itself */
+		hdr->chunk_id = marker;
+	}
+
+	return 0;
+}
+
+/* Value for the 24-bit timestamp field, capped at TIMESTAMP_24MAX */
+static uint32_t ts_24(uint32_t ts)
+{
+	return ts < TIMESTAMP_24MAX ? ts : TIMESTAMP_24MAX;
+}
+
+/* Value for the extended timestamp field, 0 if ts fits the 24-bit field */
+static uint32_t ts_ext(uint32_t ts)
+{
+	return ts < TIMESTAMP_24MAX ? 0 : ts;
+}
+
+
+/*
+ * Encode an RTMP chunk header into mb.
+ *
+ * As a side effect hdr->timestamp_ext is updated for types 0-2, from the
+ * timestamp (type 0) or from the timestamp delta (types 1 and 2).
+ */
+int rtmp_header_encode(struct mbuf *mb, struct rtmp_header *hdr)
+{
+	unsigned fmt;
+	uint32_t ts;
+	int err;
+
+	if (!mb || !hdr)
+		return EINVAL;
+
+	fmt = hdr->format;
+
+	err = encode_basic_hdr(mb, fmt, hdr->chunk_id);
+	if (err)
+		return err;
+
+	/* types 0-2 start with a 24-bit timestamp (type 0) or delta */
+	if (fmt <= 2) {
+		ts = (fmt == 0) ? hdr->timestamp : hdr->timestamp_delta;
+
+		hdr->timestamp_ext = ts_ext(ts);
+
+		err |= mbuf_write_u24_hton(mb, ts_24(ts));
+	}
+
+	/* types 0-1 continue with message length and message type */
+	if (fmt <= 1) {
+		err |= mbuf_write_u24_hton(mb, hdr->length);
+		err |= mbuf_write_u8(mb, hdr->type_id);
+	}
+
+	/* only type 0 carries the stream id, converted by sys_htoll() */
+	if (fmt == 0)
+		err |= mbuf_write_u32(mb, sys_htoll(hdr->stream_id));
+
+	/* type 3 uses whatever timestamp_ext the caller left in hdr */
+	if (hdr->timestamp_ext)
+		err |= mbuf_write_u32(mb, htonl(hdr->timestamp_ext));
+
+	return err;
+}
+
+
+/*
+ * Decode an RTMP chunk header from mb; ENODATA if mb holds too little data
+ */
+int rtmp_header_decode(struct rtmp_header *hdr, struct mbuf *mb)
+{
+	/* size of the message header that follows the basic header */
+	static const size_t msghdr_sizev[4] = {11, 7, 3, 0};
+	uint32_t *ext = NULL;
+	uint32_t ts;
+	unsigned fmt;
+	int err;
+
+	if (!hdr || !mb)
+		return EINVAL;
+
+	memset(hdr, 0, sizeof(*hdr));
+
+	err = decode_basic_hdr(hdr, mb);
+	if (err)
+		return err;
+
+	fmt = hdr->format;
+
+	if (mbuf_get_left(mb) < msghdr_sizev[fmt])
+		return ENODATA;
+
+	/* types 0-2: 24-bit timestamp (type 0) or timestamp delta */
+	if (fmt <= 2) {
+		ts = mbuf_read_u24_ntoh(mb);
+
+		if (fmt == 0)
+			hdr->timestamp = ts;
+		else
+			hdr->timestamp_delta = ts;
+	}
+
+	/* types 0-1: message length and message type */
+	if (fmt <= 1) {
+		hdr->length  = mbuf_read_u24_ntoh(mb);
+		hdr->type_id = mbuf_read_u8(mb);
+	}
+
+	/* type 0 only: message stream id */
+	if (fmt == 0)
+		hdr->stream_id = sys_ltohl(mbuf_read_u32(mb));
+
+	/* an all-ones 24-bit field announces a 32-bit extended timestamp */
+	if (hdr->timestamp == TIMESTAMP_24MAX)
+		ext = &hdr->timestamp;
+	else if (hdr->timestamp_delta == TIMESTAMP_24MAX)
+		ext = &hdr->timestamp_delta;
+
+	if (ext) {
+		if (mbuf_get_left(mb) < 4)
+			return ENODATA;
+
+		*ext = ntohl(mbuf_read_u32(mb));
+		hdr->ext_ts = true;
+	}
+
+	return 0;
+}
+
+
+int rtmp_header_print(struct re_printf *pf, const struct rtmp_header *hdr)
+{
+	/* a missing header prints nothing and is not an error */
+	if (!hdr)
+		return 0;
+
+	return re_hprintf(pf,
+			  "fmt %u, chunk %u, timestamp %5u, ts_delta %2u,"
+			  " len %3u, type %2u (%-14s) stream_id %u",
+			  hdr->format, hdr->chunk_id, hdr->timestamp,
+			  hdr->timestamp_delta, hdr->length, hdr->type_id,
+			  rtmp_packet_type_name(hdr->type_id), hdr->stream_id);
+}
+
+
+const char *rtmp_packet_type_name(enum rtmp_packet_type type)
+{
+	/* printable name of a packet type, "?" for any other value */
+	switch (type) {
+	case RTMP_TYPE_ACKNOWLEDGEMENT: return "Acknowledgement";
+	case RTMP_TYPE_AMF0: return "AMF";
+	case RTMP_TYPE_AUDIO: return "Audio Message";
+	case RTMP_TYPE_DATA: return "Data Message";
+	case RTMP_TYPE_SET_CHUNK_SIZE: return "Set Chunk Size";
+	case RTMP_TYPE_SET_PEER_BANDWIDTH: return "Set Peer Bandwidth";
+	case RTMP_TYPE_USER_CONTROL_MSG: return "User Control Message";
+	case RTMP_TYPE_VIDEO: return "Video Message";
+	case RTMP_TYPE_WINDOW_ACK_SIZE: return "Window Acknowledgement Size";
+	default: return "?";
+	}
+}
diff --git a/src/rtmp/mod.mk b/src/rtmp/mod.mk
new file mode 100644
index 0000000..c1b73df
--- /dev/null
+++ b/src/rtmp/mod.mk
@@ -0,0 +1,16 @@
+#
+# mod.mk
+#
+# Copyright (C) 2010 Creytiv.com
+#
+
+SRCS	+= rtmp/amf.c
+SRCS	+= rtmp/amf_dec.c
+SRCS	+= rtmp/amf_enc.c
+SRCS	+= rtmp/chunk.c
+SRCS	+= rtmp/conn.c
+SRCS	+= rtmp/control.c
+SRCS	+= rtmp/ctrans.c
+SRCS	+= rtmp/dechunk.c
+SRCS	+= rtmp/hdr.c
+SRCS	+= rtmp/stream.c
diff --git a/src/rtmp/rtmp.h b/src/rtmp/rtmp.h
new file mode 100644
index 0000000..90085e0
--- /dev/null
+++ b/src/rtmp/rtmp.h
@@ -0,0 +1,178 @@
+/**
+ * @file rtmp.h  Real Time Messaging Protocol (RTMP) -- Internal API
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+
+
+enum {
+	RTMP_PROTOCOL_VERSION  = 3,
+	RTMP_DEFAULT_CHUNKSIZE = 128,     /* chunk size of a new connection */
+	RTMP_HANDSHAKE_SIZE    = 1536,
+	RTMP_MESSAGE_LEN_MAX   = 524288,  /* 512 KiB */
+};
+
+/* Chunk IDs */
+enum {
+	RTMP_CHUNK_ID_CONTROL  = 2,  /* used by rtmp_control() */
+	RTMP_CHUNK_ID_CONN     = 3,  /* used by rtmp_amf_request() */
+};
+
+/** Defines the RTMP Handshake State */
+enum rtmp_handshake_state {
+	RTMP_STATE_UNINITIALIZED = 0,  /* set before each connect attempt */
+	RTMP_STATE_VERSION_SENT,
+	RTMP_STATE_ACK_SENT,
+	RTMP_STATE_HANDSHAKE_DONE
+};
+
+/**
+ * Defines an RTMP Connection
+ */
+struct rtmp_conn {
+	struct list streaml;                    /* struct rtmp_stream */
+	struct rtmp_dechunker *dechunk;         /* de-chunker */
+	struct tcp_conn *tc;                    /* TCP connection */
+	struct mbuf *mb;                        /* TCP reassembly buffer */
+	enum rtmp_handshake_state state;        /* handshake state */
+	size_t total_bytes;                     /* value sent in ACKs */
+	size_t last_ack;                        /* total_bytes at last ACK */
+	uint32_t window_ack_size;               /* bytes between two ACKs */
+	uint32_t send_chunk_size;               /* chunk size for sending */
+	unsigned chunk_id_counter;              /* last chunk id assigned */
+	bool is_client;                         /* set by rtmp_connect() */
+	bool connected;
+	rtmp_estab_h *estabh;                   /* established handler */
+	rtmp_command_h *cmdh;                   /* command handler */
+	rtmp_close_h *closeh;                   /* close handler */
+	void *arg;                              /* handler argument */
+
+	/* client specific: */
+	struct dnsc *dnsc;                      /* DNS client (referenced) */
+	struct dns_query *dnsq4;                /* A query */
+	struct dns_query *dnsq6;                /* AAAA query */
+	struct list ctransl;                    /* struct rtmp_ctrans */
+	struct sa srvv[16];                     /* server addresses */
+	unsigned srvc;                          /* addresses left to try */
+	uint64_t tid_counter;                   /* last transaction id */
+	uint16_t port;                          /* server port */
+	char *app;                              /* app part of the uri */
+	char *uri;                              /* copy of the uri */
+	char *stream;                           /* stream part of the uri */
+	char *host;                             /* hostname for DNS lookup */
+};
+
+/**
+ * Defines an RTMP Stream
+ */
+struct rtmp_stream {
+	struct le le;                    /* element of conn->streaml */
+	const struct rtmp_conn *conn;    /**< Pointer to parent connection */
+	bool created;                    /* deleteStream sent on destroy */
+	uint32_t stream_id;              /* stream id */
+	unsigned chunk_id_audio;         /* from rtmp_conn_assign_chunkid() */
+	unsigned chunk_id_video;         /* from rtmp_conn_assign_chunkid() */
+	unsigned chunk_id_data;          /* from rtmp_conn_assign_chunkid() */
+	rtmp_audio_h *auh;               /* audio handler */
+	rtmp_video_h *vidh;              /* video handler */
+	rtmp_command_h *datah;           /* data handler */
+	rtmp_command_h *cmdh;            /* command handler */
+	rtmp_resp_h *resph;
+	rtmp_control_h *ctrlh;           /* control handler */
+	void *arg;                       /* handler argument */
+};
+/* RTMP chunk header, as handled by rtmp_header_encode()/_decode() */
+struct rtmp_header {
+	unsigned format:2;           /* type 0-3 */
+	uint32_t chunk_id;           /* from 3-65599 */
+
+	uint32_t timestamp;          /* 24-bit or 32-bit */
+	uint32_t timestamp_delta;    /* 24-bit */
+	uint32_t timestamp_ext;      /* encoder: extended timestamp, or 0 */
+	uint32_t length;             /* 24-bit */
+	uint8_t type_id;             /* enum rtmp_packet_type */
+	uint32_t stream_id;          /* only on the wire in type 0 */
+	bool ext_ts;                 /* decoder: extended timestamp was read */
+};
+
+
+/* Command */
+
+int rtmp_command_header_encode(struct mbuf *mb, const char *name,
+			       uint64_t tid);
+
+/* Connection */
+
+int rtmp_conn_send_msg(const struct rtmp_conn *conn, unsigned format,
+		       uint32_t chunk_id, uint32_t timestamp,
+		       uint32_t timestamp_delta, uint8_t msg_type_id,
+		       uint32_t msg_stream_id,
+		       const uint8_t *payload, size_t payload_len);
+int rtmp_send_amf_command(const struct rtmp_conn *conn,
+			  unsigned format, uint32_t chunk_id,
+			  uint8_t type_id,
+			  uint32_t msg_stream_id,
+			  const uint8_t *cmd, size_t len);
+unsigned rtmp_conn_assign_chunkid(struct rtmp_conn *conn);
+uint64_t rtmp_conn_assign_tid(struct rtmp_conn *conn);
+
+
+/* Client Transaction */
+
+
+struct rtmp_ctrans;
+
+int  rtmp_ctrans_response(const struct list *ctransl,
+			  const struct odict *msg);
+
+
+/*
+ * RTMP Chunk
+ */
+
+int rtmp_chunker(unsigned format, uint32_t chunk_id,
+		 uint32_t timestamp, uint32_t timestamp_delta,
+		 uint8_t msg_type_id, uint32_t msg_stream_id,
+		 const uint8_t *payload, size_t payload_len,
+		 size_t max_chunk_sz, struct tcp_conn *tc);
+
+
+/*
+ * RTMP Header
+ */
+
+int  rtmp_header_encode(struct mbuf *mb, struct rtmp_header *hdr);
+int  rtmp_header_decode(struct rtmp_header *hdr, struct mbuf *mb);
+int  rtmp_header_print(struct re_printf *pf, const struct rtmp_header *hdr);
+const char *rtmp_packet_type_name(enum rtmp_packet_type type);
+
+
+/*
+ * RTMP De-chunker
+ */
+
+struct rtmp_dechunker;
+
+typedef int (rtmp_dechunk_h)(const struct rtmp_header *hdr,
+			     struct mbuf *mb, void *arg);
+
+int  rtmp_dechunker_alloc(struct rtmp_dechunker **rdp, size_t chunk_sz,
+			  rtmp_dechunk_h *chunkh, void *arg);
+int  rtmp_dechunker_receive(struct rtmp_dechunker *rd, struct mbuf *mb);
+void rtmp_dechunker_set_chunksize(struct rtmp_dechunker *rd, size_t chunk_sz);
+int  rtmp_dechunker_debug(struct re_printf *pf,
+			  const struct rtmp_dechunker *rd);
+
+
+/*
+ * AMF (Action Message Format)
+ */
+
+int rtmp_amf_encode_number(struct mbuf *mb, double val);
+int rtmp_amf_encode_boolean(struct mbuf *mb, bool boolean);
+int rtmp_amf_encode_string(struct mbuf *mb, const char *str);
+int rtmp_amf_encode_null(struct mbuf *mb);
+int rtmp_amf_vencode_object(struct mbuf *mb, enum rtmp_amf_type container,
+			    unsigned propc, va_list *ap);
+
+int rtmp_amf_decode(struct odict **msgp, struct mbuf *mb);
diff --git a/src/rtmp/stream.c b/src/rtmp/stream.c
new file mode 100644
index 0000000..fe8748d
--- /dev/null
+++ b/src/rtmp/stream.c
@@ -0,0 +1,309 @@
+/**
+ * @file rtmp/stream.c  Real Time Messaging Protocol (RTMP) -- NetStream
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_list.h>
+#include <re_tcp.h>
+#include <re_sys.h>
+#include <re_odict.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+
+/* Stream destructor: unlink and, if created, issue "deleteStream" */
+static void destructor(void *data)
+{
+	struct rtmp_stream *st = data;
+
+	list_unlink(&st->le);
+
+	if (!st->created)
+		return;
+
+	(void)rtmp_amf_command(st->conn, 0, "deleteStream", 3,
+			       RTMP_AMF_TYPE_NUMBER, 0.0,
+			       RTMP_AMF_TYPE_NULL,
+			       RTMP_AMF_TYPE_NUMBER, (double)st->stream_id);
+}
+
+
+/**
+ * Allocate a new RTMP Stream object and append it to conn's stream list
+ *
+ * @param strmp     Pointer to allocated RTMP Stream
+ * @param conn      RTMP Connection
+ * @param stream_id Stream id
+ * @param cmdh      Command handler
+ * @param ctrlh     Control handler
+ * @param auh       Audio handler
+ * @param vidh      Video handler
+ * @param datah     Data handler
+ * @param arg       Handler argument
+ *
+ * @return 0 if success, EINVAL or ENOMEM on failure
+ */
+int rtmp_stream_alloc(struct rtmp_stream **strmp, struct rtmp_conn *conn,
+		      uint32_t stream_id, rtmp_command_h *cmdh,
+		      rtmp_control_h *ctrlh, rtmp_audio_h *auh,
+		      rtmp_video_h *vidh, rtmp_command_h *datah,
+		      void *arg)
+{
+	struct rtmp_stream *st;
+
+	if (strmp == NULL || conn == NULL)
+		return EINVAL;
+
+	st = mem_zalloc(sizeof(*st), destructor);
+	if (st == NULL)
+		return ENOMEM;
+
+	st->arg   = arg;
+	st->cmdh  = cmdh;
+	st->ctrlh = ctrlh;
+	st->auh   = auh;
+	st->vidh  = vidh;
+	st->datah = datah;
+
+	st->stream_id = stream_id;
+	st->conn      = conn;
+
+	/* three chunk ids are taken from the connection, in this order */
+	st->chunk_id_audio = rtmp_conn_assign_chunkid(conn);
+	st->chunk_id_video = rtmp_conn_assign_chunkid(conn);
+	st->chunk_id_data  = rtmp_conn_assign_chunkid(conn);
+
+	list_append(&conn->streaml, &st->le, st);
+	*strmp = st;
+
+	return 0;
+}
+
+
+/* Response handler for "createStream": store the stream id from the reply */
+static void createstream_handler(bool success, const struct odict *msg,
+				 void *arg)
+{
+	struct rtmp_stream *st = arg;
+	uint64_t id;
+
+	if (success) {
+
+		if (odict_get_number(msg, &id, "3")) {
+
+			st->stream_id = (uint32_t)id;
+
+			if (st->stream_id != 0)
+				st->created = true;
+			else
+				success = false;
+		}
+		else {
+			success = false;
+		}
+	}
+
+	if (st->resph)
+		st->resph(success, msg, st->arg);
+}
+
+
+/**
+ * Allocate an RTMP Stream and issue a "createStream" request for it
+ *
+ * @param strmp     Pointer to allocated RTMP Stream
+ * @param conn      RTMP Connection
+ * @param resph     Handler called with the result of "createStream"
+ * @param cmdh      Command handler
+ * @param ctrlh     Control handler
+ * @param auh       Audio handler
+ * @param vidh      Video handler
+ * @param datah     Data handler
+ * @param arg       Handler argument
+ *
+ * @return 0 if success, otherwise errorcode
+ */
+int rtmp_stream_create(struct rtmp_stream **strmp, struct rtmp_conn *conn,
+		       rtmp_resp_h *resph, rtmp_command_h *cmdh,
+		       rtmp_control_h *ctrlh, rtmp_audio_h *auh,
+		       rtmp_video_h *vidh, rtmp_command_h *datah,
+		       void *arg)
+{
+	struct rtmp_stream *st;
+	int err;
+
+	if (strmp == NULL || conn == NULL)
+		return EINVAL;
+
+	/* the real stream id is filled in by createstream_handler() */
+	err = rtmp_stream_alloc(&st, conn, (uint32_t)-1,
+				cmdh, ctrlh, auh, vidh, datah, arg);
+	if (err)
+		return err;
+
+	st->resph = resph;
+
+	err = rtmp_amf_request(conn, 0,
+			       "createStream", createstream_handler, st,
+			       1,
+			       RTMP_AMF_TYPE_NULL);
+	if (err) {
+		/* the request failed, so dereference the stream again */
+		mem_deref(st);
+		return err;
+	}
+
+	*strmp = st;
+
+	return 0;
+}
+
+
+/**
+ * Issue the "play" command for an RTMP Stream, with the given stream name
+ *
+ * @param strm RTMP Stream
+ * @param name Stream name
+ *
+ * @return 0 if success, EINVAL if an argument is NULL, otherwise errorcode
+ */
+int rtmp_play(struct rtmp_stream *strm, const char *name)
+{
+	if (strm == NULL || name == NULL)
+		return EINVAL;
+
+	/* 4 AMF elements: number 0.0, null, the stream name, number -2000.0 */
+	return rtmp_amf_command(strm->conn, strm->stream_id, "play", 4,
+				RTMP_AMF_TYPE_NUMBER, 0.0,
+				RTMP_AMF_TYPE_NULL,
+				RTMP_AMF_TYPE_STRING, name,
+				RTMP_AMF_TYPE_NUMBER, -2000.0);
+}
+
+
+/**
+ * Issue the "publish" command for an RTMP Stream, with the given stream name
+ *
+ * @param strm RTMP Stream
+ * @param name Stream name
+ *
+ * @return 0 if success, EINVAL if an argument is NULL, otherwise errorcode
+ */
+int rtmp_publish(struct rtmp_stream *strm, const char *name)
+{
+	if (strm == NULL || name == NULL)
+		return EINVAL;
+
+	/* 4 AMF elements: number 0.0, null, the stream name, string "live" */
+	return rtmp_amf_command(strm->conn, strm->stream_id, "publish", 4,
+				RTMP_AMF_TYPE_NUMBER, 0.0,
+				RTMP_AMF_TYPE_NULL,
+				RTMP_AMF_TYPE_STRING, name,
+				RTMP_AMF_TYPE_STRING, "live");
+}
+
+
+/**
+ * Send an "@setDataFrame" data message with "onMetaData" on the stream
+ *
+ * @param strm RTMP Stream
+ *
+ * @return 0 if success, EINVAL if strm is NULL, otherwise errorcode
+ */
+int rtmp_meta(struct rtmp_stream *strm)
+{
+	if (strm == NULL)
+		return EINVAL;
+
+	/* NOTE(review): ids presumably mean AAC (10) and AVC (7) -- confirm */
+	return rtmp_amf_data(strm->conn, strm->stream_id, "@setDataFrame", 2,
+			     RTMP_AMF_TYPE_STRING, "onMetaData",
+			     RTMP_AMF_TYPE_ECMA_ARRAY, 2,
+			         RTMP_AMF_TYPE_NUMBER, "audiocodecid", 10.0,
+			         RTMP_AMF_TYPE_NUMBER, "videocodecid",  7.0);
+}
+
+
+/**
+ * Send an audio packet on the RTMP Stream, using the audio chunk id
+ * that is stored in the stream
+ *
+ * @param strm      RTMP Stream
+ * @param timestamp Timestamp in [milliseconds]
+ * @param pld       Audio payload
+ * @param len       Payload length
+ *
+ * @return 0 if success, EINVAL on missing stream or empty payload,
+ *         otherwise errorcode
+ */
+int rtmp_send_audio(struct rtmp_stream *strm, uint32_t timestamp,
+		    const uint8_t *pld, size_t len)
+{
+	if (strm == NULL || pld == NULL || len == 0)
+		return EINVAL;
+
+	return rtmp_conn_send_msg(strm->conn, 0,
+				  (uint32_t)strm->chunk_id_audio, timestamp,
+				  0, RTMP_TYPE_AUDIO, strm->stream_id,
+				  pld, len);
+}
+
+
+/**
+ * Send a video packet on the RTMP Stream, using the video chunk id
+ * that is stored in the stream
+ *
+ * @param strm      RTMP Stream
+ * @param timestamp Timestamp in [milliseconds]
+ * @param pld       Video payload
+ * @param len       Payload length
+ *
+ * @return 0 if success, EINVAL on missing stream or empty payload,
+ *         otherwise errorcode
+ */
+int rtmp_send_video(struct rtmp_stream *strm, uint32_t timestamp,
+		    const uint8_t *pld, size_t len)
+{
+	if (strm == NULL || pld == NULL || len == 0)
+		return EINVAL;
+
+	return rtmp_conn_send_msg(strm->conn, 0,
+				  (uint32_t)strm->chunk_id_video, timestamp,
+				  0, RTMP_TYPE_VIDEO, strm->stream_id,
+				  pld, len);
+}
+
+
+/**
+ * Find an RTMP Stream by stream id
+ *
+ * The stream list of the connection is searched from the head and
+ * the first stream with a matching id is returned.
+ *
+ * @param conn      RTMP Connection
+ * @param stream_id Stream id
+ *
+ * @return RTMP Stream if found, or NULL if not found (also if conn
+ *         is NULL)
+ */
+struct rtmp_stream *rtmp_stream_find(const struct rtmp_conn *conn,
+				     uint32_t stream_id)
+{
+	struct le *le = conn ? list_head(&conn->streaml) : NULL;
+
+	for (; le; le = le->next) {
+		struct rtmp_stream *st = le->data;
+
+		if (st->stream_id == stream_id)
+			return st;
+	}
+
+	return NULL;
+}