blob: df82d781e873ccfb0e745ecad8bd1131cd261ade [file] [log] [blame]
James Kuszmaul82f6c042021-01-17 11:30:16 -08001/**
2 * @file rtmp/dechunk.c Real Time Messaging Protocol (RTMP) -- Dechunking
3 *
4 * Copyright (C) 2010 Creytiv.com
5 */
6#include <string.h>
7#include <re_types.h>
8#include <re_fmt.h>
9#include <re_list.h>
10#include <re_mem.h>
11#include <re_mbuf.h>
12#include <re_net.h>
13#include <re_sa.h>
14#include <re_rtmp.h>
15#include "rtmp.h"
16
17
18enum {
19 MAX_CHUNKS = 64,
20};
21
22
23struct rtmp_chunk {
24 struct le le;
25 struct rtmp_header hdr;
26 struct mbuf *mb;
27};
28
29/** Defines the RTMP Dechunker */
30struct rtmp_dechunker {
31 struct list chunkl; /* struct rtmp_chunk */
32 size_t chunk_sz;
33 rtmp_dechunk_h *chunkh;
34 void *arg;
35};
36
37
38static void destructor(void *data)
39{
40 struct rtmp_dechunker *rd = data;
41
42 list_flush(&rd->chunkl);
43}
44
45
46static void chunk_destructor(void *data)
47{
48 struct rtmp_chunk *chunk = data;
49
50 list_unlink(&chunk->le);
51 mem_deref(chunk->mb);
52}
53
54
55static struct rtmp_chunk *create_chunk(struct list *chunkl,
56 const struct rtmp_header *hdr)
57{
58 struct rtmp_chunk *chunk;
59
60 chunk = mem_zalloc(sizeof(*chunk), chunk_destructor);
61 if (!chunk)
62 return NULL;
63
64 chunk->hdr = *hdr;
65
66 list_append(chunkl, &chunk->le, chunk);
67
68 return chunk;
69}
70
71
72static struct rtmp_chunk *find_chunk(const struct list *chunkl,
73 uint32_t chunk_id)
74{
75 struct le *le;
76
77 for (le = list_head(chunkl); le; le = le->next) {
78
79 struct rtmp_chunk *chunk = le->data;
80
81 if (chunk_id == chunk->hdr.chunk_id)
82 return chunk;
83 }
84
85 return NULL;
86}
87
88
89/*
90 * Stateful RTMP de-chunker for receiving complete messages
91 */
92int rtmp_dechunker_alloc(struct rtmp_dechunker **rdp, size_t chunk_sz,
93 rtmp_dechunk_h *chunkh, void *arg)
94{
95 struct rtmp_dechunker *rd;
96
97 if (!rdp || !chunk_sz || !chunkh)
98 return EINVAL;
99
100 rd = mem_zalloc(sizeof(*rd), destructor);
101 if (!rd)
102 return ENOMEM;
103
104 rd->chunk_sz = chunk_sz;
105
106 rd->chunkh = chunkh;
107 rd->arg = arg;
108
109 *rdp = rd;
110
111 return 0;
112}
113
114
115int rtmp_dechunker_receive(struct rtmp_dechunker *rd, struct mbuf *mb)
116{
117 struct rtmp_header hdr;
118 struct rtmp_chunk *chunk;
119 size_t chunk_sz, left, msg_len;
120 int err;
121
122 if (!rd || !mb)
123 return EINVAL;
124
125 err = rtmp_header_decode(&hdr, mb);
126 if (err)
127 return err;
128
129 /* find preceding chunk, from chunk id */
130 chunk = find_chunk(&rd->chunkl, hdr.chunk_id);
131 if (!chunk) {
132
133 /* only type 0 can create a new chunk stream */
134 if (hdr.format == 0) {
135 if (list_count(&rd->chunkl) > MAX_CHUNKS)
136 return EOVERFLOW;
137
138 chunk = create_chunk(&rd->chunkl, &hdr);
139 if (!chunk)
140 return ENOMEM;
141 }
142 else
143 return ENOENT;
144 }
145
146 switch (hdr.format) {
147
148 case 0:
149 case 1:
150 case 2:
151 if (hdr.format == 0) {
152
153 /* copy the whole header */
154 chunk->hdr = hdr;
155 }
156 else if (hdr.format == 1) {
157
158 chunk->hdr.timestamp_delta = hdr.timestamp_delta;
159 chunk->hdr.length = hdr.length;
160 chunk->hdr.type_id = hdr.type_id;
161 }
162 else if (hdr.format == 2) {
163
164 chunk->hdr.timestamp_delta = hdr.timestamp_delta;
165 }
166
167 msg_len = chunk->hdr.length;
168
169 chunk_sz = min(msg_len, rd->chunk_sz);
170
171 if (mbuf_get_left(mb) < chunk_sz)
172 return ENODATA;
173
174 mem_deref(chunk->mb);
175 chunk->mb = mbuf_alloc(msg_len);
176 if (!chunk->mb)
177 return ENOMEM;
178
179 err = mbuf_read_mem(mb, chunk->mb->buf, chunk_sz);
180 if (err)
181 return err;
182
183 chunk->mb->pos = chunk_sz;
184 chunk->mb->end = chunk_sz;
185
186 chunk->hdr.format = hdr.format;
187 chunk->hdr.ext_ts = hdr.ext_ts;
188
189 if (hdr.format == 1 || hdr.format == 2)
190 chunk->hdr.timestamp += hdr.timestamp_delta;
191 break;
192
193 case 3:
194 if (chunk->hdr.ext_ts) {
195
196 uint32_t ext_ts;
197
198 if (mbuf_get_left(mb) < 4)
199 return ENODATA;
200
201 ext_ts = ntohl(mbuf_read_u32(mb));
202
203 if (chunk->hdr.format == 0)
204 chunk->hdr.timestamp = ext_ts;
205 else
206 chunk->hdr.timestamp_delta = ext_ts;
207 }
208
209 if (!chunk->mb) {
210
211 chunk->mb = mbuf_alloc(chunk->hdr.length);
212 if (!chunk->mb)
213 return ENOMEM;
214
215 if (chunk->hdr.format == 0) {
216 chunk->hdr.timestamp_delta =
217 chunk->hdr.timestamp;
218 }
219
220 chunk->hdr.timestamp += chunk->hdr.timestamp_delta;
221 }
222
223 left = mbuf_get_space(chunk->mb);
224
225 chunk_sz = min(left, rd->chunk_sz);
226
227 if (mbuf_get_left(mb) < chunk_sz)
228 return ENODATA;
229
230 err = mbuf_read_mem(mb, mbuf_buf(chunk->mb), chunk_sz);
231 if (err)
232 return err;
233
234 chunk->mb->pos += chunk_sz;
235 chunk->mb->end += chunk_sz;
236 break;
237
238 default:
239 return EPROTO;
240 }
241
242 if (chunk->mb->pos >= chunk->mb->size) {
243
244 struct mbuf *buf;
245
246 chunk->mb->pos = 0;
247
248 buf = chunk->mb;
249 chunk->mb = NULL;
250
251 err = rd->chunkh(&chunk->hdr, buf, rd->arg);
252
253 mem_deref(buf);
254 }
255
256 return err;
257}
258
259
260void rtmp_dechunker_set_chunksize(struct rtmp_dechunker *rd, size_t chunk_sz)
261{
262 if (!rd || !chunk_sz)
263 return;
264
265 rd->chunk_sz = chunk_sz;
266}
267
268
269int rtmp_dechunker_debug(struct re_printf *pf, const struct rtmp_dechunker *rd)
270{
271 struct le *le;
272 int err;
273
274 if (!rd)
275 return 0;
276
277 err = re_hprintf(pf, "Dechunker Debug:\n");
278
279 err |= re_hprintf(pf, "chunk list: (%u)\n", list_count(&rd->chunkl));
280
281 for (le = rd->chunkl.head; le; le = le->next) {
282
283 const struct rtmp_chunk *msg = le->data;
284
285 err |= re_hprintf(pf, ".. %H\n",
286 rtmp_header_print, &msg->hdr);
287 }
288
289 err |= re_hprintf(pf, "\n");
290
291 return err;
292}