summaryrefslogtreecommitdiff
path: root/src/modules/rtp/module-rtp-recv.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/modules/rtp/module-rtp-recv.c')
-rw-r--r--src/modules/rtp/module-rtp-recv.c48
1 files changed, 16 insertions, 32 deletions
diff --git a/src/modules/rtp/module-rtp-recv.c b/src/modules/rtp/module-rtp-recv.c
index 79e712d..a9b42bb 100644
--- a/src/modules/rtp/module-rtp-recv.c
+++ b/src/modules/rtp/module-rtp-recv.c
@@ -90,12 +90,11 @@ struct session {
pa_memblockq *memblockq;
bool first_packet;
- uint32_t ssrc;
uint32_t offset;
struct pa_sdp_info sdp_info;
- pa_rtp_context rtp_context;
+ pa_rtp_context *rtp_context;
pa_rtpoll_item *rtpoll_item;
@@ -205,12 +204,13 @@ static void sink_input_suspend_within_thread(pa_sink_input* i, bool b) {
/* Called from I/O thread context */
static int rtpoll_work_cb(pa_rtpoll_item *i) {
pa_memchunk chunk;
+ uint32_t timestamp;
int64_t k, j, delta;
struct timeval now = { 0, 0 };
struct session *s;
struct pollfd *p;
- pa_assert_se(s = pa_rtpoll_item_get_userdata(i));
+ pa_assert_se(s = pa_rtpoll_item_get_work_userdata(i));
p = pa_rtpoll_item_get_pollfd(i, NULL);
@@ -224,40 +224,30 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) {
p->revents = 0;
- if (pa_rtp_recv(&s->rtp_context, &chunk, s->userdata->module->core->mempool, &now) < 0)
+ if (pa_rtp_recv(s->rtp_context, &chunk, s->userdata->module->core->mempool, &timestamp, &now) < 0)
return 0;
- if (s->sdp_info.payload != s->rtp_context.payload ||
- !PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
+ if (!PA_SINK_IS_OPENED(s->sink_input->sink->thread_info.state)) {
pa_memblock_unref(chunk.memblock);
return 0;
}
if (!s->first_packet) {
s->first_packet = true;
-
- s->ssrc = s->rtp_context.ssrc;
- s->offset = s->rtp_context.timestamp;
-
- if (s->ssrc == s->userdata->module->core->cookie)
- pa_log_warn("Detected RTP packet loop!");
- } else {
- if (s->ssrc != s->rtp_context.ssrc) {
- pa_memblock_unref(chunk.memblock);
- return 0;
- }
+ s->offset = timestamp;
}
/* Check whether there was a timestamp overflow */
- k = (int64_t) s->rtp_context.timestamp - (int64_t) s->offset;
- j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) s->rtp_context.timestamp;
+ k = (int64_t) timestamp - (int64_t) s->offset;
+ j = (int64_t) 0x100000000LL - (int64_t) s->offset + (int64_t) timestamp;
if ((k < 0 ? -k : k) < (j < 0 ? -j : j))
delta = k;
else
delta = j;
- pa_memblockq_seek(s->memblockq, delta * (int64_t) s->rtp_context.frame_size, PA_SEEK_RELATIVE, true);
+ pa_memblockq_seek(s->memblockq, delta * (int64_t) pa_rtp_context_get_frame_size(s->rtp_context), PA_SEEK_RELATIVE,
+ true);
if (now.tv_sec == 0) {
PA_ONCE_BEGIN {
@@ -277,7 +267,7 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) {
pa_memblock_unref(chunk.memblock);
/* The next timestamp we expect */
- s->offset = s->rtp_context.timestamp + (uint32_t) (chunk.length / s->rtp_context.frame_size);
+ s->offset = timestamp + (uint32_t) (chunk.length / pa_rtp_context_get_frame_size(s->rtp_context));
pa_atomic_store(&s->timestamp, (int) now.tv_sec);
@@ -386,21 +376,14 @@ static int rtpoll_work_cb(pa_rtpoll_item *i) {
/* Called from I/O thread context */
static void sink_input_attach(pa_sink_input *i) {
struct session *s;
- struct pollfd *p;
pa_sink_input_assert_ref(i);
pa_assert_se(s = i->userdata);
pa_assert(!s->rtpoll_item);
- s->rtpoll_item = pa_rtpoll_item_new(i->sink->thread_info.rtpoll, PA_RTPOLL_LATE, 1);
-
- p = pa_rtpoll_item_get_pollfd(s->rtpoll_item, NULL);
- p->fd = s->rtp_context.fd;
- p->events = POLLIN;
- p->revents = 0;
+ s->rtpoll_item = pa_rtp_context_get_rtpoll_item(s->rtp_context, i->sink->thread_info.rtpoll);
- pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb);
- pa_rtpoll_item_set_userdata(s->rtpoll_item, s);
+ pa_rtpoll_item_set_work_callback(s->rtpoll_item, rtpoll_work_cb, s);
}
/* Called from I/O thread context */
@@ -585,7 +568,8 @@ static struct session *session_new(struct userdata *u, const pa_sdp_info *sdp_in
pa_memblock_unref(silence.memblock);
- pa_rtp_context_init_recv(&s->rtp_context, fd, pa_frame_size(&s->sdp_info.sample_spec));
+ if (!(s->rtp_context = pa_rtp_context_new_recv(fd, sdp_info->payload, &s->sdp_info.sample_spec)))
+ goto fail;
pa_hashmap_put(s->userdata->by_origin, s->sdp_info.origin, s);
u->n_sessions++;
@@ -620,7 +604,7 @@ static void session_free(struct session *s) {
pa_memblockq_free(s->memblockq);
pa_sdp_info_destroy(&s->sdp_info);
- pa_rtp_context_destroy(&s->rtp_context);
+ pa_rtp_context_free(s->rtp_context);
pa_xfree(s);
}