diff options
author | Alfred E. Heggestad <aeh@db.org> | 2014-02-09 11:50:07 +0100 |
---|---|---|
committer | Alfred E. Heggestad <aeh@db.org> | 2014-02-09 11:50:07 +0100 |
commit | 98bf08bdcf2edd9d397f32650a8bfe62186fbecf (patch) | |
tree | ebc6ec71f44bff8c42e4eefced61948623df02fc /src/stream.c | |
parent | e6ad5cf4401b860ba402d4b7b3c7c254bc87a019 (diff) |
baresip 0.4.10
Diffstat (limited to 'src/stream.c')
-rw-r--r-- | src/stream.c | 582 |
1 files changed, 582 insertions, 0 deletions
diff --git a/src/stream.c b/src/stream.c new file mode 100644 index 0000000..0e5b89c --- /dev/null +++ b/src/stream.c @@ -0,0 +1,582 @@ +/** + * @file stream.c Generic Media Stream + * + * Copyright (C) 2010 Creytiv.com + */ +#include <string.h> +#include <time.h> +#include <re.h> +#include <baresip.h> +#include "core.h" + + +#define MAGIC 0x00814ea5 +#include "magic.h" + + +enum { + RTP_RECV_SIZE = 8192, +}; + + +/** Defines a generic media stream */ +struct stream { + MAGIC_DECL + + struct le le; /**< Linked list element */ + struct config_avt cfg; /**< Stream configuration */ + struct call *call; /**< Ref. to call object */ + struct sdp_media *sdp; /**< SDP Media line */ + struct rtp_sock *rtp; /**< RTP Socket */ + struct rtpkeep *rtpkeep; /**< RTP Keepalive */ + struct rtcp_stats rtcp_stats;/**< RTCP statistics */ + struct jbuf *jbuf; /**< Jitter Buffer for incoming RTP */ + struct mnat_media *mns; /**< Media NAT traversal state */ + const struct menc *menc; /**< Media encryption module */ + struct menc_sess *mencs; /**< Media encryption session state */ + struct menc_media *mes; /**< Media Encryption media state */ + struct metric metric_tx; /**< Metrics for transmit */ + struct metric metric_rx; /**< Metrics for receiving */ + char *cname; + uint32_t ssrc_rx; /**< Incoming syncronizing source */ + uint32_t pseq; /**< Sequence number for incoming RTP */ + int pt_enc; /**< Payload type for encoding */ + bool rtcp; /**< Enable RTCP */ + bool rtcp_mux; /**< RTP/RTCP multiplex supported by peer */ + bool jbuf_started; /**< True if jitter-buffer was started */ + stream_rtp_h *rtph; /**< Stream RTP handler */ + stream_rtcp_h *rtcph; /**< Stream RTCP handler */ + void *arg; /**< Handler argument */ +}; + + +static inline int lostcalc(struct stream *s, uint16_t seq) +{ + const uint16_t delta = seq - s->pseq; + int lostc; + + if (s->pseq == (uint32_t)-1) + lostc = 0; + else if (delta == 0) + return -1; + else if (delta < 3000) + lostc = delta - 1; + else if (delta < 0xff9c) + lostc = 0; + else + return -2; + + s->pseq = seq; + + return lostc; +} + + +static void print_rtp_stats(struct stream *s) +{ + info("\n%-9s Transmit: Receive:\n" + "packets: %7u %7u\n" + "avg. bitrate: %7.1f %7.1f (kbit/s)\n", + sdp_media_name(s->sdp), + s->metric_tx.n_packets, s->metric_rx.n_packets, + 1.0*metric_avg_bitrate(&s->metric_tx)/1000, + 1.0*metric_avg_bitrate(&s->metric_rx)/1000); + + if (s->rtcp_stats.tx.sent || s->rtcp_stats.rx.sent) { + + info("pkt.report: %7u %7u\n" + "lost: %7d %7d\n" + "jitter: %7.1f %7.1f (ms)\n", + s->rtcp_stats.tx.sent, s->rtcp_stats.rx.sent, + s->rtcp_stats.tx.lost, s->rtcp_stats.rx.lost, + 1.0*s->rtcp_stats.tx.jit/1000, + 1.0*s->rtcp_stats.rx.jit/1000); + } +} + + +static void stream_destructor(void *arg) +{ + struct stream *s = arg; + + if (s->cfg.rtp_stats) + print_rtp_stats(s); + + metric_reset(&s->metric_tx); + metric_reset(&s->metric_rx); + + list_unlink(&s->le); + mem_deref(s->rtpkeep); + mem_deref(s->sdp); + mem_deref(s->mes); + mem_deref(s->mencs); + mem_deref(s->mns); + mem_deref(s->jbuf); + mem_deref(s->rtp); + mem_deref(s->cname); +} + + +static void rtp_recv(const struct sa *src, const struct rtp_header *hdr, + struct mbuf *mb, void *arg) +{ + struct stream *s = arg; + bool flush = false; + int err; + + if (!mbuf_get_left(mb)) + return; + + if (!(sdp_media_ldir(s->sdp) & SDP_RECVONLY)) + return; + + metric_add_packet(&s->metric_rx, mbuf_get_left(mb)); + + if (hdr->ssrc != s->ssrc_rx) { + if (s->ssrc_rx) { + flush = true; + info("stream: %s: SSRC changed %x -> %x" + " (%u bytes from %J)\n", + sdp_media_name(s->sdp), s->ssrc_rx, hdr->ssrc, + mbuf_get_left(mb), src); + } + s->ssrc_rx = hdr->ssrc; + } + + if (s->jbuf) { + + struct rtp_header hdr2; + void *mb2 = NULL; + + /* Put frame in Jitter Buffer */ + if (flush) + jbuf_flush(s->jbuf); + + err = jbuf_put(s->jbuf, hdr, mb); + if (err) { + info("%s: dropping %u bytes from %J (%m)\n", + sdp_media_name(s->sdp), mb->end, + src, err); + s->metric_rx.n_err++; + } + + if (jbuf_get(s->jbuf, &hdr2, &mb2)) { + + if (!s->jbuf_started) + return; + + memset(&hdr2, 0, sizeof(hdr2)); + } + + s->jbuf_started = true; + + if (lostcalc(s, hdr2.seq) > 0) + s->rtph(hdr, NULL, s->arg); + + s->rtph(&hdr2, mb2, s->arg); + + mem_deref(mb2); + } + else { + if (lostcalc(s, hdr->seq) > 0) + s->rtph(hdr, NULL, s->arg); + + s->rtph(hdr, mb, s->arg); + } +} + + +static void rtcp_handler(const struct sa *src, struct rtcp_msg *msg, void *arg) +{ + struct stream *s = arg; + (void)src; + + if (s->rtcph) + s->rtcph(msg, s->arg); + + switch (msg->hdr.pt) { + + case RTCP_SR: + (void)rtcp_stats(s->rtp, msg->r.sr.ssrc, &s->rtcp_stats); + break; + } +} + + +static int stream_sock_alloc(struct stream *s, int af) +{ + struct sa laddr; + int tos, err; + + if (!s) + return EINVAL; + + /* we listen on all interfaces */ + sa_init(&laddr, af); + + err = rtp_listen(&s->rtp, IPPROTO_UDP, &laddr, + s->cfg.rtp_ports.min, s->cfg.rtp_ports.max, + s->rtcp, rtp_recv, rtcp_handler, s); + if (err) + return err; + + tos = s->cfg.rtp_tos; + (void)udp_setsockopt(rtp_sock(s->rtp), IPPROTO_IP, IP_TOS, + &tos, sizeof(tos)); + (void)udp_setsockopt(rtcp_sock(s->rtp), IPPROTO_IP, IP_TOS, + &tos, sizeof(tos)); + + udp_rxsz_set(rtp_sock(s->rtp), RTP_RECV_SIZE); + + return 0; +} + + +int stream_alloc(struct stream **sp, const struct config_avt *cfg, + struct call *call, struct sdp_session *sdp_sess, + const char *name, int label, + const struct mnat *mnat, struct mnat_sess *mnat_sess, + const struct menc *menc, struct menc_sess *menc_sess, + const char *cname, + stream_rtp_h *rtph, stream_rtcp_h *rtcph, void *arg) +{ + struct stream *s; + int err; + + if (!sp || !cfg || !call || !rtph) + return EINVAL; + + s = mem_zalloc(sizeof(*s), stream_destructor); + if (!s) + return ENOMEM; + + MAGIC_INIT(s); + + s->cfg = *cfg; + s->call = call; + s->rtph = rtph; + s->rtcph = rtcph; + s->arg = arg; + s->pseq = -1; + s->rtcp = s->cfg.rtcp_enable; + + err = stream_sock_alloc(s, call_af(call)); + if (err) + goto out; + + err = str_dup(&s->cname, cname); + if (err) + goto out; + + /* Jitter buffer */ + if (cfg->jbuf_del.min && cfg->jbuf_del.max) { + + err = jbuf_alloc(&s->jbuf, cfg->jbuf_del.min, + cfg->jbuf_del.max); + if (err) + goto out; + } + + err = sdp_media_add(&s->sdp, sdp_sess, name, + sa_port(rtp_local(s->rtp)), + (menc && menc->sdp_proto) ? menc->sdp_proto : + sdp_proto_rtpavp); + if (err) + goto out; + + if (label) { + err |= sdp_media_set_lattr(s->sdp, true, + "label", "%d", label); + } + + /* RFC 5506 */ + if (s->rtcp) + err |= sdp_media_set_lattr(s->sdp, true, "rtcp-rsize", NULL); + + /* RFC 5576 */ + if (s->rtcp) { + err |= sdp_media_set_lattr(s->sdp, true, + "ssrc", "%u cname:%s", + rtp_sess_ssrc(s->rtp), cname); + } + + /* RFC 5761 */ + if (cfg->rtcp_mux) + err |= sdp_media_set_lattr(s->sdp, true, "rtcp-mux", NULL); + + if (err) + goto out; + + if (mnat) { + err = mnat->mediah(&s->mns, mnat_sess, IPPROTO_UDP, + rtp_sock(s->rtp), + s->rtcp ? rtcp_sock(s->rtp) : NULL, + s->sdp); + if (err) + goto out; + } + + if (menc) { + s->menc = menc; + s->mencs = mem_ref(menc_sess); + err = menc->mediah(&s->mes, menc_sess, + s->rtp, + IPPROTO_UDP, + rtp_sock(s->rtp), + s->rtcp ? rtcp_sock(s->rtp) : NULL, + s->sdp); + if (err) + goto out; + } + + if (err) + goto out; + + s->pt_enc = -1; + + list_append(call_streaml(call), &s->le, s); + + out: + if (err) + mem_deref(s); + else + *sp = s; + + return err; +} + + +struct sdp_media *stream_sdpmedia(const struct stream *s) +{ + return s ? s->sdp : NULL; +} + + +static void stream_start_keepalive(struct stream *s) +{ + const char *rtpkeep; + + if (!s) + return; + + rtpkeep = ua_prm(call_get_ua(s->call))->rtpkeep; + + s->rtpkeep = mem_deref(s->rtpkeep); + + if (rtpkeep && sdp_media_rformat(s->sdp, NULL)) { + int err; + err = rtpkeep_alloc(&s->rtpkeep, rtpkeep, + IPPROTO_UDP, s->rtp, s->sdp); + if (err) { + warning("stream: rtpkeep_alloc failed: %m\n", err); + } + } +} + + +int stream_send(struct stream *s, bool marker, int pt, uint32_t ts, + struct mbuf *mb) +{ + int err = 0; + + if (!s) + return EINVAL; + + if (!sa_isset(sdp_media_raddr(s->sdp), SA_ALL)) + return 0; + if (sdp_media_dir(s->sdp) != SDP_SENDRECV) + return 0; + + metric_add_packet(&s->metric_tx, mbuf_get_left(mb)); + + if (pt < 0) + pt = s->pt_enc; + + if (pt >= 0) { + err = rtp_send(s->rtp, sdp_media_raddr(s->sdp), + marker, pt, ts, mb); + if (err) + s->metric_tx.n_err++; + } + + rtpkeep_refresh(s->rtpkeep, ts); + + return err; +} + + +static void stream_remote_set(struct stream *s) +{ + struct sa rtcp; + + if (!s) + return; + + /* RFC 5761 */ + if (s->cfg.rtcp_mux && sdp_media_rattr(s->sdp, "rtcp-mux")) { + + if (!s->rtcp_mux) + info("%s: RTP/RTCP multiplexing enabled\n", + sdp_media_name(s->sdp)); + s->rtcp_mux = true; + } + + rtcp_enable_mux(s->rtp, s->rtcp_mux); + + sdp_media_raddr_rtcp(s->sdp, &rtcp); + + rtcp_start(s->rtp, s->cname, + s->rtcp_mux ? sdp_media_raddr(s->sdp): &rtcp); +} + + +void stream_update(struct stream *s) +{ + const struct sdp_format *fmt; + int err = 0; + + if (!s) + return; + + fmt = sdp_media_rformat(s->sdp, NULL); + + s->pt_enc = fmt ? fmt->pt : -1; + + if (sdp_media_has_media(s->sdp)) + stream_remote_set(s); + + if (s->menc && s->menc->mediah) { + err = s->menc->mediah(&s->mes, s->mencs, s->rtp, + IPPROTO_UDP, + rtp_sock(s->rtp), + s->rtcp ? rtcp_sock(s->rtp) : NULL, + s->sdp); + if (err) { + warning("stream: mediaenc update: %m\n", err); + } + } +} + + +void stream_update_encoder(struct stream *s, int pt_enc) +{ + if (pt_enc >= 0) + s->pt_enc = pt_enc; +} + + +int stream_jbuf_stat(struct re_printf *pf, const struct stream *s) +{ + struct jbuf_stat stat; + int err; + + if (!s) + return EINVAL; + + err = re_hprintf(pf, " %s:", sdp_media_name(s->sdp)); + + err |= jbuf_stats(s->jbuf, &stat); + if (err) { + err = re_hprintf(pf, "Jbuf stat: (not available)"); + } + else { + err = re_hprintf(pf, "Jbuf stat: put=%u get=%u or=%u ur=%u", + stat.n_put, stat.n_get, + stat.n_overflow, stat.n_underflow); + } + + return err; +} + + +void stream_hold(struct stream *s, bool hold) +{ + if (!s) + return; + + sdp_media_set_ldir(s->sdp, hold ? SDP_SENDONLY : SDP_SENDRECV); +} + + +void stream_set_srate(struct stream *s, uint32_t srate_tx, uint32_t srate_rx) +{ + if (!s) + return; + + rtcp_set_srate(s->rtp, srate_tx, srate_rx); +} + + +void stream_send_fir(struct stream *s, bool pli) +{ + int err; + + if (!s) + return; + + if (pli) + err = rtcp_send_pli(s->rtp, s->ssrc_rx); + else + err = rtcp_send_fir(s->rtp, rtp_sess_ssrc(s->rtp)); + + if (err) { + s->metric_tx.n_err++; + + warning("stream: failed to send RTCP %s: %m\n", + pli ? "PLI" : "FIR", err); + } +} + + +void stream_reset(struct stream *s) +{ + if (!s) + return; + + jbuf_flush(s->jbuf); + + stream_start_keepalive(s); +} + + +void stream_set_bw(struct stream *s, uint32_t bps) +{ + if (!s) + return; + + sdp_media_set_lbandwidth(s->sdp, SDP_BANDWIDTH_AS, bps / 1024); +} + + +int stream_debug(struct re_printf *pf, const struct stream *s) +{ + struct sa rrtcp; + int err; + + if (!s) + return 0; + + err = re_hprintf(pf, " %s dir=%s pt_enc=%d\n", sdp_media_name(s->sdp), + sdp_dir_name(sdp_media_dir(s->sdp)), + s->pt_enc); + + sdp_media_raddr_rtcp(s->sdp, &rrtcp); + err |= re_hprintf(pf, " remote: %J/%J\n", + sdp_media_raddr(s->sdp), &rrtcp); + + err |= rtp_debug(pf, s->rtp); + err |= jbuf_debug(pf, s->jbuf); + + return err; +} + + +int stream_print(struct re_printf *pf, const struct stream *s) +{ + if (!s) + return 0; + + return re_hprintf(pf, " %s=%u/%u", sdp_media_name(s->sdp), + s->metric_tx.cur_bitrate, + s->metric_rx.cur_bitrate); +} |