Squashed 'third_party/rawrtc/re/' content from commit f3163ce8b

Change-Id: I6a235e6ac0f03269d951026f9d195da05c40fdab
git-subtree-dir: third_party/rawrtc/re
git-subtree-split: f3163ce8b526a13b35ef71ce4dd6f43585064d8a
diff --git a/src/rtmp/conn.c b/src/rtmp/conn.c
new file mode 100644
index 0000000..f08a5a0
--- /dev/null
+++ b/src/rtmp/conn.c
@@ -0,0 +1,1023 @@
+/**
+ * @file rtmp/conn.c  Real Time Messaging Protocol (RTMP) -- NetConnection
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_net.h>
+#include <re_sa.h>
+#include <re_list.h>
+#include <re_tcp.h>
+#include <re_sys.h>
+#include <re_odict.h>
+#include <re_dns.h>
+#include <re_uri.h>
+#include <re_rtmp.h>
+#include "rtmp.h"
+
+/* Window Acknowledgement Size in bytes: initial local value, also sent to peer */
+enum {
+	WINDOW_ACK_SIZE = 2500000
+};
+
+/* Forward declaration; tcp_close_handler() uses it before the definition below */
+static int req_connect(struct rtmp_conn *conn);
+
+/* Destructor installed by rtmp_conn_alloc(); releases all objects held by conn */
+static void conn_destructor(void *data)
+{
+	struct rtmp_conn *conn = data;
+
+	list_flush(&conn->ctransl);  /* list used by rtmp_ctrans_response() */
+	list_flush(&conn->streaml);  /* list of streams (see rtmp_conn_debug) */
+
+	mem_deref(conn->dnsq6);
+	mem_deref(conn->dnsq4);
+	mem_deref(conn->dnsc);
+	mem_deref(conn->tc);
+	mem_deref(conn->mb);
+	mem_deref(conn->dechunk);
+	mem_deref(conn->uri);
+	mem_deref(conn->app);
+	mem_deref(conn->host);
+	mem_deref(conn->stream);
+}
+
+/* Decode an AMF command and dispatch it; returns the decode error, otherwise 0 */
+static int handle_amf_command(struct rtmp_conn *conn, uint32_t stream_id,
+			      struct mbuf *mb)
+{
+	struct odict *msg = NULL;
+	const char *name;
+	int err;
+
+	err = rtmp_amf_decode(&msg, mb);
+	if (err)
+		return err;
+
+	name = odict_string(msg, "0");  /* entry "0" is treated as command name */
+
+	if (conn->is_client &&
+	    (0 == str_casecmp(name, "_result") ||
+	     0 == str_casecmp(name, "_error"))) {
+
+		/* forward response to transaction layer */
+		rtmp_ctrans_response(&conn->ctransl, msg);
+	}
+	else {
+		struct rtmp_stream *strm;
+
+		if (stream_id == 0) {  /* stream 0: connection-level handler */
+			if (conn->cmdh)
+				conn->cmdh(msg, conn->arg);
+		}
+		else {  /* otherwise the handler of the matching stream, if any */
+			strm = rtmp_stream_find(conn, stream_id);
+			if (strm) {
+				if (strm->cmdh)
+					strm->cmdh(msg, strm->arg);
+			}
+		}
+	}
+
+	mem_deref(msg);  /* release the decoded message */
+
+	return 0;
+}
+
+/* Parse a User Control message: forward stream events, answer ping requests */
+static int handle_user_control_msg(struct rtmp_conn *conn, struct mbuf *mb)
+{
+	struct rtmp_stream *strm;
+	enum rtmp_event_type event;
+	uint32_t value;
+	int err;
+
+	if (mbuf_get_left(mb) < 6)  /* need u16 event type + u32 value */
+		return EBADMSG;
+
+	event = ntohs(mbuf_read_u16(mb));
+	value = ntohl(mbuf_read_u32(mb));
+
+	switch (event) {
+
+	case RTMP_EVENT_STREAM_BEGIN:
+	case RTMP_EVENT_STREAM_EOF:
+	case RTMP_EVENT_STREAM_DRY:
+	case RTMP_EVENT_STREAM_IS_RECORDED:
+	case RTMP_EVENT_SET_BUFFER_LENGTH:
+
+		if (value != RTMP_CONTROL_STREAM_ID) {  /* value is a stream id */
+
+			strm = rtmp_stream_find(conn, value);
+			if (strm && strm->ctrlh)
+				strm->ctrlh(event, mb, strm->arg);
+		}
+		break;
+
+	case RTMP_EVENT_PING_REQUEST:
+
+		err = rtmp_control(conn, RTMP_TYPE_USER_CONTROL_MSG,
+				   RTMP_EVENT_PING_RESPONSE, value);  /* echo */
+		if (err)
+			return err;
+		break;
+
+	default:  /* any other event type is ignored */
+		break;
+	}
+
+	return 0;
+}
+
+/* Decode an AMF data message and pass it to the data handler of its stream */
+static int handle_data_message(struct rtmp_conn *conn, uint32_t stream_id,
+			       struct mbuf *mb)
+{
+	struct rtmp_stream *strm;
+	struct odict *msg;
+	int err;
+
+	err = rtmp_amf_decode(&msg, mb);
+	if (err)
+		return err;
+
+	strm = rtmp_stream_find(conn, stream_id);
+	if (strm && strm->datah)  /* dropped if there is no stream/handler */
+		strm->datah(msg, strm->arg);
+
+	mem_deref(msg);
+
+	return 0;
+}
+
+
+static int rtmp_dechunk_handler(const struct rtmp_header *hdr,
+				struct mbuf *mb, void *arg)
+{
+	struct rtmp_conn *conn = arg;
+	struct rtmp_stream *strm;
+	uint32_t val;
+	uint32_t was;
+	uint8_t limit;
+	int err = 0;
+
+	switch (hdr->type_id) {
+
+	case RTMP_TYPE_SET_CHUNK_SIZE:
+		if (mbuf_get_left(mb) < 4)
+			return EBADMSG;
+
+		val = ntohl(mbuf_read_u32(mb));
+
+		val = val & 0x7fffffff;
+
+		rtmp_dechunker_set_chunksize(conn->dechunk, val);
+		break;
+
+	case RTMP_TYPE_ACKNOWLEDGEMENT:
+		if (mbuf_get_left(mb) < 4)
+			return EBADMSG;
+
+		val = ntohl(mbuf_read_u32(mb));
+		(void)val;
+		break;
+
+	case RTMP_TYPE_AMF0:
+		err = handle_amf_command(conn, hdr->stream_id, mb);
+		break;
+
+	case RTMP_TYPE_WINDOW_ACK_SIZE:
+		if (mbuf_get_left(mb) < 4)
+			return EBADMSG;
+
+		was = ntohl(mbuf_read_u32(mb));
+		if (was != 0)
+			conn->window_ack_size = was;
+		break;
+
+	case RTMP_TYPE_SET_PEER_BANDWIDTH:
+		if (mbuf_get_left(mb) < 5)
+			return EBADMSG;
+
+		was = ntohl(mbuf_read_u32(mb));
+		limit = mbuf_read_u8(mb);
+		(void)limit;
+
+		if (was != 0)
+			conn->window_ack_size = was;
+
+		err = rtmp_control(conn, RTMP_TYPE_WINDOW_ACK_SIZE,
+				   (uint32_t)WINDOW_ACK_SIZE);
+		break;
+
+	case RTMP_TYPE_USER_CONTROL_MSG:
+		err = handle_user_control_msg(conn, mb);
+		break;
+
+		/* XXX: common code for audio+video */
+	case RTMP_TYPE_AUDIO:
+		strm = rtmp_stream_find(conn, hdr->stream_id);
+		if (strm) {
+			if (strm->auh) {
+				strm->auh(hdr->timestamp,
+					  mb->buf, mb->end,
+					  strm->arg);
+			}
+		}
+		break;
+
+	case RTMP_TYPE_VIDEO:
+		strm = rtmp_stream_find(conn, hdr->stream_id);
+		if (strm) {
+			if (strm->vidh) {
+				strm->vidh(hdr->timestamp,
+					   mb->buf, mb->end,
+					   strm->arg);
+			}
+		}
+		break;
+
+	case RTMP_TYPE_DATA:
+		err = handle_data_message(conn, hdr->stream_id, mb);
+		break;
+
+	default:
+		break;
+	}
+
+	return err;
+}
+
+/* Allocate a connection with default sizes, a dechunker and the given handlers */
+static struct rtmp_conn *rtmp_conn_alloc(bool is_client,
+					 rtmp_estab_h *estabh,
+					 rtmp_command_h *cmdh,
+					 rtmp_close_h *closeh,
+					 void *arg)
+{
+	struct rtmp_conn *conn;
+	int err;
+
+	conn = mem_zalloc(sizeof(*conn), conn_destructor);
+	if (!conn)
+		return NULL;
+
+	conn->is_client = is_client;
+	conn->state = RTMP_STATE_UNINITIALIZED;
+
+	conn->send_chunk_size = RTMP_DEFAULT_CHUNKSIZE;
+	conn->window_ack_size = WINDOW_ACK_SIZE;
+
+	err = rtmp_dechunker_alloc(&conn->dechunk, RTMP_DEFAULT_CHUNKSIZE,
+				   rtmp_dechunk_handler, conn);
+	if (err)
+		goto out;
+
+	/* must be above 2 */
+	conn->chunk_id_counter = RTMP_CHUNK_ID_CONN + 1;
+
+	conn->estabh = estabh;
+	conn->cmdh   = cmdh;
+	conn->closeh = closeh;
+	conn->arg    = arg;
+
+ out:
+	if (err)
+		return mem_deref(conn);  /* presumably NULL, as used elsewhere */
+
+	return conn;
+}
+
+/* Record the new handshake state on the connection */
+static inline void set_state(struct rtmp_conn *conn,
+			     enum rtmp_handshake_state state)
+{
+	conn->state = state;
+}
+
+/* Copy len bytes of pkt into a temporary mbuf and send it on conn->tc */
+static int send_packet(struct rtmp_conn *conn, const uint8_t *pkt, size_t len)
+{
+	struct mbuf *mb;
+	int err;
+
+	if (!conn || !pkt || !len)
+		return EINVAL;
+
+	mb = mbuf_alloc(len);
+	if (!mb)
+		return ENOMEM;
+
+	(void)mbuf_write_mem(mb, pkt, len);  /* result deliberately ignored */
+
+	mb->pos = 0;  /* rewind to the start of the written data */
+
+	err = tcp_send(conn->tc, mb);
+	if (err)
+		goto out;
+
+ out:
+	mem_deref(mb);  /* the temporary buffer is released on every path */
+
+	return err;
+}
+
+/* Send the version byte plus one handshake block, then enter VERSION_SENT */
+static int handshake_start(struct rtmp_conn *conn)
+{
+	uint8_t sig[1+RTMP_HANDSHAKE_SIZE];  /* version byte + handshake block */
+	int err;
+
+	sig[0] = RTMP_PROTOCOL_VERSION;
+	sig[1] = 0;
+	sig[2] = 0;
+	sig[3] = 0;
+	sig[4] = 0;
+	sig[5] = VER_MAJOR;
+	sig[6] = VER_MINOR;
+	sig[7] = VER_PATCH;
+	sig[8] = 0;
+	rand_bytes(sig + 9, sizeof(sig) - 9);  /* remainder is random */
+
+	err = send_packet(conn, sig, sizeof(sig));
+	if (err)
+		return err;
+
+	set_state(conn, RTMP_STATE_VERSION_SENT);  /* only after a good send */
+
+	return 0;
+}
+
+/* Drop the TCP connection and DNS queries, then call the close handler once */
+static void conn_close(struct rtmp_conn *conn, int err)
+{
+	rtmp_close_h *closeh;
+
+	conn->tc = mem_deref(conn->tc);
+	conn->dnsq6 = mem_deref(conn->dnsq6);
+	conn->dnsq4 = mem_deref(conn->dnsq4);
+
+	closeh = conn->closeh;
+	if (closeh) {
+		conn->closeh = NULL;  /* cleared first so it cannot fire twice */
+		closeh(err, conn->arg);
+	}
+}
+
+/* TCP established: a client starts the handshake, a server does nothing here */
+static void tcp_estab_handler(void *arg)
+{
+	struct rtmp_conn *conn = arg;
+	int err = 0;
+
+	if (conn->is_client) {
+
+		err = handshake_start(conn);
+	}
+
+	if (err)  /* a failed handshake start closes the connection */
+		conn_close(conn, err);
+}
+
+
+/* Send AMF0 Command or Data via rtmp_chunker(); EINVAL on NULL/empty input */
+int rtmp_send_amf_command(const struct rtmp_conn *conn,
+			  unsigned format, uint32_t chunk_id,
+			  uint8_t type_id,
+			  uint32_t msg_stream_id,
+			  const uint8_t *cmd, size_t len)
+{
+	if (!conn || !cmd || !len)
+		return EINVAL;
+
+	return rtmp_chunker(format, chunk_id, 0, 0, type_id, msg_stream_id,
+			    cmd, len, conn->send_chunk_size,
+			    conn->tc);
+}
+
+/* Response to "connect": close with EPROTO on failure, else report established */
+static void connect_resp_handler(bool success, const struct odict *msg,
+				 void *arg)
+{
+	struct rtmp_conn *conn = arg;
+	rtmp_estab_h *estabh;
+	(void)msg;
+
+	if (!success) {
+		conn_close(conn, EPROTO);
+		return;
+	}
+
+	conn->connected = true;
+
+	estabh = conn->estabh;
+	if (estabh) {
+		conn->estabh = NULL;  /* cleared first: called at most once */
+		estabh(conn->arg);
+	}
+}
+
+/* Issue the "connect" request on the control stream with an 8-entry object */
+static int send_connect(struct rtmp_conn *conn)
+{
+	const int ac  = 0x0400;  /* AAC  */
+	const int vc  = 0x0080;  /* H264 */
+
+	return rtmp_amf_request(conn, RTMP_CONTROL_STREAM_ID, "connect",
+				connect_resp_handler, conn,
+				1,
+			RTMP_AMF_TYPE_OBJECT, 8,
+		          RTMP_AMF_TYPE_STRING, "app", conn->app,
+		          RTMP_AMF_TYPE_STRING, "flashVer", "LNX 9,0,124,2",
+		          RTMP_AMF_TYPE_STRING, "tcUrl", conn->uri,
+		          RTMP_AMF_TYPE_BOOLEAN, "fpad", false,
+		          RTMP_AMF_TYPE_NUMBER, "capabilities", 15.0,
+		          RTMP_AMF_TYPE_NUMBER, "audioCodecs", (double)ac,
+		          RTMP_AMF_TYPE_NUMBER, "videoCodecs", (double)vc,
+		          RTMP_AMF_TYPE_NUMBER, "videoFunction", 1.0);
+}
+
+/* Client-side state machine; ENODATA tells the caller to wait for more input */
+static int client_handle_packet(struct rtmp_conn *conn, struct mbuf *mb)
+{
+	uint8_t s0;
+	uint8_t s1[RTMP_HANDSHAKE_SIZE];
+	int err = 0;
+
+	switch (conn->state) {
+
+	case RTMP_STATE_VERSION_SENT:
+		if (mbuf_get_left(mb) < (1+RTMP_HANDSHAKE_SIZE))
+			return ENODATA;
+
+		s0 = mbuf_read_u8(mb);  /* version byte from the server */
+		if (s0 != RTMP_PROTOCOL_VERSION)
+			return EPROTO;
+
+		(void)mbuf_read_mem(mb, s1, sizeof(s1));
+
+		err = send_packet(conn, s1, sizeof(s1));  /* echo block back */
+		if (err)
+			return err;
+
+		set_state(conn, RTMP_STATE_ACK_SENT);
+		break;
+
+	case RTMP_STATE_ACK_SENT:
+		if (mbuf_get_left(mb) < RTMP_HANDSHAKE_SIZE)
+			return ENODATA;
+
+		/* S2 (ignored) */
+		mbuf_advance(mb, RTMP_HANDSHAKE_SIZE);
+
+		conn->send_chunk_size = 4096;  /* announced to the peer below */
+		err = rtmp_control(conn, RTMP_TYPE_SET_CHUNK_SIZE,
+				   conn->send_chunk_size);
+		if (err)
+			return err;
+
+		err = send_connect(conn);
+		if (err)
+			return err;
+
+		set_state(conn, RTMP_STATE_HANDSHAKE_DONE);
+		break;
+
+	case RTMP_STATE_HANDSHAKE_DONE:  /* handshake done: feed the dechunker */
+		err = rtmp_dechunker_receive(conn->dechunk, mb);
+		if (err)
+			return err;
+		break;
+
+	default:
+		return EPROTO;
+	}
+
+	return 0;
+}
+
+/* Server-side state machine; ENODATA tells the caller to wait for more input */
+static int server_handle_packet(struct rtmp_conn *conn, struct mbuf *mb)
+{
+	uint8_t c0;
+	uint8_t c1[RTMP_HANDSHAKE_SIZE];
+	int err = 0;
+
+	switch (conn->state) {
+
+	case RTMP_STATE_UNINITIALIZED:
+		if (mbuf_get_left(mb) < 1)
+			return ENODATA;
+
+		c0 = mbuf_read_u8(mb);  /* version byte from the client */
+		if (c0 != RTMP_PROTOCOL_VERSION)
+			return EPROTO;
+
+		/* Send S0 + S1 */
+		err = handshake_start(conn);  /* also enters VERSION_SENT */
+		if (err)
+			return err;
+		break;
+
+	case RTMP_STATE_VERSION_SENT:
+		if (mbuf_get_left(mb) < RTMP_HANDSHAKE_SIZE)
+			return ENODATA;
+
+		(void)mbuf_read_mem(mb, c1, sizeof(c1));
+
+		/* Copy C1 to S2 */
+		err = send_packet(conn, c1, sizeof(c1));
+		if (err)
+			return err;
+
+		set_state(conn, RTMP_STATE_ACK_SENT);
+		break;
+
+	case RTMP_STATE_ACK_SENT:
+		if (mbuf_get_left(mb) < RTMP_HANDSHAKE_SIZE)
+			return ENODATA;
+
+		/* C2 (ignored) */
+		mbuf_advance(mb, RTMP_HANDSHAKE_SIZE);
+
+		conn->send_chunk_size = 4096;  /* announced to the peer below */
+		err = rtmp_control(conn, RTMP_TYPE_SET_CHUNK_SIZE,
+				   conn->send_chunk_size);
+		if (err)
+			return err;
+
+		set_state(conn, RTMP_STATE_HANDSHAKE_DONE);
+		break;
+
+	case RTMP_STATE_HANDSHAKE_DONE:  /* handshake done: feed the dechunker */
+		err = rtmp_dechunker_receive(conn->dechunk, mb);
+		if (err)
+			return err;
+		break;
+
+	default:
+		return EPROTO;
+	}
+
+	return 0;
+}
+
+/* TCP receive: buffer/re-assemble input, run the state machine, send ACKs */
+static void tcp_recv_handler(struct mbuf *mb_pkt, void *arg)
+{
+	struct rtmp_conn *conn = arg;
+	int err;
+
+	conn->total_bytes += mbuf_get_left(mb_pkt);  /* feeds ACK check below */
+
+	/* re-assembly of fragments */
+	if (conn->mb) {
+		const size_t len = mbuf_get_left(mb_pkt), pos = conn->mb->pos;
+
+		if ((mbuf_get_left(conn->mb) + len) > RTMP_MESSAGE_LEN_MAX) {
+			err = EOVERFLOW;
+			goto out;
+		}
+
+		conn->mb->pos = conn->mb->end;  /* append after buffered data */
+
+		err = mbuf_write_mem(conn->mb,
+				     mbuf_buf(mb_pkt), mbuf_get_left(mb_pkt));
+		if (err)
+			goto out;
+
+		conn->mb->pos = pos;  /* restore the read position */
+	}
+	else {
+		conn->mb = mem_ref(mb_pkt);  /* nothing buffered: keep packet */
+	}
+
+	while (mbuf_get_left(conn->mb) > 0) {
+
+		size_t pos;
+		uint32_t nrefs;
+
+		pos = conn->mb->pos;  /* remembered for the rewind on error */
+
+		mem_ref(conn);  /* extra reference held across the handlers */
+
+		if (conn->is_client)
+			err = client_handle_packet(conn, conn->mb);
+		else
+			err = server_handle_packet(conn, conn->mb);
+
+		nrefs = mem_nrefs(conn);
+
+		mem_deref(conn);
+
+		if (nrefs == 1)  /* ours was the last ref; conn presumably gone */
+			return;
+
+		if (!conn->tc)  /* TCP connection dropped (see conn_close) */
+			return;
+
+		if (err) {
+
+			/* rewind */
+			conn->mb->pos = pos;
+
+			if (err == ENODATA)  /* incomplete input: wait for more */
+				err = 0;
+			break;
+		}
+
+
+		if (conn->mb->pos >= conn->mb->end) {  /* fully consumed */
+			conn->mb = mem_deref(conn->mb);
+			break;
+		}
+	}
+
+	if (err)
+		goto out;
+
+	if (conn->total_bytes >= (conn->last_ack + conn->window_ack_size)) {
+
+		conn->last_ack = conn->total_bytes;
+
+		err = rtmp_control(conn, RTMP_TYPE_ACKNOWLEDGEMENT,
+				   (uint32_t)conn->total_bytes);
+		if (err)
+			goto out;
+	}
+
+ out:
+	if (err)
+		conn_close(conn, err);
+}
+
+/* TCP closed: an unconnected client tries its next address, otherwise close */
+static void tcp_close_handler(int err, void *arg)
+{
+	struct rtmp_conn *conn = arg;
+
+	if (conn->is_client && !conn->connected && conn->srvc > 0) {
+		err = req_connect(conn);
+		if (!err)  /* a new connect attempt is under way */
+			return;
+	}
+
+	conn_close(conn, err);
+}
+
+/* Try remaining addresses in srvv (last first) until tcp_connect() succeeds */
+static int req_connect(struct rtmp_conn *conn)
+{
+	const struct sa *addr;
+	int err = EINVAL;  /* returned unchanged when no address is left */
+
+	while (conn->srvc > 0) {
+
+		--conn->srvc;  /* each address is consumed by its attempt */
+
+		addr = &conn->srvv[conn->srvc];
+
+		conn->send_chunk_size = RTMP_DEFAULT_CHUNKSIZE;
+		conn->window_ack_size = WINDOW_ACK_SIZE;
+		conn->state = RTMP_STATE_UNINITIALIZED;
+		conn->last_ack = 0;
+		conn->total_bytes = 0;
+		conn->mb = mem_deref(conn->mb);
+		conn->tc = mem_deref(conn->tc);
+
+		rtmp_dechunker_set_chunksize(conn->dechunk,
+					     RTMP_DEFAULT_CHUNKSIZE);
+
+		err = tcp_connect(&conn->tc, addr, tcp_estab_handler,
+				  tcp_recv_handler, tcp_close_handler, conn);
+		if (!err)
+			break;
+	}
+
+	return err;
+}
+
+/* Per-record callback: store an A/AAAA address in srvv; true once srvv is full */
+static bool rr_handler(struct dnsrr *rr, void *arg)
+{
+	struct rtmp_conn *conn = arg;
+
+	if (conn->srvc >= ARRAY_SIZE(conn->srvv))
+		return true;  /* presumably stops the iteration -- TODO confirm */
+
+	switch (rr->type) {
+
+	case DNS_TYPE_A:
+		sa_set_in(&conn->srvv[conn->srvc++], rr->rdata.a.addr,
+                          conn->port);
+		break;
+
+	case DNS_TYPE_AAAA:
+		sa_set_in6(&conn->srvv[conn->srvc++], rr->rdata.aaaa.addr,
+			   conn->port);
+		break;
+	}
+
+	return false;
+}
+
+/* DNS response: collect addresses; connect once no query is left pending */
+static void query_handler(int err, const struct dnshdr *hdr, struct list *ansl,
+			  struct list *authl, struct list *addl, void *arg)
+{
+	struct rtmp_conn *conn = arg;
+	(void)hdr;
+	(void)authl;
+	(void)addl;
+
+	dns_rrlist_apply2(ansl, conn->host, DNS_TYPE_A, DNS_TYPE_AAAA,
+                          DNS_CLASS_IN, true, rr_handler, conn);
+
+	/* wait for other (A/AAAA) query to complete */
+	if (conn->dnsq4 || conn->dnsq6)
+		return;
+
+	if (conn->srvc == 0) {  /* no address was collected */
+		err = err ? err : EDESTADDRREQ;
+		goto out;
+	}
+
+	err = req_connect(conn);
+	if (err)
+		goto out;
+
+	return;
+
+ out:
+	conn_close(conn, err);
+}
+
+
+/**
+ * Connect to an RTMP server
+ *
+ * @param connp  Pointer to allocated RTMP connection object (set on success)
+ * @param dnsc   DNS Client for resolving FQDN uris (unused for IP addresses)
+ * @param uri    RTMP uri to connect to, rtmp://host[:port]/app/stream
+ * @param estabh Established handler
+ * @param cmdh   Incoming command handler
+ * @param closeh Close handler
+ * @param arg    Handler argument
+ *
+ * @return 0 if success, otherwise errorcode (EINVAL for a malformed uri)
+ *
+ * Example URIs:
+ *
+ *     rtmp://a.rtmp.youtube.com/live2/my-stream
+ *     rtmp://[::1]/vod/mp4:sample.mp4
+ */
+int rtmp_connect(struct rtmp_conn **connp, struct dnsc *dnsc, const char *uri,
+		 rtmp_estab_h *estabh, rtmp_command_h *cmdh,
+		 rtmp_close_h *closeh, void *arg)
+{
+	struct rtmp_conn *conn;
+	struct pl pl_hostport;
+	struct pl pl_host;
+	struct pl pl_port;
+	struct pl pl_app;
+	struct pl pl_stream;
+	int err;
+
+	if (!connp || !uri)
+		return EINVAL;
+
+	if (re_regex(uri, strlen(uri), "rtmp://[^/]+/[^/]+/[^]+",
+		     &pl_hostport, &pl_app, &pl_stream))
+		return EINVAL;
+
+	if (uri_decode_hostport(&pl_hostport, &pl_host, &pl_port))
+		return EINVAL;
+
+	conn = rtmp_conn_alloc(true, estabh, cmdh, closeh, arg);
+	if (!conn)
+		return ENOMEM;
+
+	conn->port = pl_isset(&pl_port) ? pl_u32(&pl_port) : RTMP_PORT;
+	/* stop at the first failure instead of OR-ing errno values together */
+	err = pl_strdup(&conn->app, &pl_app);
+	err = err ? err : pl_strdup(&conn->stream, &pl_stream);
+	err = err ? err : str_dup(&conn->uri, uri);
+	if (err)
+		goto out;
+
+	if (0 == sa_set(&conn->srvv[0], &pl_host, conn->port)) {
+
+		conn->srvc = 1;  /* host is an address: connect directly */
+
+		err = req_connect(conn);
+		if (err)
+			goto out;
+	}
+	else {
+#ifdef HAVE_INET6
+		struct sa tmp;
+#endif
+
+		if (!dnsc) {  /* a hostname cannot be resolved without it */
+			err = EINVAL;
+			goto out;
+		}
+
+		err = pl_strdup(&conn->host, &pl_host);
+		if (err)
+			goto out;
+
+		conn->dnsc = mem_ref(dnsc);
+
+		err = dnsc_query(&conn->dnsq4, dnsc, conn->host, DNS_TYPE_A,
+				 DNS_CLASS_IN, true, query_handler, conn);
+		if (err)
+			goto out;
+
+#ifdef HAVE_INET6
+		if (0 == net_default_source_addr_get(AF_INET6, &tmp)) {
+
+			err = dnsc_query(&conn->dnsq6, dnsc, conn->host,
+					 DNS_TYPE_AAAA, DNS_CLASS_IN,
+					 true, query_handler, conn);
+			if (err)
+				goto out;
+		}
+#endif
+	}
+
+ out:
+	if (err)
+		mem_deref(conn);  /* destructor releases everything set so far */
+	else
+		*connp = conn;
+
+	return err;
+}
+
+
+/**
+ * Accept an incoming TCP connection creating an RTMP Server connection
+ *
+ * @param connp  Pointer to allocated RTMP connection object (set on success)
+ * @param ts     TCP socket with pending connection
+ * @param cmdh   Incoming command handler
+ * @param closeh Close handler
+ * @param arg    Handler argument
+ *
+ * @return 0 if success, otherwise errorcode (EINVAL if connp or ts is NULL)
+ */
+int rtmp_accept(struct rtmp_conn **connp, struct tcp_sock *ts,
+		rtmp_command_h *cmdh, rtmp_close_h *closeh, void *arg)
+{
+	struct rtmp_conn *conn;
+	int err;
+
+	if (!connp || !ts)
+		return EINVAL;
+
+	conn = rtmp_conn_alloc(false, NULL, cmdh, closeh, arg);  /* no estabh */
+	if (!conn)
+		return ENOMEM;
+
+	err = tcp_accept(&conn->tc, ts, tcp_estab_handler,
+			 tcp_recv_handler, tcp_close_handler, conn);
+	if (err)
+		goto out;
+
+ out:
+	if (err)
+		mem_deref(conn);
+	else
+		*connp = conn;
+
+	return err;
+}
+
+/* Send one message via rtmp_chunker() using the connection's send chunk size */
+int rtmp_conn_send_msg(const struct rtmp_conn *conn,
+		       unsigned format, uint32_t chunk_id,
+		       uint32_t timestamp, uint32_t timestamp_delta,
+		       uint8_t msg_type_id, uint32_t msg_stream_id,
+		       const uint8_t *payload, size_t payload_len)
+{
+	if (!conn || !payload || !payload_len)  /* empty payload is rejected */
+		return EINVAL;
+
+	return rtmp_chunker(format, chunk_id, timestamp, timestamp_delta,
+			    msg_type_id, msg_stream_id, payload, payload_len,
+			    conn->send_chunk_size,
+			    conn->tc);
+}
+
+/* Return the next chunk id (counter is pre-incremented), or 0 if conn is NULL */
+unsigned rtmp_conn_assign_chunkid(struct rtmp_conn *conn)
+{
+	if (!conn)
+		return 0;
+
+	return ++conn->chunk_id_counter;
+}
+
+/* Return the next transaction id (pre-incremented), or 0 if conn is NULL */
+uint64_t rtmp_conn_assign_tid(struct rtmp_conn *conn)
+{
+	if (!conn)
+		return 0;
+
+	return ++conn->tid_counter;
+}
+
+
+/**
+ * Get the underlying TCP connection from an RTMP connection
+ *
+ * @param conn RTMP Connection
+ *
+ * @return TCP-Connection, or NULL if conn is NULL
+ */
+struct tcp_conn *rtmp_conn_tcpconn(const struct rtmp_conn *conn)
+{
+	return conn ? conn->tc : NULL;
+}
+
+
+/**
+ * Get the stream name that rtmp_connect() parsed from the uri
+ *
+ * @param conn RTMP Connection
+ *
+ * @return RTMP Stream name, or NULL if conn is NULL or no name is set
+ */
+const char *rtmp_conn_stream(const struct rtmp_conn *conn)
+{
+	return conn ? conn->stream : NULL;
+}
+
+
+/**
+ * Replace the callback handlers of the RTMP connection (no-op if conn is NULL)
+ *
+ * @param conn   RTMP connection
+ * @param cmdh   Incoming command handler
+ * @param closeh Close handler
+ * @param arg    Handler argument
+ */
+void rtmp_set_handlers(struct rtmp_conn *conn, rtmp_command_h *cmdh,
+		       rtmp_close_h *closeh, void *arg)
+{
+	if (!conn)
+		return;
+
+	conn->cmdh   = cmdh;
+	conn->closeh = closeh;
+	conn->arg    = arg;  /* the established handler is left untouched */
+}
+
+/* Map a handshake state to a printable name; "?" for unknown values */
+static const char *rtmp_handshake_name(enum rtmp_handshake_state state)
+{
+	switch (state) {
+
+	case RTMP_STATE_UNINITIALIZED:  return "UNINITIALIZED";
+	case RTMP_STATE_VERSION_SENT:   return "VERSION_SENT";
+	case RTMP_STATE_ACK_SENT:       return "ACK_SENT";
+	case RTMP_STATE_HANDSHAKE_DONE: return "HANDSHAKE_DONE";
+	default: return "?";
+	}
+}
+
+/* Print connection state to pf; returns 0 for a NULL conn, else OR-ed results */
+int rtmp_conn_debug(struct re_printf *pf, const struct rtmp_conn *conn)
+{
+	int err = 0;
+
+	if (!conn)
+		return 0;
+
+	err |= re_hprintf(pf, "role:          %s\n",
+			  conn->is_client ? "Client" : "Server");
+	err |= re_hprintf(pf, "state:         %s\n",
+			  rtmp_handshake_name(conn->state));
+	err |= re_hprintf(pf, "connected:     %d\n", conn->connected);
+	err |= re_hprintf(pf, "chunk_size:    send=%u\n",
+			  conn->send_chunk_size);
+	err |= re_hprintf(pf, "bytes:         %zu\n", conn->total_bytes);
+	err |= re_hprintf(pf, "streams:       %u\n",
+			  list_count(&conn->streaml));
+
+	if (conn->is_client) {  /* uri/app/stream are only set by rtmp_connect */
+		err |= re_hprintf(pf, "uri:           %s\n", conn->uri);
+		err |= re_hprintf(pf, "app:           %s\n", conn->app);
+		err |= re_hprintf(pf, "stream:        %s\n", conn->stream);
+	}
+
+	err |= re_hprintf(pf, "%H\n", rtmp_dechunker_debug, conn->dechunk);
+
+	return err;
+}