Squashed 'third_party/rawrtc/re/' content from commit f3163ce8b

Change-Id: I6a235e6ac0f03269d951026f9d195da05c40fdab
git-subtree-dir: third_party/rawrtc/re
git-subtree-split: f3163ce8b526a13b35ef71ce4dd6f43585064d8a
diff --git a/src/rtp/sess.c b/src/rtp/sess.c
new file mode 100644
index 0000000..2115210
--- /dev/null
+++ b/src/rtp/sess.c
@@ -0,0 +1,675 @@
+/**
+ * @file rtp/sess.c  Real-time Transport Control Protocol Session
+ *
+ * Copyright (C) 2010 Creytiv.com
+ */
+#ifdef HAVE_SYS_TIME_H
+#include <sys/time.h>
+#endif
+#include <time.h>
+#ifdef WIN32
+#include <winsock2.h>
+#endif
+#include <string.h>
+#include <re_types.h>
+#include <re_fmt.h>
+#include <re_mem.h>
+#include <re_mbuf.h>
+#include <re_list.h>
+#include <re_hash.h>
+#include <re_tmr.h>
+#include <re_sa.h>
+#include <re_lock.h>
+#include <re_rtp.h>
+#include "rtcp.h"
+
+
+#define DEBUG_MODULE "rtcp_sess"
+#define DEBUG_LEVEL 5
+#include <re_dbg.h>
+
+
+/** RTP protocol values */
+enum {
+	RTCP_INTERVAL = 5000,  /**< Interval in [ms] between sending reports */
+	MAX_MEMBERS   = 8,
+};
+
+/** RTP Transmit stats */
+struct txstat {
+	uint32_t psent;      /**< Total number of RTP packets sent */
+	uint32_t osent;      /**< Total number of RTP octets  sent */
+	uint64_t jfs_ref;    /**< Timer ticks at RTP timestamp reference */
+	uint32_t ts_ref;     /**< RTP timestamp reference (transmit)     */
+	bool ts_synced;      /**< RTP timestamp synchronization flag     */
+};
+
+/** RTCP Session */
+struct rtcp_sess {
+	struct rtp_sock *rs;        /**< RTP Socket                          */
+	struct hash *members;       /**< Member table                        */
+	struct tmr tmr;             /**< Event sender timer                  */
+	char *cname;                /**< Canonical Name                      */
+	uint32_t memberc;           /**< Number of members                   */
+	uint32_t senderc;           /**< Number of senders                   */
+	uint32_t srate_tx;          /**< Transmit sampling rate              */
+	uint32_t srate_rx;          /**< Receive sampling rate               */
+
+	/* stats */
+	struct lock *lock;          /**< Lock for txstat                     */
+	struct txstat txstat;       /**< Local transmit statistics           */
+};
+
+
+/* Prototypes */
+static void schedule(struct rtcp_sess *sess);
+static int  send_bye_packet(struct rtcp_sess *sess);
+
+
+static void sess_destructor(void *data)
+{
+	struct rtcp_sess *sess = data;
+
+	if (sess->cname)
+		(void)send_bye_packet(sess);
+
+	tmr_cancel(&sess->tmr);
+
+	mem_deref(sess->cname);
+	hash_flush(sess->members);
+	mem_deref(sess->members);
+	mem_deref(sess->lock);
+}
+
+
+static struct rtp_member *get_member(struct rtcp_sess *sess, uint32_t src)
+{
+	struct rtp_member *mbr;
+
+	mbr = member_find(sess->members, src);
+	if (mbr)
+		return mbr;
+
+	if (sess->memberc >= MAX_MEMBERS)
+		return NULL;
+
+	mbr = member_add(sess->members, src);
+	if (!mbr)
+		return NULL;
+
+	++sess->memberc;
+
+	return mbr;
+}
+
+
+/** Calculate Round-Trip Time in [microseconds] */
+static void calc_rtt(uint32_t *rtt, uint32_t lsr, uint32_t dlsr)
+{
+	struct ntp_time ntp_time;
+	uint64_t a_us, lsr_us, dlsr_us;
+	int err;
+
+	err = ntp_time_get(&ntp_time);
+	if (err)
+		return;
+
+	a_us    = ntp_compact2us(ntp_compact(&ntp_time));
+	lsr_us  = ntp_compact2us(lsr);
+	dlsr_us = 1000000ULL * dlsr / 65536;
+
+	/* RTT delay is (A - LSR - DLSR) */
+	*rtt = MAX((int)(a_us - lsr_us - dlsr_us), 0);
+}
+
+
+/** Decode Reception Report block */
+static void handle_rr_block(struct rtcp_sess *sess, struct rtp_member *mbr,
+			    const struct rtcp_rr *rr)
+{
+	/* Lost */
+	mbr->cum_lost = rr->lost;
+
+	/* Interarrival jitter */
+	if (sess->srate_tx)
+		mbr->jit = 1000000 * rr->jitter / sess->srate_tx;
+
+	/* round-trip propagation delay as (A - LSR - DLSR) */
+	if (rr->lsr && rr->dlsr)
+		calc_rtt(&mbr->rtt, rr->lsr, rr->dlsr);
+}
+
+
+/** Handle incoming RR (Receiver Report) packet */
+static void handle_incoming_rr(struct rtcp_sess *sess,
+			       const struct rtcp_msg *msg)
+{
+	struct rtp_member *mbr;
+	uint32_t i;
+
+	mbr = get_member(sess, msg->r.rr.ssrc);
+	if (!mbr)
+		return;
+
+	for (i=0; i<msg->hdr.count; i++)
+		handle_rr_block(sess, mbr, &msg->r.rr.rrv[i]);
+}
+
+
+/** Handle incoming SR (Sender Report) packet */
+static void handle_incoming_sr(struct rtcp_sess *sess,
+			       const struct rtcp_msg *msg)
+{
+	struct rtp_member *mbr;
+	uint32_t i;
+
+	mbr = get_member(sess, msg->r.sr.ssrc);
+	if (!mbr) {
+		DEBUG_WARNING("0x%08x: could not add member\n",
+			      msg->r.sr.ssrc);
+		return;
+	}
+
+	if (mbr->s) {
+		/* Save time when SR was received */
+		mbr->s->sr_recv = tmr_jiffies();
+
+		/* Save NTP timestamp from SR */
+		mbr->s->last_sr.hi = msg->r.sr.ntp_sec;
+		mbr->s->last_sr.lo = msg->r.sr.ntp_frac;
+		mbr->s->rtp_ts     = msg->r.sr.rtp_ts;
+		mbr->s->psent      = msg->r.sr.psent;
+		mbr->s->osent      = msg->r.sr.osent;
+	}
+
+	for (i=0; i<msg->hdr.count; i++)
+		handle_rr_block(sess, mbr, &msg->r.sr.rrv[i]);
+}
+
+
+static void handle_incoming_bye(struct rtcp_sess *sess,
+				const struct rtcp_msg *msg)
+{
+	uint32_t i;
+
+	for (i=0; i<msg->hdr.count; i++) {
+
+		struct rtp_member *mbr;
+
+		mbr = member_find(sess->members, msg->r.bye.srcv[i]);
+		if (mbr) {
+			if (mbr->s)
+				--sess->senderc;
+
+			--sess->memberc;
+			mem_deref(mbr);
+		}
+	}
+}
+
+
+void rtcp_handler(struct rtcp_sess *sess, struct rtcp_msg *msg)
+{
+	if (!sess || !msg)
+		return;
+
+	switch (msg->hdr.pt) {
+
+	case RTCP_SR:
+		handle_incoming_sr(sess, msg);
+		break;
+
+	case RTCP_RR:
+		handle_incoming_rr(sess, msg);
+		break;
+
+	case RTCP_BYE:
+		handle_incoming_bye(sess, msg);
+		break;
+
+	default:
+		break;
+	}
+}
+
+
+int rtcp_sess_alloc(struct rtcp_sess **sessp, struct rtp_sock *rs)
+{
+	struct rtcp_sess *sess;
+	int err;
+
+	if (!sessp)
+		return EINVAL;
+
+	sess = mem_zalloc(sizeof(*sess), sess_destructor);
+	if (!sess)
+		return ENOMEM;
+
+	sess->rs = rs;
+	tmr_init(&sess->tmr);
+
+	err = lock_alloc(&sess->lock);
+	if (err)
+		goto out;
+
+	err  = hash_alloc(&sess->members, MAX_MEMBERS);
+	if (err)
+		goto out;
+
+ out:
+	if (err)
+		mem_deref(sess);
+	else
+		*sessp = sess;
+
+	return err;
+}
+
+
+/**
+ * Set the Sampling-rate on an RTCP Session
+ *
+ * @param rs       RTP Socket
+ * @param srate_tx Transmit samplerate
+ * @param srate_rx Receive samplerate
+ */
+void rtcp_set_srate(struct rtp_sock *rs, uint32_t srate_tx, uint32_t srate_rx)
+{
+	struct rtcp_sess *sess = rtp_rtcp_sess(rs);
+	if (!sess)
+		return;
+
+	sess->srate_tx = srate_tx;
+	sess->srate_rx = srate_rx;
+}
+
+
+/**
+ * Set the transmit Sampling-rate on an RTCP Session
+ *
+ * @param rs       RTP Socket
+ * @param srate_tx Transmit samplerate
+ */
+void rtcp_set_srate_tx(struct rtp_sock *rs, uint32_t srate_tx)
+{
+	struct rtcp_sess *sess = rtp_rtcp_sess(rs);
+	if (!sess)
+		return;
+
+	sess->srate_tx = srate_tx;
+}
+
+
+/**
+ * Set the receive Sampling-rate on an RTCP Session
+ *
+ * @param rs       RTP Socket
+ * @param srate_rx Receive samplerate
+ */
+void rtcp_set_srate_rx(struct rtp_sock *rs, uint32_t srate_rx)
+{
+	struct rtcp_sess *sess = rtp_rtcp_sess(rs);
+	if (!sess)
+		return;
+
+	sess->srate_rx = srate_rx;
+}
+
+
+int rtcp_enable(struct rtcp_sess *sess, bool enabled, const char *cname)
+{
+	int err;
+
+	if (!sess)
+		return EINVAL;
+
+	sess->cname = mem_deref(sess->cname);
+	err = str_dup(&sess->cname, cname);
+	if (err)
+		return err;
+
+	if (enabled)
+		schedule(sess);
+	else
+		tmr_cancel(&sess->tmr);
+
+	return 0;
+}
+
+
+/** Calculate LSR (middle 32 bits out of 64 in the NTP timestamp) */
+static uint32_t calc_lsr(const struct ntp_time *last_sr)
+{
+	return last_sr->hi ? ntp_compact(last_sr) : 0;
+}
+
+
+static uint32_t calc_dlsr(uint64_t sr_recv)
+{
+	if (sr_recv) {
+		const uint64_t diff = tmr_jiffies() - sr_recv;
+		return (uint32_t)((65536 * diff) / 1000);
+	}
+	else {
+		return 0;
+	}
+}
+
+
+static bool sender_apply_handler(struct le *le, void *arg)
+{
+	struct rtp_member *mbr = le->data;
+	struct rtp_source *s = mbr->s;
+	struct mbuf *mb = arg;
+	struct rtcp_rr rr;
+
+	if (!s)
+		return false;
+
+	/* Initialise the members */
+	rr.ssrc     = mbr->src;
+	rr.fraction = source_calc_fraction_lost(s);
+	rr.lost     = source_calc_lost(s);
+	rr.last_seq = s->cycles | s->max_seq;
+	rr.jitter   = s->jitter >> 4;
+	rr.lsr      = calc_lsr(&s->last_sr);
+	rr.dlsr     = calc_dlsr(s->sr_recv);
+
+	return 0 != rtcp_rr_encode(mb, &rr);
+}
+
+
+static int encode_handler(struct mbuf *mb, void *arg)
+{
+	struct hash *members = arg;
+
+	/* copy all report blocks */
+	if (hash_apply(members, sender_apply_handler, mb))
+		return ENOMEM;
+
+	return 0;
+}
+
+
+/** Create a Sender Report */
+static int mk_sr(struct rtcp_sess *sess, struct mbuf *mb)
+{
+	struct ntp_time ntp = {0, 0};
+	struct txstat txstat;
+	uint32_t dur, rtp_ts = 0;
+	int err;
+
+	err = ntp_time_get(&ntp);
+	if (err)
+		return err;
+
+	lock_write_get(sess->lock);
+	txstat = sess->txstat;
+	sess->txstat.ts_synced = false;
+	lock_rel(sess->lock);
+
+	if (txstat.jfs_ref) {
+		dur = (uint32_t)(tmr_jiffies() - txstat.jfs_ref);
+		rtp_ts = txstat.ts_ref + dur * sess->srate_tx / 1000;
+	}
+
+	err = rtcp_encode(mb, RTCP_SR, sess->senderc, rtp_sess_ssrc(sess->rs),
+			  ntp.hi, ntp.lo, rtp_ts, txstat.psent, txstat.osent,
+			  encode_handler, sess->members);
+	if (err)
+		return err;
+
+	return err;
+}
+
+
+static int sdes_encode_handler(struct mbuf *mb, void *arg)
+{
+	struct rtcp_sess *sess = arg;
+
+	return rtcp_sdes_encode(mb, rtp_sess_ssrc(sess->rs), 1,
+				RTCP_SDES_CNAME, sess->cname);
+}
+
+
+static int mk_sdes(struct rtcp_sess *sess, struct mbuf *mb)
+{
+	return rtcp_encode(mb, RTCP_SDES, 1, sdes_encode_handler, sess);
+}
+
+
+static int send_rtcp_report(struct rtcp_sess *sess)
+{
+	struct mbuf *mb;
+	int err;
+
+	mb = mbuf_alloc(512);
+	if (!mb)
+		return ENOMEM;
+
+	mb->pos = RTCP_HEADROOM;
+
+	err  = mk_sr(sess, mb);
+	err |= mk_sdes(sess, mb);
+	if (err)
+		goto out;
+
+	mb->pos = RTCP_HEADROOM;
+
+	err = rtcp_send(sess->rs, mb);
+
+ out:
+	mem_deref(mb);
+	return err;
+}
+
+
+static int send_bye_packet(struct rtcp_sess *sess)
+{
+	const uint32_t ssrc = rtp_sess_ssrc(sess->rs);
+	struct mbuf *mb;
+	int err;
+
+	mb = mbuf_alloc(512);
+	if (!mb)
+		return ENOMEM;
+
+	mb->pos = RTCP_HEADROOM;
+
+	err  = rtcp_encode(mb, RTCP_BYE, 1, &ssrc, "Adjo");
+	err |= mk_sdes(sess, mb);
+	if (err)
+		goto out;
+
+	mb->pos = RTCP_HEADROOM;
+
+	err = rtcp_send(sess->rs, mb);
+
+ out:
+	mem_deref(mb);
+	return err;
+}
+
+
+static void timeout(void *arg)
+{
+	struct rtcp_sess *sess = arg;
+	int err;
+
+	err = send_rtcp_report(sess);
+	if (err) {
+		DEBUG_WARNING("Send RTCP report failed: %m\n", err);
+	}
+
+	schedule(sess);
+}
+
+
+static void schedule(struct rtcp_sess *sess)
+{
+	tmr_start(&sess->tmr, RTCP_INTERVAL, timeout, sess);
+}
+
+
+void rtcp_sess_tx_rtp(struct rtcp_sess *sess, uint32_t ts, size_t payload_size)
+{
+	if (!sess)
+		return;
+
+	lock_write_get(sess->lock);
+
+	sess->txstat.osent += (uint32_t)payload_size;
+	sess->txstat.psent += 1;
+
+	if (!sess->txstat.ts_synced) {
+		sess->txstat.jfs_ref   = tmr_jiffies();
+		sess->txstat.ts_ref    = ts;
+		sess->txstat.ts_synced = true;
+	}
+
+	lock_rel(sess->lock);
+}
+
+
+void rtcp_sess_rx_rtp(struct rtcp_sess *sess, uint16_t seq, uint32_t ts,
+		      uint32_t ssrc, size_t payload_size,
+		      const struct sa *peer)
+{
+	struct rtp_member *mbr;
+
+	if (!sess)
+		return;
+
+	mbr = get_member(sess, ssrc);
+	if (!mbr) {
+		DEBUG_NOTICE("could not add member: 0x%08x\n", ssrc);
+		return;
+	}
+
+	if (!mbr->s) {
+		mbr->s = mem_zalloc(sizeof(*mbr->s), NULL);
+		if (!mbr->s) {
+			DEBUG_NOTICE("could not add sender: 0x%08x\n", ssrc);
+			return;
+		}
+
+		/* first packet - init sequence number */
+		source_init_seq(mbr->s, seq);
+		/* probation not used */
+		sa_cpy(&mbr->s->rtp_peer, peer);
+		++sess->senderc;
+	}
+
+	if (!source_update_seq(mbr->s, seq)) {
+		DEBUG_WARNING("rtp_update_seq() returned 0\n");
+	}
+
+	if (sess->srate_rx) {
+
+		uint64_t ts_arrive;
+
+		/* Convert from wall-clock time to timestamp units */
+		ts_arrive = tmr_jiffies() * sess->srate_rx / 1000;
+
+		source_calc_jitter(mbr->s, ts, (uint32_t)ts_arrive);
+	}
+
+	mbr->s->rtp_rx_bytes += payload_size;
+}
+
+
+/**
+ * Get the RTCP Statistics for a source
+ *
+ * @param rs    RTP Socket
+ * @param ssrc  Synchronization source
+ * @param stats RTCP Statistics, set on return
+ *
+ * @return 0 if success, otherwise errorcode
+ */
+int rtcp_stats(struct rtp_sock *rs, uint32_t ssrc, struct rtcp_stats *stats)
+{
+	const struct rtcp_sess *sess = rtp_rtcp_sess(rs);
+	struct rtp_member *mbr;
+
+	if (!sess || !stats)
+		return EINVAL;
+
+	mbr = member_find(sess->members, ssrc);
+	if (!mbr)
+		return ENOENT;
+
+	lock_read_get(sess->lock);
+	stats->tx.sent = sess->txstat.psent;
+	lock_rel(sess->lock);
+
+	stats->tx.lost = mbr->cum_lost;
+	stats->tx.jit  = mbr->jit;
+
+	stats->rtt = mbr->rtt;
+
+	if (!mbr->s) {
+		memset(&stats->rx, 0, sizeof(stats->rx));
+		return 0;
+	}
+
+	stats->rx.sent = mbr->s->received;
+	stats->rx.lost = source_calc_lost(mbr->s);
+	stats->rx.jit  = sess->srate_rx ?
+		1000000 * (mbr->s->jitter>>4) / sess->srate_rx : 0;
+
+	return 0;
+}
+
+
+static bool debug_handler(struct le *le, void *arg)
+{
+	const struct rtp_member *mbr = le->data;
+	struct re_printf *pf = arg;
+	int err;
+
+	err = re_hprintf(pf, "  member 0x%08x: lost=%d Jitter=%.1fms"
+			  " RTT=%.1fms\n", mbr->src, mbr->cum_lost,
+			  (double)mbr->jit/1000, (double)mbr->rtt/1000);
+	if (mbr->s) {
+		err |= re_hprintf(pf,
+				  "                 IP=%J psent=%u rcvd=%u\n",
+				  &mbr->s->rtp_peer, mbr->s->psent,
+				  mbr->s->received);
+	}
+
+	return err != 0;
+}
+
+
+/**
+ * RTCP Debug handler, use with fmt %H
+ *
+ * @param pf Print function
+ * @param rs RTP Socket
+ *
+ * @return 0 if success, otherwise errorcode
+ */
+int rtcp_debug(struct re_printf *pf, const struct rtp_sock *rs)
+{
+	const struct rtcp_sess *sess = rtp_rtcp_sess(rs);
+	int err = 0;
+
+	if (!sess)
+		return 0;
+
+	err |= re_hprintf(pf, "----- RTCP Session: -----\n");
+	err |= re_hprintf(pf, "  cname=%s SSRC=0x%08x/%u rx=%uHz\n",
+			  sess->cname,
+			  rtp_sess_ssrc(sess->rs), rtp_sess_ssrc(sess->rs),
+			  sess->srate_rx);
+
+	hash_apply(sess->members, debug_handler, pf);
+
+	lock_read_get(sess->lock);
+	err |= re_hprintf(pf, "  TX: packets=%u, octets=%u\n",
+			  sess->txstat.psent, sess->txstat.osent);
+	lock_rel(sess->lock);
+
+	return err;
+}