From 74506f1048d4782de284675bddd81c7eff0c87cb Mon Sep 17 00:00:00 2001 From: "Alfred E. Heggestad" Date: Sun, 1 Mar 2015 13:59:13 +0100 Subject: video: add video-queue for pacing outgoing RTP packets --- src/video.c | 184 ++++++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 167 insertions(+), 17 deletions(-) (limited to 'src') diff --git a/src/video.c b/src/video.c index 9c1ea80..9829115 100644 --- a/src/video.c +++ b/src/video.c @@ -29,6 +29,14 @@ enum { MAX_MUTED_FRAMES = 3, }; +/** Video transmit parameters */ +enum { + MEDIA_POLL_RATE = 250, /**< in [Hz] */ + BURST_MAX = 8192, /**< in bytes */ + RTP_PRESZ = 4 + RTP_HEADER_SIZE, /**< TURN and RTP header */ + RTP_TRAILSZ = 12 + 4, /**< SRTP/SRTCP trailer */ +}; + /** * \page GenericVideoStream Generic Video Stream @@ -83,7 +91,10 @@ struct vtx { struct lock *lock; /**< Lock for encoder */ struct vidframe *frame; /**< Source frame */ struct vidframe *mute_frame; /**< Frame with muted video */ - struct mbuf *mb; /**< Packetization buffer */ + struct lock *lock_tx; /**< Protect the sendq */ + struct list sendq; /**< Tx-Queue (struct vidqent) */ + struct tmr tmr_rtp; /**< Timer for sending RTP */ + unsigned skipc; /**< Number of frames skipped */ struct list filtl; /**< Filters in encoding order */ char device[64]; int muted_frames; /**< # of muted frames sent */ @@ -141,6 +152,128 @@ struct video { }; +struct vidqent { + struct le le; + struct sa dst; + bool marker; + uint8_t pt; + uint32_t ts; + struct mbuf *mb; +}; + + +static void vidqent_destructor(void *arg) +{ + struct vidqent *qent = arg; + + list_unlink(&qent->le); + mem_deref(qent->mb); +} + + +static int vidqent_alloc(struct vidqent **qentp, + bool marker, uint8_t pt, uint32_t ts, + const uint8_t *hdr, size_t hdr_len, + const uint8_t *pld, size_t pld_len) +{ + struct vidqent *qent; + int err = 0; + + if (!qentp || !pld) + return EINVAL; + + qent = mem_zalloc(sizeof(*qent), vidqent_destructor); + if (!qent) + return ENOMEM; + + qent->marker = marker; + qent->pt = pt; + qent->ts = ts; + + qent->mb = mbuf_alloc(RTP_PRESZ + hdr_len + pld_len + RTP_TRAILSZ); + if (!qent->mb) { + err = ENOMEM; + goto out; + } + + qent->mb->pos = qent->mb->end = RTP_PRESZ; + + if (hdr) + (void)mbuf_write_mem(qent->mb, hdr, hdr_len); + + (void)mbuf_write_mem(qent->mb, pld, pld_len); + + qent->mb->pos = RTP_PRESZ; + + out: + if (err) + mem_deref(qent); + else + *qentp = qent; + + return err; +} + + +static void vidqueue_poll(struct vtx *vtx, uint64_t jfs, uint64_t prev_jfs) +{ + size_t burst, sent; + uint64_t bandwidth_kbps; + struct le *le; + + if (!vtx) + return; + + lock_write_get(vtx->lock_tx); + + le = vtx->sendq.head; + if (!le) + goto out; + + /* + * time [ms] * bitrate [kbps] / 8 = bytes + */ + bandwidth_kbps = vtx->video->cfg.bitrate / 1000; + burst = (1 + jfs - prev_jfs) * bandwidth_kbps / 4; + + burst = min(burst, BURST_MAX); + sent = 0; + + while (le) { + + struct vidqent *qent = le->data; + + sent += mbuf_get_left(qent->mb); + + stream_send(vtx->video->strm, qent->marker, qent->pt, + qent->ts, qent->mb); + + le = le->next; + mem_deref(qent); + + if (sent > burst) { + break; + } + } + + out: + lock_rel(vtx->lock_tx); +} + + +static void rtp_tmr_handler(void *arg) +{ + struct vtx *vtx = arg; + uint64_t pjfs; + + pjfs = vtx->tmr_rtp.jfs; + + tmr_start(&vtx->tmr_rtp, 1000/MEDIA_POLL_RATE, rtp_tmr_handler, vtx); + + vidqueue_poll(vtx, vtx->tmr_rtp.jfs, pjfs); +} + + static void video_destructor(void *arg) { struct video *v = arg; @@ -148,12 +281,17 @@ static void video_destructor(void *arg) struct vrx *vrx = &v->vrx; /* transmit */ + lock_write_get(vtx->lock_tx); + list_flush(&vtx->sendq); + lock_rel(vtx->lock_tx); + mem_deref(vtx->lock_tx); + + tmr_cancel(&vtx->tmr_rtp); mem_deref(vtx->vsrc); lock_write_get(vtx->lock); mem_deref(vtx->frame); mem_deref(vtx->mute_frame); mem_deref(vtx->enc); - mem_deref(vtx->mb); list_flush(&vtx->filtl); lock_rel(vtx->lock); mem_deref(vtx->lock); @@ -191,20 +329,20 @@ static int get_fps(const struct video *v) static int packet_handler(bool marker, const uint8_t *hdr, size_t hdr_len, const uint8_t *pld, size_t pld_len, void *arg) { - struct vtx *tx = arg; - int err = 0; - - tx->mb->pos = tx->mb->end = STREAM_PRESZ; - - if (hdr_len) err |= mbuf_write_mem(tx->mb, hdr, hdr_len); - if (pld_len) err |= mbuf_write_mem(tx->mb, pld, pld_len); + struct vtx *vtx = arg; + struct stream *strm = vtx->video->strm; + struct vidqent *qent; + int err; - tx->mb->pos = STREAM_PRESZ; + err = vidqent_alloc(&qent, marker, strm->pt_enc, vtx->ts_tx, + hdr, hdr_len, pld, pld_len); + if (err) + return err; - if (!err) { - err = stream_send(tx->video->strm, marker, -1, - tx->ts_tx, tx->mb); - } + lock_write_get(vtx->lock_tx); + qent->dst = *sdp_media_raddr(strm->sdp); + list_append(&vtx->sendq, &qent->le, qent); + lock_rel(vtx->lock_tx); return err; } @@ -222,10 +360,20 @@ static void encode_rtp_send(struct vtx *vtx, struct vidframe *frame) { struct le *le; int err = 0; + bool sendq_empty; if (!vtx->enc) return; + lock_write_get(vtx->lock_tx); + sendq_empty = (vtx->sendq.head == NULL); + lock_rel(vtx->lock_tx); + + if (!sendq_empty) { + ++vtx->skipc; + return; + } + lock_write_get(vtx->lock); /* Convert image */ @@ -312,18 +460,19 @@ static int vtx_alloc(struct vtx *vtx, struct video *video) int err; err = lock_alloc(&vtx->lock); + err |= lock_alloc(&vtx->lock_tx); if (err) return err; - vtx->mb = mbuf_alloc(STREAM_PRESZ + 512); - if (!vtx->mb) - return ENOMEM; + tmr_init(&vtx->tmr_rtp); vtx->video = video; vtx->ts_tx = 160; str_ncpy(vtx->device, video->cfg.src_dev, sizeof(vtx->device)); + tmr_start(&vtx->tmr_rtp, 1, rtp_tmr_handler, vtx); + return err; } @@ -1031,6 +1180,7 @@ int video_debug(struct re_printf *pf, const struct video *v) err |= re_hprintf(pf, " tx: %u x %u, fps=%d\n", vtx->vsrc_size.w, vtx->vsrc_size.h, vtx->vsrc_prm.fps); + err |= re_hprintf(pf, " skipc=%u\n", vtx->skipc); err |= re_hprintf(pf, " rx: pt=%d\n", vrx->pt_rx); if (!list_isempty(vidfilt_list())) { -- cgit v1.2.3