diff options
Diffstat (limited to 'demux/demux.c')
-rw-r--r-- | demux/demux.c | 1138 |
1 files changed, 663 insertions, 475 deletions
diff --git a/demux/demux.c b/demux/demux.c index 51a5f4d..56a147e 100644 --- a/demux/demux.c +++ b/demux/demux.c @@ -1,6 +1,4 @@ /* - * DEMUXER v2.5 - * * This file is part of MPlayer. * * MPlayer is free software; you can redistribute it and/or modify @@ -17,13 +15,13 @@ * with MPlayer; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define DEMUX_PRIV(x) x #include <stdio.h> #include <stdlib.h> #include <string.h> #include <assert.h> #include <unistd.h> +#include <pthread.h> #include <math.h> @@ -32,7 +30,6 @@ #include "config.h" #include "options/options.h" -#include "common/av_common.h" #include "talloc.h" #include "common/msg.h" #include "common/global.h" @@ -44,8 +41,6 @@ #include "audio/format.h" -#include <libavcodec/avcodec.h> - // Demuxer list extern const struct demuxer_desc demuxer_desc_edl; extern const struct demuxer_desc demuxer_desc_cue; @@ -58,12 +53,14 @@ extern const demuxer_desc_t demuxer_desc_lavf; extern const demuxer_desc_t demuxer_desc_libass; extern const demuxer_desc_t demuxer_desc_subreader; extern const demuxer_desc_t demuxer_desc_playlist; +extern const demuxer_desc_t demuxer_desc_disc; /* Please do not add any new demuxers here. If you want to implement a new * demuxer, add it to libavformat, except for wrappers around external * libraries and demuxers requiring binary support. */ const demuxer_desc_t *const demuxer_list[] = { + &demuxer_desc_disc, &demuxer_desc_edl, &demuxer_desc_cue, &demuxer_desc_rawaudio, @@ -80,25 +77,76 @@ const demuxer_desc_t *const demuxer_list[] = { &demuxer_desc_playlist, // Pretty aggressive, so should be last. &demuxer_desc_subreader, - /* Please do not add any new demuxers here. If you want to implement a new - * demuxer, add it to libavformat, except for wrappers around external - * libraries and demuxers requiring binary support. */ NULL }; +struct demux_internal { + struct mp_log *log; + + // The demuxer runs potentially in another thread, so we keep two demuxer + // structs; the real demuxer can access the shadow struct only. + // Since demuxer and user threads both don't use locks, a third demuxer + // struct d_buffer is used to copy data between them in a synchronized way. + struct demuxer *d_thread; // accessed by demuxer impl. (producer) + struct demuxer *d_user; // accessed by player (consumer) + struct demuxer *d_buffer; // protected by lock; used to sync d_user/thread + + // The lock protects the packet queues (struct demux_stream), d_buffer, + // and some minor fields like thread_paused. + pthread_mutex_t lock; + pthread_cond_t wakeup; + pthread_t thread; + + // -- All the following fields are protected by lock. + + bool thread_paused; + int thread_request_pause; // counter, if >0, make demuxer thread pause + bool thread_terminate; + bool threading; + void (*wakeup_cb)(void *ctx); + void *wakeup_cb_ctx; + + bool warned_queue_overflow; + bool last_eof; // last actual global EOF status + bool eof; // whether we're in EOF state (reset for retry) + bool autoselect; + int min_packs; + int min_bytes; + + bool tracks_switched; // thread needs to inform demuxer of this + + bool seeking; // there's a seek queued + int seek_flags; // flags for next seek (if seeking==true) + double seek_pts; + + // Cached state. + double time_length; + struct mp_tags *stream_metadata; + int64_t stream_size; + int64_t stream_cache_size; + int64_t stream_cache_fill; + int stream_cache_idle; +}; + struct demux_stream { - int selected; // user wants packets from this stream - int eof; // end of demuxed stream? (true if all buffer empty) - int packs; // number of packets in buffer - int bytes; // total bytes of packets in buffer + struct demux_internal *in; + enum stream_type type; + // all fields are protected by in->lock + bool selected; // user wants packets from this stream + bool active; // try to keep at least 1 packet queued + bool eof; // end of demuxed stream? (true if all buffer empty) + size_t packs; // number of packets in buffer + size_t bytes; // total bytes of packets in buffer struct demux_packet *head; struct demux_packet *tail; }; -static void add_stream_chapters(struct demuxer *demuxer); -void demuxer_sort_chapters(demuxer_t *demuxer); +static void demuxer_sort_chapters(demuxer_t *demuxer); +static void *demux_thread(void *pctx); +static void update_cache(struct demux_internal *in); -static void ds_free_packs(struct demux_stream *ds) +// called locked +static void ds_flush(struct demux_stream *ds) { demux_packet_t *dp = ds->head; while (dp) { @@ -107,110 +155,16 @@ static void ds_free_packs(struct demux_stream *ds) dp = dn; } ds->head = ds->tail = NULL; - ds->packs = 0; // !!!!! + ds->packs = 0; ds->bytes = 0; - ds->eof = 0; -} - -static void packet_destroy(void *ptr) -{ - struct demux_packet *dp = ptr; - talloc_free(dp->avpacket); - av_free(dp->allocation); -} - -static struct demux_packet *create_packet(size_t len) -{ - if (len > 1000000000) { - fprintf(stderr, "Attempt to allocate demux packet over 1 GB!\n"); - abort(); - } - struct demux_packet *dp = talloc(NULL, struct demux_packet); - talloc_set_destructor(dp, packet_destroy); - *dp = (struct demux_packet) { - .len = len, - .pts = MP_NOPTS_VALUE, - .dts = MP_NOPTS_VALUE, - .duration = -1, - .stream_pts = MP_NOPTS_VALUE, - .pos = -1, - .stream = -1, - }; - return dp; -} - -struct demux_packet *new_demux_packet(size_t len) -{ - struct demux_packet *dp = create_packet(len); - dp->buffer = av_malloc(len + FF_INPUT_BUFFER_PADDING_SIZE); - if (!dp->buffer) { - fprintf(stderr, "Memory allocation failure!\n"); - abort(); - } - memset(dp->buffer + len, 0, FF_INPUT_BUFFER_PADDING_SIZE); - dp->allocation = dp->buffer; - return dp; -} - -// data must already have suitable padding, and does not copy the data -struct demux_packet *new_demux_packet_fromdata(void *data, size_t len) -{ - struct demux_packet *dp = create_packet(len); - dp->buffer = data; - return dp; -} - -struct demux_packet *new_demux_packet_from(void *data, size_t len) -{ - struct demux_packet *dp = new_demux_packet(len); - memcpy(dp->buffer, data, len); - return dp; -} - -void demux_packet_shorten(struct demux_packet *dp, size_t len) -{ - assert(len <= dp->len); - dp->len = len; - memset(dp->buffer + dp->len, 0, FF_INPUT_BUFFER_PADDING_SIZE); -} - -void free_demux_packet(struct demux_packet *dp) -{ - talloc_free(dp); -} - -static void destroy_avpacket(void *pkt) -{ - av_free_packet(pkt); -} - -struct demux_packet *demux_copy_packet(struct demux_packet *dp) -{ - struct demux_packet *new = NULL; - if (dp->avpacket) { - assert(dp->buffer == dp->avpacket->data); - assert(dp->len == dp->avpacket->size); - AVPacket *newavp = talloc_zero(NULL, AVPacket); - talloc_set_destructor(newavp, destroy_avpacket); - av_init_packet(newavp); - if (av_packet_ref(newavp, dp->avpacket) < 0) - abort(); - new = new_demux_packet_fromdata(newavp->data, newavp->size); - new->avpacket = newavp; - } - if (!new) { - new = new_demux_packet(dp->len); - memcpy(new->buffer, dp->buffer, new->len); - } - new->pts = dp->pts; - new->dts = dp->dts; - new->duration = dp->duration; - new->stream_pts = dp->stream_pts; - return new; + ds->eof = false; + ds->active = false; } struct sh_stream *new_sh_stream(demuxer_t *demuxer, enum stream_type type) { + assert(demuxer == demuxer->in->d_thread); + if (demuxer->num_streams > MAX_SH_STREAMS) { MP_WARN(demuxer, "Too many streams.\n"); return NULL; @@ -225,33 +179,22 @@ struct sh_stream *new_sh_stream(demuxer_t *demuxer, enum stream_type type) struct sh_stream *sh = talloc_ptrtype(demuxer, sh); *sh = (struct sh_stream) { .type = type, - .demuxer = demuxer, .index = demuxer->num_streams, .demuxer_id = demuxer_id, // may be overwritten by demuxer - .ds = talloc_zero(sh, struct demux_stream), + .ds = talloc(sh, struct demux_stream), + }; + *sh->ds = (struct demux_stream) { + .in = demuxer->in, + .type = sh->type, + .selected = demuxer->in->autoselect, }; MP_TARRAY_APPEND(demuxer, demuxer->streams, demuxer->num_streams, sh); switch (sh->type) { - case STREAM_VIDEO: { - struct sh_video *sht = talloc_zero(demuxer, struct sh_video); - sh->video = sht; - break; - } - case STREAM_AUDIO: { - struct sh_audio *sht = talloc_zero(demuxer, struct sh_audio); - sh->audio = sht; - break; - } - case STREAM_SUB: { - struct sh_sub *sht = talloc_zero(demuxer, struct sh_sub); - sh->sub = sht; - break; - } - default: assert(false); + case STREAM_VIDEO: sh->video = talloc_zero(demuxer, struct sh_video); break; + case STREAM_AUDIO: sh->audio = talloc_zero(demuxer, struct sh_audio); break; + case STREAM_SUB: sh->sub = talloc_zero(demuxer, struct sh_sub); break; } - sh->ds->selected = demuxer->stream_autoselect; - return sh; } @@ -259,56 +202,83 @@ void free_demuxer(demuxer_t *demuxer) { if (!demuxer) return; + struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_user); + + demux_stop_thread(demuxer); + if (demuxer->desc->close) - demuxer->desc->close(demuxer); - // free streams: + demuxer->desc->close(in->d_thread); for (int n = 0; n < demuxer->num_streams; n++) - ds_free_packs(demuxer->streams[n]->ds); + ds_flush(demuxer->streams[n]->ds); + pthread_mutex_destroy(&in->lock); + pthread_cond_destroy(&in->wakeup); talloc_free(demuxer); } -const char *stream_type_name(enum stream_type type) +// Start the demuxer thread, which reads ahead packets on its own. +void demux_start_thread(struct demuxer *demuxer) { - switch (type) { - case STREAM_VIDEO: return "video"; - case STREAM_AUDIO: return "audio"; - case STREAM_SUB: return "sub"; - default: return "unknown"; + struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_user); + + if (!in->threading) { + in->threading = true; + if (pthread_create(&in->thread, NULL, demux_thread, in)) + in->threading = false; } } -static int count_packs(struct demuxer *demux, enum stream_type type) +void demux_stop_thread(struct demuxer *demuxer) { - int c = 0; - for (int n = 0; n < demux->num_streams; n++) - c += demux->streams[n]->type == type ? demux->streams[n]->ds->packs : 0; - return c; + struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_user); + + if (in->threading) { + pthread_mutex_lock(&in->lock); + in->thread_terminate = true; + pthread_cond_signal(&in->wakeup); + pthread_mutex_unlock(&in->lock); + pthread_join(in->thread, NULL); + in->threading = false; + in->thread_terminate = false; + } } -static int count_bytes(struct demuxer *demux, enum stream_type type) +// The demuxer thread will call cb(ctx) if there's a new packet, or EOF is reached. +void demux_set_wakeup_cb(struct demuxer *demuxer, void (*cb)(void *ctx), void *ctx) +{ + struct demux_internal *in = demuxer->in; + pthread_mutex_lock(&in->lock); + in->wakeup_cb = cb; + in->wakeup_cb_ctx = ctx; + pthread_mutex_unlock(&in->lock); +} + +const char *stream_type_name(enum stream_type type) { - int c = 0; - for (int n = 0; n < demux->num_streams; n++) - c += demux->streams[n]->type == type ? demux->streams[n]->ds->bytes : 0; - return c; + switch (type) { + case STREAM_VIDEO: return "video"; + case STREAM_AUDIO: return "audio"; + case STREAM_SUB: return "sub"; + default: return "unknown"; + } } // Returns the same value as demuxer->fill_buffer: 1 ok, 0 EOF/not selected. -int demuxer_add_packet(demuxer_t *demuxer, struct sh_stream *stream, - demux_packet_t *dp) +int demux_add_packet(struct sh_stream *stream, demux_packet_t *dp) { struct demux_stream *ds = stream ? stream->ds : NULL; - if (!dp || !ds || !ds->selected) { + if (!dp || !ds) { talloc_free(dp); return 0; } - - if (stream->type == STREAM_VIDEO && !dp->len) { - /* Video packets with size 0 are assumed to not correspond to frames, - * but to indicate the absence of a frame in formats like AVI - * that must have packets at fixed timestamp intervals. */ + struct demux_internal *in = ds->in; + pthread_mutex_lock(&in->lock); + if (!ds->selected || in->seeking) { + pthread_mutex_unlock(&in->lock); talloc_free(dp); - return 1; + return 0; } dp->stream = stream->index; @@ -324,82 +294,192 @@ int demuxer_add_packet(demuxer_t *demuxer, struct sh_stream *stream, // first packet in stream ds->head = ds->tail = dp; } - /* ds_get_packets() can set ds->eof to 1 when another stream runs out of - * buffer space. That makes sense because in that situation the calling - * code should not count on being able to demux more packets from this - * stream. (Can happen with e.g. badly interleaved files.) - * In this case, we didn't necessarily reach EOF, and new packet can - * appear. */ - ds->eof = 0; - - if (dp->pos >= 0) - demuxer->filepos = dp->pos; + // obviously not true anymore + ds->eof = false; + in->last_eof = in->eof = false; // For video, PTS determination is not trivial, but for other media types // distinguishing PTS and DTS is not useful. if (stream->type != STREAM_VIDEO && dp->pts == MP_NOPTS_VALUE) dp->pts = dp->dts; - MP_DBG(demuxer, "DEMUX: Append packet to %s, len=%d pts=%5.3f pos=%"PRIi64" " - "[packs: A=%d V=%d S=%d]\n", stream_type_name(stream->type), - dp->len, dp->pts, dp->pos, count_packs(demuxer, STREAM_AUDIO), - count_packs(demuxer, STREAM_VIDEO), count_packs(demuxer, STREAM_SUB)); + MP_DBG(in, "append packet to %s: size=%d pts=%f dts=%f pos=%"PRIi64" " + "[num=%zd size=%zd]\n", stream_type_name(stream->type), + dp->len, dp->pts, dp->dts, dp->pos, ds->packs, ds->bytes); + + if (ds->in->wakeup_cb) + ds->in->wakeup_cb(ds->in->wakeup_cb_ctx); + pthread_cond_signal(&in->wakeup); + pthread_mutex_unlock(&in->lock); return 1; } -static bool demux_check_queue_full(demuxer_t *demux) +// Returns true if there was "progress" (lock was released temporarily). +static bool read_packet(struct demux_internal *in) { - for (int n = 0; n < demux->num_streams; n++) { - struct sh_stream *sh = demux->streams[n]; - if (sh->ds->packs > MAX_PACKS || sh->ds->bytes > MAX_PACK_BYTES) - goto overflow; + in->eof = false; + + // Check if we need to read a new packet. We do this if all queues are below + // the minimum, or if a stream explicitly needs new packets. Also includes + // safe-guards against packet queue overflow. + bool active = false, read_more = false; + size_t packs = 0, bytes = 0; + for (int n = 0; n < in->d_buffer->num_streams; n++) { + struct demux_stream *ds = in->d_buffer->streams[n]->ds; + active |= ds->selected && ds->active; + read_more |= ds->active && !ds->head; + packs += ds->packs; + bytes += ds->bytes; + } + MP_DBG(in, "packets=%zd, bytes=%zd, active=%d, more=%d\n", + packs, bytes, active, read_more); + if (packs >= MAX_PACKS || bytes >= MAX_PACK_BYTES) { + if (!in->warned_queue_overflow) { + in->warned_queue_overflow = true; + MP_ERR(in, "Too many packets in the demuxer packet queues:\n"); + for (int n = 0; n < in->d_buffer->num_streams; n++) { + struct demux_stream *ds = in->d_buffer->streams[n]->ds; + if (ds->selected) { + MP_ERR(in, " %s/%d: %zd packets, %zd bytes\n", + stream_type_name(ds->type), n, ds->packs, ds->bytes); + } + } + } + for (int n = 0; n < in->d_buffer->num_streams; n++) { + struct demux_stream *ds = in->d_buffer->streams[n]->ds; + ds->eof |= !ds->head; + } + pthread_cond_signal(&in->wakeup); + return false; } - return false; - -overflow: - - if (!demux->warned_queue_overflow) { - MP_ERR(demux, "Too many packets in the demuxer " - "packet queue (video: %d packets in %d bytes, audio: %d " - "packets in %d bytes, sub: %d packets in %d bytes).\n", - count_packs(demux, STREAM_VIDEO), count_bytes(demux, STREAM_VIDEO), - count_packs(demux, STREAM_AUDIO), count_bytes(demux, STREAM_AUDIO), - count_packs(demux, STREAM_SUB), count_bytes(demux, STREAM_SUB)); - MP_INFO(demux, "Maybe you are playing a non-" - "interleaved stream/file or the codec failed?\n"); + if (packs < in->min_packs && bytes < in->min_bytes) + read_more |= active; + + if (!read_more) + return false; + + // Actually read a packet. Drop the lock while doing so, because waiting + // for disk or network I/O can take time. + pthread_mutex_unlock(&in->lock); + struct demuxer *demux = in->d_thread; + bool eof = !demux->desc->fill_buffer || demux->desc->fill_buffer(demux) <= 0; + pthread_mutex_lock(&in->lock); + + update_cache(in); + + if (eof) { + for (int n = 0; n < in->d_buffer->num_streams; n++) { + struct demux_stream *ds = in->d_buffer->streams[n]->ds; + ds->eof = true; + ds->active = false; + } + // If we had EOF previously, then don't wakeup (avoids wakeup loop) + if (!in->last_eof) { + if (in->wakeup_cb) + in->wakeup_cb(in->wakeup_cb_ctx); + pthread_cond_signal(&in->wakeup); + MP_VERBOSE(in, "EOF reached.\n"); + } } - demux->warned_queue_overflow = true; + in->eof = in->last_eof = eof; return true; } -// return value: -// 0 = EOF or no stream found or invalid type -// 1 = successfully read a packet +// must be called locked; may temporarily unlock +static void ds_get_packets(struct demux_stream *ds) +{ + const char *t = stream_type_name(ds->type); + struct demux_internal *in = ds->in; + MP_DBG(in, "reading packet for %s\n", t); + in->eof = false; // force retry + ds->eof = false; + while (ds->selected && !ds->head && !ds->eof) { + ds->active = true; + // Note: the following code marks EOF if it can't continue + if (in->threading) { + MP_VERBOSE(in, "waiting for demux thread (%s)\n", t); + pthread_cond_signal(&in->wakeup); + pthread_cond_wait(&in->wakeup, &in->lock); + } else { + read_packet(in); + } + } +} -static int demux_fill_buffer(demuxer_t *demux) +static void execute_trackswitch(struct demux_internal *in) { - return demux->desc->fill_buffer ? demux->desc->fill_buffer(demux) : 0; + in->tracks_switched = false; + + pthread_mutex_unlock(&in->lock); + + if (in->d_thread->desc->control) + in->d_thread->desc->control(in->d_thread, DEMUXER_CTRL_SWITCHED_TRACKS, 0); + + pthread_mutex_lock(&in->lock); } -static void ds_get_packets(struct sh_stream *sh) +static void execute_seek(struct demux_internal *in) { - struct demux_stream *ds = sh->ds; - demuxer_t *demux = sh->demuxer; - MP_TRACE(demux, "ds_get_packets (%s) called\n", - stream_type_name(sh->type)); - while (1) { - if (ds->head) - return; - - if (demux_check_queue_full(demux)) - break; + int flags = in->seek_flags; + double pts = in->seek_pts; + in->seeking = false; + + pthread_mutex_unlock(&in->lock); + + if (in->d_thread->desc->seek) + in->d_thread->desc->seek(in->d_thread, pts, flags); - if (!demux_fill_buffer(demux)) - break; // EOF + pthread_mutex_lock(&in->lock); +} + +static void *demux_thread(void *pctx) +{ + struct demux_internal *in = pctx; + pthread_mutex_lock(&in->lock); + while (!in->thread_terminate) { + in->thread_paused = in->thread_request_pause > 0; + if (in->thread_paused) { + pthread_cond_signal(&in->wakeup); + pthread_cond_wait(&in->wakeup, &in->lock); + continue; + } + if (in->tracks_switched) { + execute_trackswitch(in); + continue; + } + if (in->seeking) { + execute_seek(in); + continue; + } + if (!in->eof) { + if (read_packet(in)) + continue; // read_packet unlocked, so recheck conditions + } + update_cache(in); + pthread_cond_signal(&in->wakeup); + pthread_cond_wait(&in->wakeup, &in->lock); } - MP_VERBOSE(demux, "ds_get_packets: EOF reached (stream: %s)\n", - stream_type_name(sh->type)); - ds->eof = 1; + pthread_mutex_unlock(&in->lock); + return NULL; +} + +static struct demux_packet *dequeue_packet(struct demux_stream *ds) +{ + if (!ds->head) + return NULL; + struct demux_packet *pkt = ds->head; + ds->head = pkt->next; + pkt->next = NULL; + if (!ds->head) + ds->tail = NULL; + ds->bytes -= pkt->len; + ds->packs--; + + // This implies this function is actually called from "the" user thread. + if (pkt && pkt->pos >= 0) + ds->in->d_user->filepos = pkt->pos; + + return pkt; } // Read a packet from the given stream. The returned packet belongs to the @@ -408,24 +488,46 @@ static void ds_get_packets(struct sh_stream *sh) struct demux_packet *demux_read_packet(struct sh_stream *sh) { struct demux_stream *ds = sh ? sh->ds : NULL; + struct demux_packet *pkt = NULL; + if (ds) { + pthread_mutex_lock(&ds->in->lock); + ds_get_packets(ds); + pkt = dequeue_packet(ds); + pthread_cond_signal(&ds->in->wakeup); // possibly read more + pthread_mutex_unlock(&ds->in->lock); + } + return pkt; +} + +// Poll the demuxer queue, and if there's a packet, return it. Otherwise, just +// make the demuxer thread read packets for this stream, and if there's at +// least one packet, call the wakeup callback. +// Unlike demux_read_packet(), this always enables readahead (which means you +// must not use it on interleaved subtitle streams). +// Returns: +// < 0: EOF was reached, *out_pkt=NULL +// == 0: no new packet yet, but maybe later, *out_pkt=NULL +// > 0: new packet read, *out_pkt is set +int demux_read_packet_async(struct sh_stream *sh, struct demux_packet **out_pkt) +{ + struct demux_stream *ds = sh ? sh->ds : NULL; + int r = -1; + *out_pkt = NULL; if (ds) { - ds_get_packets(sh); - struct demux_packet *pkt = ds->head; - if (pkt) { - ds->head = pkt->next; - pkt->next = NULL; - if (!ds->head) - ds->tail = NULL; - ds->bytes -= pkt->len; - ds->packs--; - - if (pkt->stream_pts != MP_NOPTS_VALUE) - sh->demuxer->stream_pts = pkt->stream_pts; - - return pkt; + if (ds->in->threading) { + pthread_mutex_lock(&ds->in->lock); + *out_pkt = dequeue_packet(ds); + r = *out_pkt ? 1 : (ds->eof ? -1 : 0); + ds->active = ds->selected; // enable readahead + ds->in->eof = false; // force retry + pthread_cond_signal(&ds->in->wakeup); // possibly read more + pthread_mutex_unlock(&ds->in->lock); + } else { + *out_pkt = demux_read_packet(sh); + r = *out_pkt ? 1 : -1; } } - return NULL; + return r; } // Return the pts of the next packet that demux_read_packet() would return. @@ -433,31 +535,61 @@ struct demux_packet *demux_read_packet(struct sh_stream *sh) // packets from the queue. double demux_get_next_pts(struct sh_stream *sh) { - if (sh && sh->ds->selected) { - ds_get_packets(sh); + double res = MP_NOPTS_VALUE; + if (sh) { + pthread_mutex_lock(&sh->ds->in->lock); + ds_get_packets(sh->ds); if (sh->ds->head) - return sh->ds->head->pts; + res = sh->ds->head->pts; + pthread_mutex_unlock(&sh->ds->in->lock); } - return MP_NOPTS_VALUE; + return res; } // Return whether a packet is queued. Never blocks, never forces any reads. bool demux_has_packet(struct sh_stream *sh) { - return sh && sh->ds->head; + bool has_packet = false; + if (sh) { + pthread_mutex_lock(&sh->ds->in->lock); + has_packet = sh->ds->head; + pthread_mutex_unlock(&sh->ds->in->lock); + } + return has_packet; } -// Same as demux_has_packet, but to be called internally by demuxers, as -// opposed to the user of the demuxer. -bool demuxer_stream_has_packets_queued(struct demuxer *d, struct sh_stream *stream) +// Return whether EOF was returned with an earlier packet read. +bool demux_stream_eof(struct sh_stream *sh) { - return demux_has_packet(stream); + bool eof = false; + if (sh) { + pthread_mutex_lock(&sh->ds->in->lock); + eof = sh->ds->eof && !sh->ds->head; + pthread_mutex_unlock(&sh->ds->in->lock); + } + return eof; } -// Return whether EOF was returned with an earlier packet read. -bool demux_stream_eof(struct sh_stream *sh) +// Read and return any packet we find. +struct demux_packet *demux_read_any_packet(struct demuxer *demuxer) { - return !sh || sh->ds->eof; + assert(!demuxer->in->threading); // doesn't work with threading + bool read_more = true; + while (read_more) { + for (int n = 0; n < demuxer->num_streams; n++) { + struct sh_stream *sh = demuxer->streams[n]; + sh->ds->active = sh->ds->selected; // force read_packet() to read + struct demux_packet *pkt = dequeue_packet(sh->ds); + if (pkt) + return pkt; + } + // retry after calling this + pthread_mutex_lock(&demuxer->in->lock); + read_more = read_packet(demuxer->in); + read_more &= !demuxer->in->eof; + pthread_mutex_unlock(&demuxer->in->lock); + } + return NULL; } // ==================================================================== @@ -543,8 +675,7 @@ static void demux_export_replaygain(demuxer_t *demuxer) { float tg, tp, ag, ap; - if (!demuxer->replaygain_data && - !decode_gain(demuxer, "REPLAYGAIN_TRACK_GAIN", &tg) && + if (!decode_gain(demuxer, "REPLAYGAIN_TRACK_GAIN", &tg) && !decode_peak(demuxer, "REPLAYGAIN_TRACK_PEAK", &tp) && !decode_gain(demuxer, "REPLAYGAIN_ALBUM_GAIN", &ag) && !decode_peak(demuxer, "REPLAYGAIN_ALBUM_PEAK", &ap)) @@ -556,8 +687,88 @@ static void demux_export_replaygain(demuxer_t *demuxer) rgain->album_gain = ag; rgain->album_peak = ap; - demuxer->replaygain_data = rgain; + for (int n = 0; n < demuxer->num_streams; n++) { + struct sh_stream *sh = demuxer->streams[n]; + if (sh->audio && !sh->audio->replaygain_data) + sh->audio->replaygain_data = rgain; + } + } +} + +// Copy all fields from src to dst, depending on event flags. +static void demux_copy(struct demuxer *dst, struct demuxer *src) +{ + if (src->events & DEMUX_EVENT_INIT) { + // Note that we do as shallow copies as possible. We expect the date + // that is not-copied (only referenced) to be immutable. + // This implies e.g. that no chapters are added after initialization. + dst->chapters = src->chapters; + dst->num_chapters = src->num_chapters; + dst->editions = src->editions; + dst->num_editions = src->num_editions; + dst->edition = src->edition; + dst->attachments = src->attachments; + dst->num_attachments = src->num_attachments; + dst->matroska_data = src->matroska_data; + dst->file_contents = src->file_contents; + dst->playlist = src->playlist; + dst->seekable = src->seekable; + dst->filetype = src->filetype; + dst->ts_resets_possible = src->ts_resets_possible; + dst->start_time = src->start_time; } + if (src->events & DEMUX_EVENT_STREAMS) { + // The stream structs themselves are immutable. + for (int n = dst->num_streams; n < src->num_streams; n++) + MP_TARRAY_APPEND(dst, dst->streams, dst->num_streams, src->streams[n]); + } + if (src->events & DEMUX_EVENT_METADATA) { + talloc_free(dst->metadata); + dst->metadata = mp_tags_dup(dst, src->metadata); + } + dst->events |= src->events; + src->events = 0; +} + +// This is called by demuxer implementations if certain parameters change +// at runtime. +// events is one of DEMUX_EVENT_* +// The code will copy the fields references by the events to the user-thread. +void demux_changed(demuxer_t *demuxer, int events) +{ + assert(demuxer == demuxer->in->d_thread); // call from demuxer impl. only + struct demux_internal *in = demuxer->in; + + demuxer->events |= events; + + pthread_mutex_lock(&in->lock); + + update_cache(in); + + if (demuxer->events & DEMUX_EVENT_INIT) + demuxer_sort_chapters(demuxer); + if (demuxer->events & (DEMUX_EVENT_METADATA | DEMUX_EVENT_STREAMS)) + demux_export_replaygain(demuxer); + + demux_copy(in->d_buffer, demuxer); + + pthread_mutex_unlock(&in->lock); +} + +// Called by the user thread (i.e. player) to update metadata and other things +// from the demuxer thread. +void demux_update(demuxer_t *demuxer) +{ + assert(demuxer == demuxer->in->d_user); + struct demux_internal *in = demuxer->in; + + pthread_mutex_lock(&in->lock); + if (!in->threading) + update_cache(in); + demux_copy(demuxer, in->d_buffer); + if (in->stream_metadata && (demuxer->events & DEMUX_EVENT_METADATA)) + mp_tags_merge(demuxer->metadata, in->stream_metadata); + pthread_mutex_unlock(&in->lock); } static struct demuxer *open_given_type(struct mpv_global *global, @@ -572,54 +783,62 @@ static struct demuxer *open_given_type(struct mpv_global *global, .desc = desc, .type = desc->type, .stream = stream, - .stream_pts = MP_NOPTS_VALUE, .seekable = stream->seekable, - .accurate_seek = true, .filepos = -1, .opts = global->opts, .global = global, .log = mp_log_new(demuxer, log, desc->name), .glog = log, .filename = talloc_strdup(demuxer, stream->url), - .metadata = talloc_zero(demuxer, struct mp_tags), + .events = DEMUX_EVENT_ALL, }; demuxer->seekable = stream->seekable; if (demuxer->stream->uncached_stream && !demuxer->stream->uncached_stream->seekable) demuxer->seekable = false; - demuxer->params = params; // temporary during open() + struct demux_internal *in = demuxer->in = talloc_ptrtype(demuxer, in); + *in = (struct demux_internal){ + .log = demuxer->log, + .d_thread = talloc(demuxer, struct demuxer), + .d_buffer = talloc(demuxer, struct demuxer), + .d_user = demuxer, + .min_packs = demuxer->opts->demuxer_min_packs, + .min_bytes = demuxer->opts->demuxer_min_bytes, + }; + pthread_mutex_init(&in->lock, NULL); + pthread_cond_init(&in->wakeup, NULL); + + *in->d_thread = *demuxer; + *in->d_buffer = *demuxer; + + in->d_thread->metadata = talloc_zero(in->d_thread, struct mp_tags); + in->d_user->metadata = talloc_zero(in->d_user, struct mp_tags); + in->d_buffer->metadata = talloc_zero(in->d_buffer, struct mp_tags); + int64_t start_pos = stream_tell(stream); mp_verbose(log, "Trying demuxer: %s (force-level: %s)\n", desc->name, d_level(check)); - int ret = demuxer->desc->open(demuxer, check); + in->d_thread->params = params; // temporary during open() + + int ret = demuxer->desc->open(in->d_thread, check); if (ret >= 0) { - demuxer->params = NULL; - if (demuxer->filetype) + in->d_thread->params = NULL; + if (in->d_thread->filetype) mp_verbose(log, "Detected file format: %s (%s)\n", - demuxer->filetype, desc->desc); + in->d_thread->filetype, desc->desc); else mp_verbose(log, "Detected file format: %s\n", desc->desc); - if (stream_manages_timeline(demuxer->stream)) { - // Incorrect, but fixes some behavior with DVD/BD - demuxer->ts_resets_possible = false; - // Doesn't work, because stream_pts is a "guess". - demuxer->accurate_seek = false; - // Can be seekable even if the stream isn't. - demuxer->seekable = true; - } - add_stream_chapters(demuxer); - demuxer_sort_chapters(demuxer); - demux_info_update(demuxer); - demux_export_replaygain(demuxer); // Pretend we can seek if we can't seek, but there's a cache. - if (!demuxer->seekable && stream->uncached_stream) { + if (!in->d_thread->seekable && stream->uncached_stream) { mp_warn(log, "File is not seekable, but there's a cache: enabling seeking.\n"); - demuxer->seekable = true; + in->d_thread->seekable = true; } + demux_changed(in->d_thread, DEMUX_EVENT_ALL); + demux_update(demuxer); return demuxer; } @@ -685,15 +904,28 @@ done: return demuxer; } -void demux_flush(demuxer_t *demuxer) +static void flush_locked(demuxer_t *demuxer) { for (int n = 0; n < demuxer->num_streams; n++) - ds_free_packs(demuxer->streams[n]->ds); - demuxer->warned_queue_overflow = false; + ds_flush(demuxer->streams[n]->ds); + demuxer->in->warned_queue_overflow = false; + demuxer->in->eof = false; + demuxer->in->last_eof = false; +} + +// clear the packet queues +void demux_flush(demuxer_t *demuxer) +{ + pthread_mutex_lock(&demuxer->in->lock); + flush_locked(demuxer); + pthread_mutex_unlock(&demuxer->in->lock); } int demux_seek(demuxer_t *demuxer, float rel_seek_secs, int flags) { + struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_user); + if (!demuxer->seekable) { MP_WARN(demuxer, "Cannot seek in this file.\n"); return 0; @@ -702,107 +934,27 @@ int demux_seek(demuxer_t *demuxer, float rel_seek_secs, int flags) if (rel_seek_secs == MP_NOPTS_VALUE && (flags & SEEK_ABSOLUTE)) return 0; - // clear demux buffers: - demux_flush(demuxer); + pthread_mutex_lock(&in->lock); - /* Note: this is for DVD and BD playback. The stream layer has to do these - * seeks, and the demuxer has to react to DEMUXER_CTRL_RESYNC in order to - * deal with the suddenly changing stream position. - */ - struct stream *stream = demuxer->stream; - if (stream_manages_timeline(stream)) { - double pts; - - if (flags & SEEK_ABSOLUTE) - pts = 0.0f; - else { - if (demuxer->stream_pts == MP_NOPTS_VALUE) - goto dmx_seek; - pts = demuxer->stream_pts; - } + flush_locked(demuxer); + in->seeking = true; + in->seek_flags = flags; + in->seek_pts = rel_seek_secs; - if (flags & SEEK_FACTOR) { - double tmp = 0; - if (stream_control(demuxer->stream, STREAM_CTRL_GET_TIME_LENGTH, - &tmp) == STREAM_UNSUPPORTED) - goto dmx_seek; - pts += tmp * rel_seek_secs; - } else - pts += rel_seek_secs; - - if (stream_control(demuxer->stream, STREAM_CTRL_SEEK_TO_TIME, &pts) - != STREAM_UNSUPPORTED) { - demux_control(demuxer, DEMUXER_CTRL_RESYNC, NULL); - return 1; - } - } + if (!in->threading) + execute_seek(in); - dmx_seek: - if (demuxer->desc->seek) - demuxer->desc->seek(demuxer, rel_seek_secs, flags); + pthread_cond_signal(&in->wakeup); + pthread_mutex_unlock(&in->lock); return 1; } -static int demux_info_print(demuxer_t *demuxer) -{ - struct mp_tags *info = demuxer->metadata; - int n; - - if (!info || !info->num_keys) - return 0; - - mp_info(demuxer->glog, "File tags:\n"); - for (n = 0; n < info->num_keys; n++) { - mp_info(demuxer->glog, " %s: %s\n", info->keys[n], info->values[n]); - } - - return 0; -} - char *demux_info_get(demuxer_t *demuxer, const char *opt) { return mp_tags_get_str(demuxer->metadata, opt); } -bool demux_info_update(struct demuxer *demuxer) -{ - struct mp_tags *tags = demuxer->metadata; - // Take care of stream metadata as well - char **meta; - if (stream_control(demuxer->stream, STREAM_CTRL_GET_METADATA, &meta) > 0) { - for (int n = 0; meta[n + 0]; n += 2) - mp_tags_set_str(tags, meta[n + 0], meta[n + 1]); - talloc_free(meta); - } - // Check for metadata changes the hard way. - char *data = talloc_strdup(demuxer, ""); - for (int n = 0; n < tags->num_keys; n++) { - data = talloc_asprintf_append_buffer(data, "%s=%s\n", tags->keys[n], - tags->values[n]); - } - if (!demuxer->previous_metadata || - strcmp(demuxer->previous_metadata, data) != 0) - { - talloc_free(demuxer->previous_metadata); - demuxer->previous_metadata = data; - demux_info_print(demuxer); - return true; - } else { - talloc_free(data); - return false; - } -} - -int demux_control(demuxer_t *demuxer, int cmd, void *arg) -{ - - if (demuxer->desc->control) - return demuxer->desc->control(demuxer, cmd, arg); - - return DEMUXER_CTRL_NOTIMPL; -} - struct sh_stream *demuxer_stream_by_demuxer_id(struct demuxer *d, enum stream_type t, int id) { @@ -830,21 +982,34 @@ void demuxer_select_track(struct demuxer *demuxer, struct sh_stream *stream, bool selected) { // don't flush buffers if stream is already selected / unselected + pthread_mutex_lock(&demuxer->in->lock); + bool update = false; if (stream->ds->selected != selected) { stream->ds->selected = selected; - ds_free_packs(stream->ds); - demux_control(demuxer, DEMUXER_CTRL_SWITCHED_TRACKS, NULL); + stream->ds->active = false; + ds_flush(stream->ds); + update = true; } + pthread_mutex_unlock(&demuxer->in->lock); + if (update) + demux_control(demuxer, DEMUXER_CTRL_SWITCHED_TRACKS, NULL); } -void demuxer_enable_autoselect(struct demuxer *demuxer) +void demux_set_stream_autoselect(struct demuxer *demuxer, bool autoselect) { - demuxer->stream_autoselect = true; + assert(!demuxer->in->threading); // laziness + demuxer->in->autoselect = autoselect; } -bool demuxer_stream_is_selected(struct demuxer *d, struct sh_stream *stream) +bool demux_stream_is_selected(struct sh_stream *stream) { - return stream && stream->ds->selected; + if (!stream) + return false; + bool r = false; + pthread_mutex_lock(&stream->ds->in->lock); + r = stream->ds->selected; + pthread_mutex_unlock(&stream->ds->in->lock); + return r; } int demuxer_add_attachment(demuxer_t *demuxer, struct bstr name, @@ -878,7 +1043,7 @@ static int chapter_compare(const void *p1, const void *p2) return c1->original_index > c2->original_index ? 1 :-1; // never equal } -void demuxer_sort_chapters(demuxer_t *demuxer) +static void demuxer_sort_chapters(demuxer_t *demuxer) { qsort(demuxer->chapters, demuxer->num_chapters, sizeof(struct demux_chapter), chapter_compare); @@ -900,145 +1065,168 @@ int demuxer_add_chapter(demuxer_t *demuxer, struct bstr name, return demuxer->num_chapters - 1; } -static void add_stream_chapters(struct demuxer *demuxer) -{ - if (demuxer->num_chapters) - return; - int num_chapters = 0; - if (stream_control(demuxer->stream, STREAM_CTRL_GET_NUM_CHAPTERS, - &num_chapters) != STREAM_OK) - return; - for (int n = 0; n < num_chapters; n++) { - double p = n; - if (stream_control(demuxer->stream, STREAM_CTRL_GET_CHAPTER_TIME, &p) - != STREAM_OK) - return; - demuxer_add_chapter(demuxer, bstr0(""), p * 1e9, 0, 0); - } -} - double demuxer_get_time_length(struct demuxer *demuxer) { double len; - if (stream_control(demuxer->stream, STREAM_CTRL_GET_TIME_LENGTH, &len) > 0) - return len; - // <= 0 means DEMUXER_CTRL_NOTIMPL or DEMUXER_CTRL_DONTKNOW if (demux_control(demuxer, DEMUXER_CTRL_GET_TIME_LENGTH, &len) > 0) return len; return -1; } -double demuxer_get_start_time(struct demuxer *demuxer) +// must be called locked +static void update_cache(struct demux_internal *in) { - double time; - if (stream_control(demuxer->stream, STREAM_CTRL_GET_START_TIME, &time) > 0) - return time; - if (demux_control(demuxer, DEMUXER_CTRL_GET_START_TIME, &time) > 0) - return time; - return 0; -} + struct demuxer *demuxer = in->d_thread; + struct stream *stream = demuxer->stream; -int demuxer_angles_count(demuxer_t *demuxer) -{ - int ris, angles = -1; + in->time_length = -1; + if (demuxer->desc->control) { + demuxer->desc->control(demuxer, DEMUXER_CTRL_GET_TIME_LENGTH, + &in->time_length); + } - ris = stream_control(demuxer->stream, STREAM_CTRL_GET_NUM_ANGLES, &angles); - if (ris == STREAM_UNSUPPORTED) - return -1; - return angles; + struct mp_tags *s_meta = NULL; + stream_control(stream, STREAM_CTRL_GET_METADATA, &s_meta); + if (s_meta) { + talloc_free(in->stream_metadata); + in->stream_metadata = talloc_steal(in, s_meta); + in->d_buffer->events |= DEMUX_EVENT_METADATA; + } + + in->stream_size = -1; + stream_control(stream, STREAM_CTRL_GET_SIZE, &in->stream_size); + in->stream_cache_size = -1; + stream_control(stream, STREAM_CTRL_GET_CACHE_SIZE, &in->stream_cache_size); + in->stream_cache_fill = -1; + stream_control(stream, STREAM_CTRL_GET_CACHE_FILL, &in->stream_cache_fill); + in->stream_cache_idle = -1; + stream_control(stream, STREAM_CTRL_GET_CACHE_IDLE, &in->stream_cache_idle); } -int demuxer_get_current_angle(demuxer_t *demuxer) +// must be called locked +static int cached_stream_control(struct demux_internal *in, int cmd, void *arg) { - int ris, curr_angle = -1; - ris = stream_control(demuxer->stream, STREAM_CTRL_GET_ANGLE, &curr_angle); - if (ris == STREAM_UNSUPPORTED) - return -1; - return curr_angle; + switch (cmd) { + case STREAM_CTRL_GET_CACHE_SIZE: + if (in->stream_cache_size < 0) + return STREAM_UNSUPPORTED; + *(int64_t *)arg = in->stream_cache_size; + return STREAM_OK; + case STREAM_CTRL_GET_CACHE_FILL: + if (in->stream_cache_fill < 0) + return STREAM_UNSUPPORTED; + *(int64_t *)arg = in->stream_cache_fill; + return STREAM_OK; + case STREAM_CTRL_GET_CACHE_IDLE: + if (in->stream_cache_idle < 0) + return STREAM_UNSUPPORTED; + *(int *)arg = in->stream_cache_idle; + return STREAM_OK; + case STREAM_CTRL_GET_SIZE: + if (in->stream_size < 0) + return STREAM_UNSUPPORTED; + *(int64_t *)arg = in->stream_size; + return STREAM_OK; + } + return STREAM_ERROR; } - -int demuxer_set_angle(demuxer_t *demuxer, int angle) +// must be called locked +static int cached_demux_control(struct demux_internal *in, int cmd, void *arg) { - int ris, angles = -1; - - angles = demuxer_angles_count(demuxer); - if ((angles < 1) || (angle > angles)) - return -1; - - demux_flush(demuxer); - - ris = stream_control(demuxer->stream, STREAM_CTRL_SET_ANGLE, &angle); - if (ris == STREAM_UNSUPPORTED) - return -1; - - demux_control(demuxer, DEMUXER_CTRL_RESYNC, NULL); - - return angle; + switch (cmd) { + case DEMUXER_CTRL_GET_TIME_LENGTH: + if (in->time_length < 0) + return DEMUXER_CTRL_NOTIMPL; + *(double *)arg = in->time_length; + return DEMUXER_CTRL_OK; + case DEMUXER_CTRL_STREAM_CTRL: { + struct demux_ctrl_stream_ctrl *c = arg; + int r = cached_stream_control(in, c->ctrl, c->arg); + if (r == STREAM_ERROR) + break; + c->res = r; + return DEMUXER_CTRL_OK; + } + case DEMUXER_CTRL_SWITCHED_TRACKS: + in->tracks_switched = true; + return DEMUXER_CTRL_OK; + } + return DEMUXER_CTRL_DONTKNOW; } -static int packet_sort_compare(const void *p1, const void *p2) +int demux_control(demuxer_t *demuxer, int cmd, void *arg) { - struct demux_packet *c1 = *(struct demux_packet **)p1; - struct demux_packet *c2 = *(struct demux_packet **)p2; + struct demux_internal *in = demuxer->in; + + if (in->threading) { + pthread_mutex_lock(&in->lock); + pthread_cond_signal(&in->wakeup); + int cr = cached_demux_control(in, cmd, arg); + pthread_mutex_unlock(&in->lock); + if (cr != DEMUXER_CTRL_DONTKNOW) + return cr; + } - if (c1->pts > c2->pts) - return 1; - else if (c1->pts < c2->pts) - return -1; - return 0; + int r = DEMUXER_CTRL_NOTIMPL; + demux_pause(demuxer); + if (cmd == DEMUXER_CTRL_STREAM_CTRL) { + struct demux_ctrl_stream_ctrl *c = arg; + if (in->threading) + MP_VERBOSE(demuxer, "blocking for STREAM_CTRL %d\n", c->ctrl); + c->res = stream_control(demuxer->stream, c->ctrl, c->arg); + if (c->res != STREAM_UNSUPPORTED) + r = DEMUXER_CTRL_OK; + } + if (r != DEMUXER_CTRL_OK) { + if (in->threading) + MP_VERBOSE(demuxer, "blocking for DEMUXER_CTRL %d\n", cmd); + if (demuxer->desc->control) + r = demuxer->desc->control(demuxer->in->d_thread, cmd, arg); + } + demux_unpause(demuxer); + return r; } -void demux_packet_list_sort(struct demux_packet **pkts, int num_pkts) +int demux_stream_control(demuxer_t *demuxer, int ctrl, void *arg) { - qsort(pkts, num_pkts, sizeof(struct demux_packet *), packet_sort_compare); + struct demux_ctrl_stream_ctrl c = {ctrl, arg, STREAM_UNSUPPORTED}; + demux_control(demuxer, DEMUXER_CTRL_STREAM_CTRL, &c); + return c.res; } -void demux_packet_list_seek(struct demux_packet **pkts, int num_pkts, - int *current, float rel_seek_secs, int flags) +// Make the demuxer thread stop doing anything. +// demux_unpause() wakes up the thread again. +// Can be nested with other calls, but trying to read packets may deadlock. +void demux_pause(demuxer_t *demuxer) { - double ref_time = 0; - if (*current >= 0 && *current < num_pkts) { - ref_time = pkts[*current]->pts; - } else if (*current == num_pkts && num_pkts > 0) { - ref_time = pkts[num_pkts - 1]->pts + pkts[num_pkts - 1]->duration; - } + struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_user); - if (flags & SEEK_ABSOLUTE) - ref_time = 0; + if (!in->threading) + return; - if (flags & SEEK_FACTOR) { - ref_time += demux_packet_list_duration(pkts, num_pkts) * rel_seek_secs; - } else { - ref_time += rel_seek_secs; - } + MP_VERBOSE(in, "pause demux thread\n"); - // Could do binary search, but it's probably not worth the complexity. - int last_index = 0; - for (int n = 0; n < num_pkts; n++) { - if (pkts[n]->pts > ref_time) - break; - last_index = n; - } - *current = last_index; + pthread_mutex_lock(&in->lock); + in->thread_request_pause++; + pthread_cond_signal(&in->wakeup); + while (!in->thread_paused) + pthread_cond_wait(&in->wakeup, &in->lock); + pthread_mutex_unlock(&in->lock); } -double demux_packet_list_duration(struct demux_packet **pkts, int num_pkts) +void demux_unpause(demuxer_t *demuxer) { - if (num_pkts > 0) - return pkts[num_pkts - 1]->pts + pkts[num_pkts - 1]->duration; - return 0; -} + struct demux_internal *in = demuxer->in; + assert(demuxer == in->d_user); -struct demux_packet *demux_packet_list_fill(struct demux_packet **pkts, - int num_pkts, int *current) -{ - if (*current < 0) - *current = 0; - if (*current >= num_pkts) - return NULL; - struct demux_packet *new = talloc(NULL, struct demux_packet); - *new = *pkts[*current]; - *current += 1; - return new; + if (!in->threading) + return; + + pthread_mutex_lock(&in->lock); + assert(in->thread_request_pause > 0); + in->thread_request_pause--; + pthread_cond_signal(&in->wakeup); + pthread_mutex_unlock(&in->lock); } |