blob: 211521040a3907b06d49eac968c015dba43e3794 [file] [log] [blame]
James Kuszmaul82f6c042021-01-17 11:30:16 -08001/**
2 * @file rtp/sess.c Real-time Transport Control Protocol Session
3 *
4 * Copyright (C) 2010 Creytiv.com
5 */
6#ifdef HAVE_SYS_TIME_H
7#include <sys/time.h>
8#endif
9#include <time.h>
10#ifdef WIN32
11#include <winsock2.h>
12#endif
13#include <string.h>
14#include <re_types.h>
15#include <re_fmt.h>
16#include <re_mem.h>
17#include <re_mbuf.h>
18#include <re_list.h>
19#include <re_hash.h>
20#include <re_tmr.h>
21#include <re_sa.h>
22#include <re_lock.h>
23#include <re_rtp.h>
24#include "rtcp.h"
25
26
27#define DEBUG_MODULE "rtcp_sess"
28#define DEBUG_LEVEL 5
29#include <re_dbg.h>
30
31
32/** RTP protocol values */
33enum {
34 RTCP_INTERVAL = 5000, /**< Interval in [ms] between sending reports */
35 MAX_MEMBERS = 8,
36};
37
38/** RTP Transmit stats */
39struct txstat {
40 uint32_t psent; /**< Total number of RTP packets sent */
41 uint32_t osent; /**< Total number of RTP octets sent */
42 uint64_t jfs_ref; /**< Timer ticks at RTP timestamp reference */
43 uint32_t ts_ref; /**< RTP timestamp reference (transmit) */
44 bool ts_synced; /**< RTP timestamp synchronization flag */
45};
46
47/** RTCP Session */
48struct rtcp_sess {
49 struct rtp_sock *rs; /**< RTP Socket */
50 struct hash *members; /**< Member table */
51 struct tmr tmr; /**< Event sender timer */
52 char *cname; /**< Canonical Name */
53 uint32_t memberc; /**< Number of members */
54 uint32_t senderc; /**< Number of senders */
55 uint32_t srate_tx; /**< Transmit sampling rate */
56 uint32_t srate_rx; /**< Receive sampling rate */
57
58 /* stats */
59 struct lock *lock; /**< Lock for txstat */
60 struct txstat txstat; /**< Local transmit statistics */
61};
62
63
64/* Prototypes */
65static void schedule(struct rtcp_sess *sess);
66static int send_bye_packet(struct rtcp_sess *sess);
67
68
69static void sess_destructor(void *data)
70{
71 struct rtcp_sess *sess = data;
72
73 if (sess->cname)
74 (void)send_bye_packet(sess);
75
76 tmr_cancel(&sess->tmr);
77
78 mem_deref(sess->cname);
79 hash_flush(sess->members);
80 mem_deref(sess->members);
81 mem_deref(sess->lock);
82}
83
84
85static struct rtp_member *get_member(struct rtcp_sess *sess, uint32_t src)
86{
87 struct rtp_member *mbr;
88
89 mbr = member_find(sess->members, src);
90 if (mbr)
91 return mbr;
92
93 if (sess->memberc >= MAX_MEMBERS)
94 return NULL;
95
96 mbr = member_add(sess->members, src);
97 if (!mbr)
98 return NULL;
99
100 ++sess->memberc;
101
102 return mbr;
103}
104
105
106/** Calculate Round-Trip Time in [microseconds] */
107static void calc_rtt(uint32_t *rtt, uint32_t lsr, uint32_t dlsr)
108{
109 struct ntp_time ntp_time;
110 uint64_t a_us, lsr_us, dlsr_us;
111 int err;
112
113 err = ntp_time_get(&ntp_time);
114 if (err)
115 return;
116
117 a_us = ntp_compact2us(ntp_compact(&ntp_time));
118 lsr_us = ntp_compact2us(lsr);
119 dlsr_us = 1000000ULL * dlsr / 65536;
120
121 /* RTT delay is (A - LSR - DLSR) */
122 *rtt = MAX((int)(a_us - lsr_us - dlsr_us), 0);
123}
124
125
126/** Decode Reception Report block */
127static void handle_rr_block(struct rtcp_sess *sess, struct rtp_member *mbr,
128 const struct rtcp_rr *rr)
129{
130 /* Lost */
131 mbr->cum_lost = rr->lost;
132
133 /* Interarrival jitter */
134 if (sess->srate_tx)
135 mbr->jit = 1000000 * rr->jitter / sess->srate_tx;
136
137 /* round-trip propagation delay as (A - LSR - DLSR) */
138 if (rr->lsr && rr->dlsr)
139 calc_rtt(&mbr->rtt, rr->lsr, rr->dlsr);
140}
141
142
143/** Handle incoming RR (Receiver Report) packet */
144static void handle_incoming_rr(struct rtcp_sess *sess,
145 const struct rtcp_msg *msg)
146{
147 struct rtp_member *mbr;
148 uint32_t i;
149
150 mbr = get_member(sess, msg->r.rr.ssrc);
151 if (!mbr)
152 return;
153
154 for (i=0; i<msg->hdr.count; i++)
155 handle_rr_block(sess, mbr, &msg->r.rr.rrv[i]);
156}
157
158
159/** Handle incoming SR (Sender Report) packet */
160static void handle_incoming_sr(struct rtcp_sess *sess,
161 const struct rtcp_msg *msg)
162{
163 struct rtp_member *mbr;
164 uint32_t i;
165
166 mbr = get_member(sess, msg->r.sr.ssrc);
167 if (!mbr) {
168 DEBUG_WARNING("0x%08x: could not add member\n",
169 msg->r.sr.ssrc);
170 return;
171 }
172
173 if (mbr->s) {
174 /* Save time when SR was received */
175 mbr->s->sr_recv = tmr_jiffies();
176
177 /* Save NTP timestamp from SR */
178 mbr->s->last_sr.hi = msg->r.sr.ntp_sec;
179 mbr->s->last_sr.lo = msg->r.sr.ntp_frac;
180 mbr->s->rtp_ts = msg->r.sr.rtp_ts;
181 mbr->s->psent = msg->r.sr.psent;
182 mbr->s->osent = msg->r.sr.osent;
183 }
184
185 for (i=0; i<msg->hdr.count; i++)
186 handle_rr_block(sess, mbr, &msg->r.sr.rrv[i]);
187}
188
189
190static void handle_incoming_bye(struct rtcp_sess *sess,
191 const struct rtcp_msg *msg)
192{
193 uint32_t i;
194
195 for (i=0; i<msg->hdr.count; i++) {
196
197 struct rtp_member *mbr;
198
199 mbr = member_find(sess->members, msg->r.bye.srcv[i]);
200 if (mbr) {
201 if (mbr->s)
202 --sess->senderc;
203
204 --sess->memberc;
205 mem_deref(mbr);
206 }
207 }
208}
209
210
211void rtcp_handler(struct rtcp_sess *sess, struct rtcp_msg *msg)
212{
213 if (!sess || !msg)
214 return;
215
216 switch (msg->hdr.pt) {
217
218 case RTCP_SR:
219 handle_incoming_sr(sess, msg);
220 break;
221
222 case RTCP_RR:
223 handle_incoming_rr(sess, msg);
224 break;
225
226 case RTCP_BYE:
227 handle_incoming_bye(sess, msg);
228 break;
229
230 default:
231 break;
232 }
233}
234
235
236int rtcp_sess_alloc(struct rtcp_sess **sessp, struct rtp_sock *rs)
237{
238 struct rtcp_sess *sess;
239 int err;
240
241 if (!sessp)
242 return EINVAL;
243
244 sess = mem_zalloc(sizeof(*sess), sess_destructor);
245 if (!sess)
246 return ENOMEM;
247
248 sess->rs = rs;
249 tmr_init(&sess->tmr);
250
251 err = lock_alloc(&sess->lock);
252 if (err)
253 goto out;
254
255 err = hash_alloc(&sess->members, MAX_MEMBERS);
256 if (err)
257 goto out;
258
259 out:
260 if (err)
261 mem_deref(sess);
262 else
263 *sessp = sess;
264
265 return err;
266}
267
268
269/**
270 * Set the Sampling-rate on an RTCP Session
271 *
272 * @param rs RTP Socket
273 * @param srate_tx Transmit samplerate
274 * @param srate_rx Receive samplerate
275 */
276void rtcp_set_srate(struct rtp_sock *rs, uint32_t srate_tx, uint32_t srate_rx)
277{
278 struct rtcp_sess *sess = rtp_rtcp_sess(rs);
279 if (!sess)
280 return;
281
282 sess->srate_tx = srate_tx;
283 sess->srate_rx = srate_rx;
284}
285
286
287/**
288 * Set the transmit Sampling-rate on an RTCP Session
289 *
290 * @param rs RTP Socket
291 * @param srate_tx Transmit samplerate
292 */
293void rtcp_set_srate_tx(struct rtp_sock *rs, uint32_t srate_tx)
294{
295 struct rtcp_sess *sess = rtp_rtcp_sess(rs);
296 if (!sess)
297 return;
298
299 sess->srate_tx = srate_tx;
300}
301
302
303/**
304 * Set the receive Sampling-rate on an RTCP Session
305 *
306 * @param rs RTP Socket
307 * @param srate_rx Receive samplerate
308 */
309void rtcp_set_srate_rx(struct rtp_sock *rs, uint32_t srate_rx)
310{
311 struct rtcp_sess *sess = rtp_rtcp_sess(rs);
312 if (!sess)
313 return;
314
315 sess->srate_rx = srate_rx;
316}
317
318
319int rtcp_enable(struct rtcp_sess *sess, bool enabled, const char *cname)
320{
321 int err;
322
323 if (!sess)
324 return EINVAL;
325
326 sess->cname = mem_deref(sess->cname);
327 err = str_dup(&sess->cname, cname);
328 if (err)
329 return err;
330
331 if (enabled)
332 schedule(sess);
333 else
334 tmr_cancel(&sess->tmr);
335
336 return 0;
337}
338
339
340/** Calculate LSR (middle 32 bits out of 64 in the NTP timestamp) */
341static uint32_t calc_lsr(const struct ntp_time *last_sr)
342{
343 return last_sr->hi ? ntp_compact(last_sr) : 0;
344}
345
346
347static uint32_t calc_dlsr(uint64_t sr_recv)
348{
349 if (sr_recv) {
350 const uint64_t diff = tmr_jiffies() - sr_recv;
351 return (uint32_t)((65536 * diff) / 1000);
352 }
353 else {
354 return 0;
355 }
356}
357
358
359static bool sender_apply_handler(struct le *le, void *arg)
360{
361 struct rtp_member *mbr = le->data;
362 struct rtp_source *s = mbr->s;
363 struct mbuf *mb = arg;
364 struct rtcp_rr rr;
365
366 if (!s)
367 return false;
368
369 /* Initialise the members */
370 rr.ssrc = mbr->src;
371 rr.fraction = source_calc_fraction_lost(s);
372 rr.lost = source_calc_lost(s);
373 rr.last_seq = s->cycles | s->max_seq;
374 rr.jitter = s->jitter >> 4;
375 rr.lsr = calc_lsr(&s->last_sr);
376 rr.dlsr = calc_dlsr(s->sr_recv);
377
378 return 0 != rtcp_rr_encode(mb, &rr);
379}
380
381
382static int encode_handler(struct mbuf *mb, void *arg)
383{
384 struct hash *members = arg;
385
386 /* copy all report blocks */
387 if (hash_apply(members, sender_apply_handler, mb))
388 return ENOMEM;
389
390 return 0;
391}
392
393
394/** Create a Sender Report */
395static int mk_sr(struct rtcp_sess *sess, struct mbuf *mb)
396{
397 struct ntp_time ntp = {0, 0};
398 struct txstat txstat;
399 uint32_t dur, rtp_ts = 0;
400 int err;
401
402 err = ntp_time_get(&ntp);
403 if (err)
404 return err;
405
406 lock_write_get(sess->lock);
407 txstat = sess->txstat;
408 sess->txstat.ts_synced = false;
409 lock_rel(sess->lock);
410
411 if (txstat.jfs_ref) {
412 dur = (uint32_t)(tmr_jiffies() - txstat.jfs_ref);
413 rtp_ts = txstat.ts_ref + dur * sess->srate_tx / 1000;
414 }
415
416 err = rtcp_encode(mb, RTCP_SR, sess->senderc, rtp_sess_ssrc(sess->rs),
417 ntp.hi, ntp.lo, rtp_ts, txstat.psent, txstat.osent,
418 encode_handler, sess->members);
419 if (err)
420 return err;
421
422 return err;
423}
424
425
426static int sdes_encode_handler(struct mbuf *mb, void *arg)
427{
428 struct rtcp_sess *sess = arg;
429
430 return rtcp_sdes_encode(mb, rtp_sess_ssrc(sess->rs), 1,
431 RTCP_SDES_CNAME, sess->cname);
432}
433
434
435static int mk_sdes(struct rtcp_sess *sess, struct mbuf *mb)
436{
437 return rtcp_encode(mb, RTCP_SDES, 1, sdes_encode_handler, sess);
438}
439
440
441static int send_rtcp_report(struct rtcp_sess *sess)
442{
443 struct mbuf *mb;
444 int err;
445
446 mb = mbuf_alloc(512);
447 if (!mb)
448 return ENOMEM;
449
450 mb->pos = RTCP_HEADROOM;
451
452 err = mk_sr(sess, mb);
453 err |= mk_sdes(sess, mb);
454 if (err)
455 goto out;
456
457 mb->pos = RTCP_HEADROOM;
458
459 err = rtcp_send(sess->rs, mb);
460
461 out:
462 mem_deref(mb);
463 return err;
464}
465
466
467static int send_bye_packet(struct rtcp_sess *sess)
468{
469 const uint32_t ssrc = rtp_sess_ssrc(sess->rs);
470 struct mbuf *mb;
471 int err;
472
473 mb = mbuf_alloc(512);
474 if (!mb)
475 return ENOMEM;
476
477 mb->pos = RTCP_HEADROOM;
478
479 err = rtcp_encode(mb, RTCP_BYE, 1, &ssrc, "Adjo");
480 err |= mk_sdes(sess, mb);
481 if (err)
482 goto out;
483
484 mb->pos = RTCP_HEADROOM;
485
486 err = rtcp_send(sess->rs, mb);
487
488 out:
489 mem_deref(mb);
490 return err;
491}
492
493
494static void timeout(void *arg)
495{
496 struct rtcp_sess *sess = arg;
497 int err;
498
499 err = send_rtcp_report(sess);
500 if (err) {
501 DEBUG_WARNING("Send RTCP report failed: %m\n", err);
502 }
503
504 schedule(sess);
505}
506
507
508static void schedule(struct rtcp_sess *sess)
509{
510 tmr_start(&sess->tmr, RTCP_INTERVAL, timeout, sess);
511}
512
513
514void rtcp_sess_tx_rtp(struct rtcp_sess *sess, uint32_t ts, size_t payload_size)
515{
516 if (!sess)
517 return;
518
519 lock_write_get(sess->lock);
520
521 sess->txstat.osent += (uint32_t)payload_size;
522 sess->txstat.psent += 1;
523
524 if (!sess->txstat.ts_synced) {
525 sess->txstat.jfs_ref = tmr_jiffies();
526 sess->txstat.ts_ref = ts;
527 sess->txstat.ts_synced = true;
528 }
529
530 lock_rel(sess->lock);
531}
532
533
534void rtcp_sess_rx_rtp(struct rtcp_sess *sess, uint16_t seq, uint32_t ts,
535 uint32_t ssrc, size_t payload_size,
536 const struct sa *peer)
537{
538 struct rtp_member *mbr;
539
540 if (!sess)
541 return;
542
543 mbr = get_member(sess, ssrc);
544 if (!mbr) {
545 DEBUG_NOTICE("could not add member: 0x%08x\n", ssrc);
546 return;
547 }
548
549 if (!mbr->s) {
550 mbr->s = mem_zalloc(sizeof(*mbr->s), NULL);
551 if (!mbr->s) {
552 DEBUG_NOTICE("could not add sender: 0x%08x\n", ssrc);
553 return;
554 }
555
556 /* first packet - init sequence number */
557 source_init_seq(mbr->s, seq);
558 /* probation not used */
559 sa_cpy(&mbr->s->rtp_peer, peer);
560 ++sess->senderc;
561 }
562
563 if (!source_update_seq(mbr->s, seq)) {
564 DEBUG_WARNING("rtp_update_seq() returned 0\n");
565 }
566
567 if (sess->srate_rx) {
568
569 uint64_t ts_arrive;
570
571 /* Convert from wall-clock time to timestamp units */
572 ts_arrive = tmr_jiffies() * sess->srate_rx / 1000;
573
574 source_calc_jitter(mbr->s, ts, (uint32_t)ts_arrive);
575 }
576
577 mbr->s->rtp_rx_bytes += payload_size;
578}
579
580
581/**
582 * Get the RTCP Statistics for a source
583 *
584 * @param rs RTP Socket
585 * @param ssrc Synchronization source
586 * @param stats RTCP Statistics, set on return
587 *
588 * @return 0 if success, otherwise errorcode
589 */
590int rtcp_stats(struct rtp_sock *rs, uint32_t ssrc, struct rtcp_stats *stats)
591{
592 const struct rtcp_sess *sess = rtp_rtcp_sess(rs);
593 struct rtp_member *mbr;
594
595 if (!sess || !stats)
596 return EINVAL;
597
598 mbr = member_find(sess->members, ssrc);
599 if (!mbr)
600 return ENOENT;
601
602 lock_read_get(sess->lock);
603 stats->tx.sent = sess->txstat.psent;
604 lock_rel(sess->lock);
605
606 stats->tx.lost = mbr->cum_lost;
607 stats->tx.jit = mbr->jit;
608
609 stats->rtt = mbr->rtt;
610
611 if (!mbr->s) {
612 memset(&stats->rx, 0, sizeof(stats->rx));
613 return 0;
614 }
615
616 stats->rx.sent = mbr->s->received;
617 stats->rx.lost = source_calc_lost(mbr->s);
618 stats->rx.jit = sess->srate_rx ?
619 1000000 * (mbr->s->jitter>>4) / sess->srate_rx : 0;
620
621 return 0;
622}
623
624
625static bool debug_handler(struct le *le, void *arg)
626{
627 const struct rtp_member *mbr = le->data;
628 struct re_printf *pf = arg;
629 int err;
630
631 err = re_hprintf(pf, " member 0x%08x: lost=%d Jitter=%.1fms"
632 " RTT=%.1fms\n", mbr->src, mbr->cum_lost,
633 (double)mbr->jit/1000, (double)mbr->rtt/1000);
634 if (mbr->s) {
635 err |= re_hprintf(pf,
636 " IP=%J psent=%u rcvd=%u\n",
637 &mbr->s->rtp_peer, mbr->s->psent,
638 mbr->s->received);
639 }
640
641 return err != 0;
642}
643
644
645/**
646 * RTCP Debug handler, use with fmt %H
647 *
648 * @param pf Print function
649 * @param rs RTP Socket
650 *
651 * @return 0 if success, otherwise errorcode
652 */
653int rtcp_debug(struct re_printf *pf, const struct rtp_sock *rs)
654{
655 const struct rtcp_sess *sess = rtp_rtcp_sess(rs);
656 int err = 0;
657
658 if (!sess)
659 return 0;
660
661 err |= re_hprintf(pf, "----- RTCP Session: -----\n");
662 err |= re_hprintf(pf, " cname=%s SSRC=0x%08x/%u rx=%uHz\n",
663 sess->cname,
664 rtp_sess_ssrc(sess->rs), rtp_sess_ssrc(sess->rs),
665 sess->srate_rx);
666
667 hash_apply(sess->members, debug_handler, pf);
668
669 lock_read_get(sess->lock);
670 err |= re_hprintf(pf, " TX: packets=%u, octets=%u\n",
671 sess->txstat.psent, sess->txstat.osent);
672 lock_rel(sess->lock);
673
674 return err;
675}