summaryrefslogtreecommitdiff
path: root/demux/demux.c
diff options
context:
space:
mode:
Diffstat (limited to 'demux/demux.c')
-rw-r--r--demux/demux.c1138
1 files changed, 663 insertions, 475 deletions
diff --git a/demux/demux.c b/demux/demux.c
index 51a5f4d..56a147e 100644
--- a/demux/demux.c
+++ b/demux/demux.c
@@ -1,6 +1,4 @@
/*
- * DEMUXER v2.5
- *
* This file is part of MPlayer.
*
* MPlayer is free software; you can redistribute it and/or modify
@@ -17,13 +15,13 @@
* with MPlayer; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#define DEMUX_PRIV(x) x
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <unistd.h>
+#include <pthread.h>
#include <math.h>
@@ -32,7 +30,6 @@
#include "config.h"
#include "options/options.h"
-#include "common/av_common.h"
#include "talloc.h"
#include "common/msg.h"
#include "common/global.h"
@@ -44,8 +41,6 @@
#include "audio/format.h"
-#include <libavcodec/avcodec.h>
-
// Demuxer list
extern const struct demuxer_desc demuxer_desc_edl;
extern const struct demuxer_desc demuxer_desc_cue;
@@ -58,12 +53,14 @@ extern const demuxer_desc_t demuxer_desc_lavf;
extern const demuxer_desc_t demuxer_desc_libass;
extern const demuxer_desc_t demuxer_desc_subreader;
extern const demuxer_desc_t demuxer_desc_playlist;
+extern const demuxer_desc_t demuxer_desc_disc;
/* Please do not add any new demuxers here. If you want to implement a new
* demuxer, add it to libavformat, except for wrappers around external
* libraries and demuxers requiring binary support. */
const demuxer_desc_t *const demuxer_list[] = {
+ &demuxer_desc_disc,
&demuxer_desc_edl,
&demuxer_desc_cue,
&demuxer_desc_rawaudio,
@@ -80,25 +77,76 @@ const demuxer_desc_t *const demuxer_list[] = {
&demuxer_desc_playlist,
// Pretty aggressive, so should be last.
&demuxer_desc_subreader,
- /* Please do not add any new demuxers here. If you want to implement a new
- * demuxer, add it to libavformat, except for wrappers around external
- * libraries and demuxers requiring binary support. */
NULL
};
+struct demux_internal {
+ struct mp_log *log;
+
+ // The demuxer runs potentially in another thread, so we keep two demuxer
+ // structs; the real demuxer can access the shadow struct only.
+ // Since demuxer and user threads both don't use locks, a third demuxer
+ // struct d_buffer is used to copy data between them in a synchronized way.
+ struct demuxer *d_thread; // accessed by demuxer impl. (producer)
+ struct demuxer *d_user; // accessed by player (consumer)
+ struct demuxer *d_buffer; // protected by lock; used to sync d_user/thread
+
+ // The lock protects the packet queues (struct demux_stream), d_buffer,
+ // and some minor fields like thread_paused.
+ pthread_mutex_t lock;
+ pthread_cond_t wakeup;
+ pthread_t thread;
+
+ // -- All the following fields are protected by lock.
+
+ bool thread_paused;
+ int thread_request_pause; // counter, if >0, make demuxer thread pause
+ bool thread_terminate;
+ bool threading;
+ void (*wakeup_cb)(void *ctx);
+ void *wakeup_cb_ctx;
+
+ bool warned_queue_overflow;
+ bool last_eof; // last actual global EOF status
+ bool eof; // whether we're in EOF state (reset for retry)
+ bool autoselect;
+ int min_packs;
+ int min_bytes;
+
+ bool tracks_switched; // thread needs to inform demuxer of this
+
+ bool seeking; // there's a seek queued
+ int seek_flags; // flags for next seek (if seeking==true)
+ double seek_pts;
+
+ // Cached state.
+ double time_length;
+ struct mp_tags *stream_metadata;
+ int64_t stream_size;
+ int64_t stream_cache_size;
+ int64_t stream_cache_fill;
+ int stream_cache_idle;
+};
+
struct demux_stream {
- int selected; // user wants packets from this stream
- int eof; // end of demuxed stream? (true if all buffer empty)
- int packs; // number of packets in buffer
- int bytes; // total bytes of packets in buffer
+ struct demux_internal *in;
+ enum stream_type type;
+ // all fields are protected by in->lock
+ bool selected; // user wants packets from this stream
+ bool active; // try to keep at least 1 packet queued
+ bool eof; // end of demuxed stream? (true if all buffer empty)
+ size_t packs; // number of packets in buffer
+ size_t bytes; // total bytes of packets in buffer
struct demux_packet *head;
struct demux_packet *tail;
};
-static void add_stream_chapters(struct demuxer *demuxer);
-void demuxer_sort_chapters(demuxer_t *demuxer);
+static void demuxer_sort_chapters(demuxer_t *demuxer);
+static void *demux_thread(void *pctx);
+static void update_cache(struct demux_internal *in);
-static void ds_free_packs(struct demux_stream *ds)
+// called locked
+static void ds_flush(struct demux_stream *ds)
{
demux_packet_t *dp = ds->head;
while (dp) {
@@ -107,110 +155,16 @@ static void ds_free_packs(struct demux_stream *ds)
dp = dn;
}
ds->head = ds->tail = NULL;
- ds->packs = 0; // !!!!!
+ ds->packs = 0;
ds->bytes = 0;
- ds->eof = 0;
-}
-
-static void packet_destroy(void *ptr)
-{
- struct demux_packet *dp = ptr;
- talloc_free(dp->avpacket);
- av_free(dp->allocation);
-}
-
-static struct demux_packet *create_packet(size_t len)
-{
- if (len > 1000000000) {
- fprintf(stderr, "Attempt to allocate demux packet over 1 GB!\n");
- abort();
- }
- struct demux_packet *dp = talloc(NULL, struct demux_packet);
- talloc_set_destructor(dp, packet_destroy);
- *dp = (struct demux_packet) {
- .len = len,
- .pts = MP_NOPTS_VALUE,
- .dts = MP_NOPTS_VALUE,
- .duration = -1,
- .stream_pts = MP_NOPTS_VALUE,
- .pos = -1,
- .stream = -1,
- };
- return dp;
-}
-
-struct demux_packet *new_demux_packet(size_t len)
-{
- struct demux_packet *dp = create_packet(len);
- dp->buffer = av_malloc(len + FF_INPUT_BUFFER_PADDING_SIZE);
- if (!dp->buffer) {
- fprintf(stderr, "Memory allocation failure!\n");
- abort();
- }
- memset(dp->buffer + len, 0, FF_INPUT_BUFFER_PADDING_SIZE);
- dp->allocation = dp->buffer;
- return dp;
-}
-
-// data must already have suitable padding, and does not copy the data
-struct demux_packet *new_demux_packet_fromdata(void *data, size_t len)
-{
- struct demux_packet *dp = create_packet(len);
- dp->buffer = data;
- return dp;
-}
-
-struct demux_packet *new_demux_packet_from(void *data, size_t len)
-{
- struct demux_packet *dp = new_demux_packet(len);
- memcpy(dp->buffer, data, len);
- return dp;
-}
-
-void demux_packet_shorten(struct demux_packet *dp, size_t len)
-{
- assert(len <= dp->len);
- dp->len = len;
- memset(dp->buffer + dp->len, 0, FF_INPUT_BUFFER_PADDING_SIZE);
-}
-
-void free_demux_packet(struct demux_packet *dp)
-{
- talloc_free(dp);
-}
-
-static void destroy_avpacket(void *pkt)
-{
- av_free_packet(pkt);
-}
-
-struct demux_packet *demux_copy_packet(struct demux_packet *dp)
-{
- struct demux_packet *new = NULL;
- if (dp->avpacket) {
- assert(dp->buffer == dp->avpacket->data);
- assert(dp->len == dp->avpacket->size);
- AVPacket *newavp = talloc_zero(NULL, AVPacket);
- talloc_set_destructor(newavp, destroy_avpacket);
- av_init_packet(newavp);
- if (av_packet_ref(newavp, dp->avpacket) < 0)
- abort();
- new = new_demux_packet_fromdata(newavp->data, newavp->size);
- new->avpacket = newavp;
- }
- if (!new) {
- new = new_demux_packet(dp->len);
- memcpy(new->buffer, dp->buffer, new->len);
- }
- new->pts = dp->pts;
- new->dts = dp->dts;
- new->duration = dp->duration;
- new->stream_pts = dp->stream_pts;
- return new;
+ ds->eof = false;
+ ds->active = false;
}
struct sh_stream *new_sh_stream(demuxer_t *demuxer, enum stream_type type)
{
+ assert(demuxer == demuxer->in->d_thread);
+
if (demuxer->num_streams > MAX_SH_STREAMS) {
MP_WARN(demuxer, "Too many streams.\n");
return NULL;
@@ -225,33 +179,22 @@ struct sh_stream *new_sh_stream(demuxer_t *demuxer, enum stream_type type)
struct sh_stream *sh = talloc_ptrtype(demuxer, sh);
*sh = (struct sh_stream) {
.type = type,
- .demuxer = demuxer,
.index = demuxer->num_streams,
.demuxer_id = demuxer_id, // may be overwritten by demuxer
- .ds = talloc_zero(sh, struct demux_stream),
+ .ds = talloc(sh, struct demux_stream),
+ };
+ *sh->ds = (struct demux_stream) {
+ .in = demuxer->in,
+ .type = sh->type,
+ .selected = demuxer->in->autoselect,
};
MP_TARRAY_APPEND(demuxer, demuxer->streams, demuxer->num_streams, sh);
switch (sh->type) {
- case STREAM_VIDEO: {
- struct sh_video *sht = talloc_zero(demuxer, struct sh_video);
- sh->video = sht;
- break;
- }
- case STREAM_AUDIO: {
- struct sh_audio *sht = talloc_zero(demuxer, struct sh_audio);
- sh->audio = sht;
- break;
- }
- case STREAM_SUB: {
- struct sh_sub *sht = talloc_zero(demuxer, struct sh_sub);
- sh->sub = sht;
- break;
- }
- default: assert(false);
+ case STREAM_VIDEO: sh->video = talloc_zero(demuxer, struct sh_video); break;
+ case STREAM_AUDIO: sh->audio = talloc_zero(demuxer, struct sh_audio); break;
+ case STREAM_SUB: sh->sub = talloc_zero(demuxer, struct sh_sub); break;
}
- sh->ds->selected = demuxer->stream_autoselect;
-
return sh;
}
@@ -259,56 +202,83 @@ void free_demuxer(demuxer_t *demuxer)
{
if (!demuxer)
return;
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
+
+ demux_stop_thread(demuxer);
+
if (demuxer->desc->close)
- demuxer->desc->close(demuxer);
- // free streams:
+ demuxer->desc->close(in->d_thread);
for (int n = 0; n < demuxer->num_streams; n++)
- ds_free_packs(demuxer->streams[n]->ds);
+ ds_flush(demuxer->streams[n]->ds);
+ pthread_mutex_destroy(&in->lock);
+ pthread_cond_destroy(&in->wakeup);
talloc_free(demuxer);
}
-const char *stream_type_name(enum stream_type type)
+// Start the demuxer thread, which reads ahead packets on its own.
+void demux_start_thread(struct demuxer *demuxer)
{
- switch (type) {
- case STREAM_VIDEO: return "video";
- case STREAM_AUDIO: return "audio";
- case STREAM_SUB: return "sub";
- default: return "unknown";
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
+
+ if (!in->threading) {
+ in->threading = true;
+ if (pthread_create(&in->thread, NULL, demux_thread, in))
+ in->threading = false;
}
}
-static int count_packs(struct demuxer *demux, enum stream_type type)
+void demux_stop_thread(struct demuxer *demuxer)
{
- int c = 0;
- for (int n = 0; n < demux->num_streams; n++)
- c += demux->streams[n]->type == type ? demux->streams[n]->ds->packs : 0;
- return c;
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
+
+ if (in->threading) {
+ pthread_mutex_lock(&in->lock);
+ in->thread_terminate = true;
+ pthread_cond_signal(&in->wakeup);
+ pthread_mutex_unlock(&in->lock);
+ pthread_join(in->thread, NULL);
+ in->threading = false;
+ in->thread_terminate = false;
+ }
}
-static int count_bytes(struct demuxer *demux, enum stream_type type)
+// The demuxer thread will call cb(ctx) if there's a new packet, or EOF is reached.
+void demux_set_wakeup_cb(struct demuxer *demuxer, void (*cb)(void *ctx), void *ctx)
+{
+ struct demux_internal *in = demuxer->in;
+ pthread_mutex_lock(&in->lock);
+ in->wakeup_cb = cb;
+ in->wakeup_cb_ctx = ctx;
+ pthread_mutex_unlock(&in->lock);
+}
+
+const char *stream_type_name(enum stream_type type)
{
- int c = 0;
- for (int n = 0; n < demux->num_streams; n++)
- c += demux->streams[n]->type == type ? demux->streams[n]->ds->bytes : 0;
- return c;
+ switch (type) {
+ case STREAM_VIDEO: return "video";
+ case STREAM_AUDIO: return "audio";
+ case STREAM_SUB: return "sub";
+ default: return "unknown";
+ }
}
// Returns the same value as demuxer->fill_buffer: 1 ok, 0 EOF/not selected.
-int demuxer_add_packet(demuxer_t *demuxer, struct sh_stream *stream,
- demux_packet_t *dp)
+int demux_add_packet(struct sh_stream *stream, demux_packet_t *dp)
{
struct demux_stream *ds = stream ? stream->ds : NULL;
- if (!dp || !ds || !ds->selected) {
+ if (!dp || !ds) {
talloc_free(dp);
return 0;
}
-
- if (stream->type == STREAM_VIDEO && !dp->len) {
- /* Video packets with size 0 are assumed to not correspond to frames,
- * but to indicate the absence of a frame in formats like AVI
- * that must have packets at fixed timestamp intervals. */
+ struct demux_internal *in = ds->in;
+ pthread_mutex_lock(&in->lock);
+ if (!ds->selected || in->seeking) {
+ pthread_mutex_unlock(&in->lock);
talloc_free(dp);
- return 1;
+ return 0;
}
dp->stream = stream->index;
@@ -324,82 +294,192 @@ int demuxer_add_packet(demuxer_t *demuxer, struct sh_stream *stream,
// first packet in stream
ds->head = ds->tail = dp;
}
- /* ds_get_packets() can set ds->eof to 1 when another stream runs out of
- * buffer space. That makes sense because in that situation the calling
- * code should not count on being able to demux more packets from this
- * stream. (Can happen with e.g. badly interleaved files.)
- * In this case, we didn't necessarily reach EOF, and new packet can
- * appear. */
- ds->eof = 0;
-
- if (dp->pos >= 0)
- demuxer->filepos = dp->pos;
+ // obviously not true anymore
+ ds->eof = false;
+ in->last_eof = in->eof = false;
// For video, PTS determination is not trivial, but for other media types
// distinguishing PTS and DTS is not useful.
if (stream->type != STREAM_VIDEO && dp->pts == MP_NOPTS_VALUE)
dp->pts = dp->dts;
- MP_DBG(demuxer, "DEMUX: Append packet to %s, len=%d pts=%5.3f pos=%"PRIi64" "
- "[packs: A=%d V=%d S=%d]\n", stream_type_name(stream->type),
- dp->len, dp->pts, dp->pos, count_packs(demuxer, STREAM_AUDIO),
- count_packs(demuxer, STREAM_VIDEO), count_packs(demuxer, STREAM_SUB));
+ MP_DBG(in, "append packet to %s: size=%d pts=%f dts=%f pos=%"PRIi64" "
+ "[num=%zd size=%zd]\n", stream_type_name(stream->type),
+ dp->len, dp->pts, dp->dts, dp->pos, ds->packs, ds->bytes);
+
+ if (ds->in->wakeup_cb)
+ ds->in->wakeup_cb(ds->in->wakeup_cb_ctx);
+ pthread_cond_signal(&in->wakeup);
+ pthread_mutex_unlock(&in->lock);
return 1;
}
-static bool demux_check_queue_full(demuxer_t *demux)
+// Returns true if there was "progress" (lock was released temporarily).
+static bool read_packet(struct demux_internal *in)
{
- for (int n = 0; n < demux->num_streams; n++) {
- struct sh_stream *sh = demux->streams[n];
- if (sh->ds->packs > MAX_PACKS || sh->ds->bytes > MAX_PACK_BYTES)
- goto overflow;
+ in->eof = false;
+
+ // Check if we need to read a new packet. We do this if all queues are below
+ // the minimum, or if a stream explicitly needs new packets. Also includes
+ // safe-guards against packet queue overflow.
+ bool active = false, read_more = false;
+ size_t packs = 0, bytes = 0;
+ for (int n = 0; n < in->d_buffer->num_streams; n++) {
+ struct demux_stream *ds = in->d_buffer->streams[n]->ds;
+ active |= ds->selected && ds->active;
+ read_more |= ds->active && !ds->head;
+ packs += ds->packs;
+ bytes += ds->bytes;
+ }
+ MP_DBG(in, "packets=%zd, bytes=%zd, active=%d, more=%d\n",
+ packs, bytes, active, read_more);
+ if (packs >= MAX_PACKS || bytes >= MAX_PACK_BYTES) {
+ if (!in->warned_queue_overflow) {
+ in->warned_queue_overflow = true;
+ MP_ERR(in, "Too many packets in the demuxer packet queues:\n");
+ for (int n = 0; n < in->d_buffer->num_streams; n++) {
+ struct demux_stream *ds = in->d_buffer->streams[n]->ds;
+ if (ds->selected) {
+ MP_ERR(in, " %s/%d: %zd packets, %zd bytes\n",
+ stream_type_name(ds->type), n, ds->packs, ds->bytes);
+ }
+ }
+ }
+ for (int n = 0; n < in->d_buffer->num_streams; n++) {
+ struct demux_stream *ds = in->d_buffer->streams[n]->ds;
+ ds->eof |= !ds->head;
+ }
+ pthread_cond_signal(&in->wakeup);
+ return false;
}
- return false;
-
-overflow:
-
- if (!demux->warned_queue_overflow) {
- MP_ERR(demux, "Too many packets in the demuxer "
- "packet queue (video: %d packets in %d bytes, audio: %d "
- "packets in %d bytes, sub: %d packets in %d bytes).\n",
- count_packs(demux, STREAM_VIDEO), count_bytes(demux, STREAM_VIDEO),
- count_packs(demux, STREAM_AUDIO), count_bytes(demux, STREAM_AUDIO),
- count_packs(demux, STREAM_SUB), count_bytes(demux, STREAM_SUB));
- MP_INFO(demux, "Maybe you are playing a non-"
- "interleaved stream/file or the codec failed?\n");
+ if (packs < in->min_packs && bytes < in->min_bytes)
+ read_more |= active;
+
+ if (!read_more)
+ return false;
+
+ // Actually read a packet. Drop the lock while doing so, because waiting
+ // for disk or network I/O can take time.
+ pthread_mutex_unlock(&in->lock);
+ struct demuxer *demux = in->d_thread;
+ bool eof = !demux->desc->fill_buffer || demux->desc->fill_buffer(demux) <= 0;
+ pthread_mutex_lock(&in->lock);
+
+ update_cache(in);
+
+ if (eof) {
+ for (int n = 0; n < in->d_buffer->num_streams; n++) {
+ struct demux_stream *ds = in->d_buffer->streams[n]->ds;
+ ds->eof = true;
+ ds->active = false;
+ }
+ // If we had EOF previously, then don't wakeup (avoids wakeup loop)
+ if (!in->last_eof) {
+ if (in->wakeup_cb)
+ in->wakeup_cb(in->wakeup_cb_ctx);
+ pthread_cond_signal(&in->wakeup);
+ MP_VERBOSE(in, "EOF reached.\n");
+ }
}
- demux->warned_queue_overflow = true;
+ in->eof = in->last_eof = eof;
return true;
}
-// return value:
-// 0 = EOF or no stream found or invalid type
-// 1 = successfully read a packet
+// must be called locked; may temporarily unlock
+static void ds_get_packets(struct demux_stream *ds)
+{
+ const char *t = stream_type_name(ds->type);
+ struct demux_internal *in = ds->in;
+ MP_DBG(in, "reading packet for %s\n", t);
+ in->eof = false; // force retry
+ ds->eof = false;
+ while (ds->selected && !ds->head && !ds->eof) {
+ ds->active = true;
+ // Note: the following code marks EOF if it can't continue
+ if (in->threading) {
+ MP_VERBOSE(in, "waiting for demux thread (%s)\n", t);
+ pthread_cond_signal(&in->wakeup);
+ pthread_cond_wait(&in->wakeup, &in->lock);
+ } else {
+ read_packet(in);
+ }
+ }
+}
-static int demux_fill_buffer(demuxer_t *demux)
+static void execute_trackswitch(struct demux_internal *in)
{
- return demux->desc->fill_buffer ? demux->desc->fill_buffer(demux) : 0;
+ in->tracks_switched = false;
+
+ pthread_mutex_unlock(&in->lock);
+
+ if (in->d_thread->desc->control)
+ in->d_thread->desc->control(in->d_thread, DEMUXER_CTRL_SWITCHED_TRACKS, 0);
+
+ pthread_mutex_lock(&in->lock);
}
-static void ds_get_packets(struct sh_stream *sh)
+static void execute_seek(struct demux_internal *in)
{
- struct demux_stream *ds = sh->ds;
- demuxer_t *demux = sh->demuxer;
- MP_TRACE(demux, "ds_get_packets (%s) called\n",
- stream_type_name(sh->type));
- while (1) {
- if (ds->head)
- return;
-
- if (demux_check_queue_full(demux))
- break;
+ int flags = in->seek_flags;
+ double pts = in->seek_pts;
+ in->seeking = false;
+
+ pthread_mutex_unlock(&in->lock);
+
+ if (in->d_thread->desc->seek)
+ in->d_thread->desc->seek(in->d_thread, pts, flags);
- if (!demux_fill_buffer(demux))
- break; // EOF
+ pthread_mutex_lock(&in->lock);
+}
+
+static void *demux_thread(void *pctx)
+{
+ struct demux_internal *in = pctx;
+ pthread_mutex_lock(&in->lock);
+ while (!in->thread_terminate) {
+ in->thread_paused = in->thread_request_pause > 0;
+ if (in->thread_paused) {
+ pthread_cond_signal(&in->wakeup);
+ pthread_cond_wait(&in->wakeup, &in->lock);
+ continue;
+ }
+ if (in->tracks_switched) {
+ execute_trackswitch(in);
+ continue;
+ }
+ if (in->seeking) {
+ execute_seek(in);
+ continue;
+ }
+ if (!in->eof) {
+ if (read_packet(in))
+ continue; // read_packet unlocked, so recheck conditions
+ }
+ update_cache(in);
+ pthread_cond_signal(&in->wakeup);
+ pthread_cond_wait(&in->wakeup, &in->lock);
}
- MP_VERBOSE(demux, "ds_get_packets: EOF reached (stream: %s)\n",
- stream_type_name(sh->type));
- ds->eof = 1;
+ pthread_mutex_unlock(&in->lock);
+ return NULL;
+}
+
+static struct demux_packet *dequeue_packet(struct demux_stream *ds)
+{
+ if (!ds->head)
+ return NULL;
+ struct demux_packet *pkt = ds->head;
+ ds->head = pkt->next;
+ pkt->next = NULL;
+ if (!ds->head)
+ ds->tail = NULL;
+ ds->bytes -= pkt->len;
+ ds->packs--;
+
+ // This implies this function is actually called from "the" user thread.
+ if (pkt && pkt->pos >= 0)
+ ds->in->d_user->filepos = pkt->pos;
+
+ return pkt;
}
// Read a packet from the given stream. The returned packet belongs to the
@@ -408,24 +488,46 @@ static void ds_get_packets(struct sh_stream *sh)
struct demux_packet *demux_read_packet(struct sh_stream *sh)
{
struct demux_stream *ds = sh ? sh->ds : NULL;
+ struct demux_packet *pkt = NULL;
+ if (ds) {
+ pthread_mutex_lock(&ds->in->lock);
+ ds_get_packets(ds);
+ pkt = dequeue_packet(ds);
+ pthread_cond_signal(&ds->in->wakeup); // possibly read more
+ pthread_mutex_unlock(&ds->in->lock);
+ }
+ return pkt;
+}
+
+// Poll the demuxer queue, and if there's a packet, return it. Otherwise, just
+// make the demuxer thread read packets for this stream, and if there's at
+// least one packet, call the wakeup callback.
+// Unlike demux_read_packet(), this always enables readahead (which means you
+// must not use it on interleaved subtitle streams).
+// Returns:
+// < 0: EOF was reached, *out_pkt=NULL
+// == 0: no new packet yet, but maybe later, *out_pkt=NULL
+// > 0: new packet read, *out_pkt is set
+int demux_read_packet_async(struct sh_stream *sh, struct demux_packet **out_pkt)
+{
+ struct demux_stream *ds = sh ? sh->ds : NULL;
+ int r = -1;
+ *out_pkt = NULL;
if (ds) {
- ds_get_packets(sh);
- struct demux_packet *pkt = ds->head;
- if (pkt) {
- ds->head = pkt->next;
- pkt->next = NULL;
- if (!ds->head)
- ds->tail = NULL;
- ds->bytes -= pkt->len;
- ds->packs--;
-
- if (pkt->stream_pts != MP_NOPTS_VALUE)
- sh->demuxer->stream_pts = pkt->stream_pts;
-
- return pkt;
+ if (ds->in->threading) {
+ pthread_mutex_lock(&ds->in->lock);
+ *out_pkt = dequeue_packet(ds);
+ r = *out_pkt ? 1 : (ds->eof ? -1 : 0);
+ ds->active = ds->selected; // enable readahead
+ ds->in->eof = false; // force retry
+ pthread_cond_signal(&ds->in->wakeup); // possibly read more
+ pthread_mutex_unlock(&ds->in->lock);
+ } else {
+ *out_pkt = demux_read_packet(sh);
+ r = *out_pkt ? 1 : -1;
}
}
- return NULL;
+ return r;
}
// Return the pts of the next packet that demux_read_packet() would return.
@@ -433,31 +535,61 @@ struct demux_packet *demux_read_packet(struct sh_stream *sh)
// packets from the queue.
double demux_get_next_pts(struct sh_stream *sh)
{
- if (sh && sh->ds->selected) {
- ds_get_packets(sh);
+ double res = MP_NOPTS_VALUE;
+ if (sh) {
+ pthread_mutex_lock(&sh->ds->in->lock);
+ ds_get_packets(sh->ds);
if (sh->ds->head)
- return sh->ds->head->pts;
+ res = sh->ds->head->pts;
+ pthread_mutex_unlock(&sh->ds->in->lock);
}
- return MP_NOPTS_VALUE;
+ return res;
}
// Return whether a packet is queued. Never blocks, never forces any reads.
bool demux_has_packet(struct sh_stream *sh)
{
- return sh && sh->ds->head;
+ bool has_packet = false;
+ if (sh) {
+ pthread_mutex_lock(&sh->ds->in->lock);
+ has_packet = sh->ds->head;
+ pthread_mutex_unlock(&sh->ds->in->lock);
+ }
+ return has_packet;
}
-// Same as demux_has_packet, but to be called internally by demuxers, as
-// opposed to the user of the demuxer.
-bool demuxer_stream_has_packets_queued(struct demuxer *d, struct sh_stream *stream)
+// Return whether EOF was returned with an earlier packet read.
+bool demux_stream_eof(struct sh_stream *sh)
{
- return demux_has_packet(stream);
+ bool eof = false;
+ if (sh) {
+ pthread_mutex_lock(&sh->ds->in->lock);
+ eof = sh->ds->eof && !sh->ds->head;
+ pthread_mutex_unlock(&sh->ds->in->lock);
+ }
+ return eof;
}
-// Return whether EOF was returned with an earlier packet read.
-bool demux_stream_eof(struct sh_stream *sh)
+// Read and return any packet we find.
+struct demux_packet *demux_read_any_packet(struct demuxer *demuxer)
{
- return !sh || sh->ds->eof;
+ assert(!demuxer->in->threading); // doesn't work with threading
+ bool read_more = true;
+ while (read_more) {
+ for (int n = 0; n < demuxer->num_streams; n++) {
+ struct sh_stream *sh = demuxer->streams[n];
+ sh->ds->active = sh->ds->selected; // force read_packet() to read
+ struct demux_packet *pkt = dequeue_packet(sh->ds);
+ if (pkt)
+ return pkt;
+ }
+ // retry after calling this
+ pthread_mutex_lock(&demuxer->in->lock);
+ read_more = read_packet(demuxer->in);
+ read_more &= !demuxer->in->eof;
+ pthread_mutex_unlock(&demuxer->in->lock);
+ }
+ return NULL;
}
// ====================================================================
@@ -543,8 +675,7 @@ static void demux_export_replaygain(demuxer_t *demuxer)
{
float tg, tp, ag, ap;
- if (!demuxer->replaygain_data &&
- !decode_gain(demuxer, "REPLAYGAIN_TRACK_GAIN", &tg) &&
+ if (!decode_gain(demuxer, "REPLAYGAIN_TRACK_GAIN", &tg) &&
!decode_peak(demuxer, "REPLAYGAIN_TRACK_PEAK", &tp) &&
!decode_gain(demuxer, "REPLAYGAIN_ALBUM_GAIN", &ag) &&
!decode_peak(demuxer, "REPLAYGAIN_ALBUM_PEAK", &ap))
@@ -556,8 +687,88 @@ static void demux_export_replaygain(demuxer_t *demuxer)
rgain->album_gain = ag;
rgain->album_peak = ap;
- demuxer->replaygain_data = rgain;
+ for (int n = 0; n < demuxer->num_streams; n++) {
+ struct sh_stream *sh = demuxer->streams[n];
+ if (sh->audio && !sh->audio->replaygain_data)
+ sh->audio->replaygain_data = rgain;
+ }
+ }
+}
+
+// Copy all fields from src to dst, depending on event flags.
+static void demux_copy(struct demuxer *dst, struct demuxer *src)
+{
+ if (src->events & DEMUX_EVENT_INIT) {
+ // Note that we do as shallow copies as possible. We expect the date
+ // that is not-copied (only referenced) to be immutable.
+ // This implies e.g. that no chapters are added after initialization.
+ dst->chapters = src->chapters;
+ dst->num_chapters = src->num_chapters;
+ dst->editions = src->editions;
+ dst->num_editions = src->num_editions;
+ dst->edition = src->edition;
+ dst->attachments = src->attachments;
+ dst->num_attachments = src->num_attachments;
+ dst->matroska_data = src->matroska_data;
+ dst->file_contents = src->file_contents;
+ dst->playlist = src->playlist;
+ dst->seekable = src->seekable;
+ dst->filetype = src->filetype;
+ dst->ts_resets_possible = src->ts_resets_possible;
+ dst->start_time = src->start_time;
}
+ if (src->events & DEMUX_EVENT_STREAMS) {
+ // The stream structs themselves are immutable.
+ for (int n = dst->num_streams; n < src->num_streams; n++)
+ MP_TARRAY_APPEND(dst, dst->streams, dst->num_streams, src->streams[n]);
+ }
+ if (src->events & DEMUX_EVENT_METADATA) {
+ talloc_free(dst->metadata);
+ dst->metadata = mp_tags_dup(dst, src->metadata);
+ }
+ dst->events |= src->events;
+ src->events = 0;
+}
+
+// This is called by demuxer implementations if certain parameters change
+// at runtime.
+// events is one of DEMUX_EVENT_*
+// The code will copy the fields references by the events to the user-thread.
+void demux_changed(demuxer_t *demuxer, int events)
+{
+ assert(demuxer == demuxer->in->d_thread); // call from demuxer impl. only
+ struct demux_internal *in = demuxer->in;
+
+ demuxer->events |= events;
+
+ pthread_mutex_lock(&in->lock);
+
+ update_cache(in);
+
+ if (demuxer->events & DEMUX_EVENT_INIT)
+ demuxer_sort_chapters(demuxer);
+ if (demuxer->events & (DEMUX_EVENT_METADATA | DEMUX_EVENT_STREAMS))
+ demux_export_replaygain(demuxer);
+
+ demux_copy(in->d_buffer, demuxer);
+
+ pthread_mutex_unlock(&in->lock);
+}
+
+// Called by the user thread (i.e. player) to update metadata and other things
+// from the demuxer thread.
+void demux_update(demuxer_t *demuxer)
+{
+ assert(demuxer == demuxer->in->d_user);
+ struct demux_internal *in = demuxer->in;
+
+ pthread_mutex_lock(&in->lock);
+ if (!in->threading)
+ update_cache(in);
+ demux_copy(demuxer, in->d_buffer);
+ if (in->stream_metadata && (demuxer->events & DEMUX_EVENT_METADATA))
+ mp_tags_merge(demuxer->metadata, in->stream_metadata);
+ pthread_mutex_unlock(&in->lock);
}
static struct demuxer *open_given_type(struct mpv_global *global,
@@ -572,54 +783,62 @@ static struct demuxer *open_given_type(struct mpv_global *global,
.desc = desc,
.type = desc->type,
.stream = stream,
- .stream_pts = MP_NOPTS_VALUE,
.seekable = stream->seekable,
- .accurate_seek = true,
.filepos = -1,
.opts = global->opts,
.global = global,
.log = mp_log_new(demuxer, log, desc->name),
.glog = log,
.filename = talloc_strdup(demuxer, stream->url),
- .metadata = talloc_zero(demuxer, struct mp_tags),
+ .events = DEMUX_EVENT_ALL,
};
demuxer->seekable = stream->seekable;
if (demuxer->stream->uncached_stream &&
!demuxer->stream->uncached_stream->seekable)
demuxer->seekable = false;
- demuxer->params = params; // temporary during open()
+ struct demux_internal *in = demuxer->in = talloc_ptrtype(demuxer, in);
+ *in = (struct demux_internal){
+ .log = demuxer->log,
+ .d_thread = talloc(demuxer, struct demuxer),
+ .d_buffer = talloc(demuxer, struct demuxer),
+ .d_user = demuxer,
+ .min_packs = demuxer->opts->demuxer_min_packs,
+ .min_bytes = demuxer->opts->demuxer_min_bytes,
+ };
+ pthread_mutex_init(&in->lock, NULL);
+ pthread_cond_init(&in->wakeup, NULL);
+
+ *in->d_thread = *demuxer;
+ *in->d_buffer = *demuxer;
+
+ in->d_thread->metadata = talloc_zero(in->d_thread, struct mp_tags);
+ in->d_user->metadata = talloc_zero(in->d_user, struct mp_tags);
+ in->d_buffer->metadata = talloc_zero(in->d_buffer, struct mp_tags);
+
int64_t start_pos = stream_tell(stream);
mp_verbose(log, "Trying demuxer: %s (force-level: %s)\n",
desc->name, d_level(check));
- int ret = demuxer->desc->open(demuxer, check);
+ in->d_thread->params = params; // temporary during open()
+
+ int ret = demuxer->desc->open(in->d_thread, check);
if (ret >= 0) {
- demuxer->params = NULL;
- if (demuxer->filetype)
+ in->d_thread->params = NULL;
+ if (in->d_thread->filetype)
mp_verbose(log, "Detected file format: %s (%s)\n",
- demuxer->filetype, desc->desc);
+ in->d_thread->filetype, desc->desc);
else
mp_verbose(log, "Detected file format: %s\n", desc->desc);
- if (stream_manages_timeline(demuxer->stream)) {
- // Incorrect, but fixes some behavior with DVD/BD
- demuxer->ts_resets_possible = false;
- // Doesn't work, because stream_pts is a "guess".
- demuxer->accurate_seek = false;
- // Can be seekable even if the stream isn't.
- demuxer->seekable = true;
- }
- add_stream_chapters(demuxer);
- demuxer_sort_chapters(demuxer);
- demux_info_update(demuxer);
- demux_export_replaygain(demuxer);
// Pretend we can seek if we can't seek, but there's a cache.
- if (!demuxer->seekable && stream->uncached_stream) {
+ if (!in->d_thread->seekable && stream->uncached_stream) {
mp_warn(log,
"File is not seekable, but there's a cache: enabling seeking.\n");
- demuxer->seekable = true;
+ in->d_thread->seekable = true;
}
+ demux_changed(in->d_thread, DEMUX_EVENT_ALL);
+ demux_update(demuxer);
return demuxer;
}
@@ -685,15 +904,28 @@ done:
return demuxer;
}
-void demux_flush(demuxer_t *demuxer)
+static void flush_locked(demuxer_t *demuxer)
{
for (int n = 0; n < demuxer->num_streams; n++)
- ds_free_packs(demuxer->streams[n]->ds);
- demuxer->warned_queue_overflow = false;
+ ds_flush(demuxer->streams[n]->ds);
+ demuxer->in->warned_queue_overflow = false;
+ demuxer->in->eof = false;
+ demuxer->in->last_eof = false;
+}
+
+// clear the packet queues
+void demux_flush(demuxer_t *demuxer)
+{
+ pthread_mutex_lock(&demuxer->in->lock);
+ flush_locked(demuxer);
+ pthread_mutex_unlock(&demuxer->in->lock);
}
int demux_seek(demuxer_t *demuxer, float rel_seek_secs, int flags)
{
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
+
if (!demuxer->seekable) {
MP_WARN(demuxer, "Cannot seek in this file.\n");
return 0;
@@ -702,107 +934,27 @@ int demux_seek(demuxer_t *demuxer, float rel_seek_secs, int flags)
if (rel_seek_secs == MP_NOPTS_VALUE && (flags & SEEK_ABSOLUTE))
return 0;
- // clear demux buffers:
- demux_flush(demuxer);
+ pthread_mutex_lock(&in->lock);
- /* Note: this is for DVD and BD playback. The stream layer has to do these
- * seeks, and the demuxer has to react to DEMUXER_CTRL_RESYNC in order to
- * deal with the suddenly changing stream position.
- */
- struct stream *stream = demuxer->stream;
- if (stream_manages_timeline(stream)) {
- double pts;
-
- if (flags & SEEK_ABSOLUTE)
- pts = 0.0f;
- else {
- if (demuxer->stream_pts == MP_NOPTS_VALUE)
- goto dmx_seek;
- pts = demuxer->stream_pts;
- }
+ flush_locked(demuxer);
+ in->seeking = true;
+ in->seek_flags = flags;
+ in->seek_pts = rel_seek_secs;
- if (flags & SEEK_FACTOR) {
- double tmp = 0;
- if (stream_control(demuxer->stream, STREAM_CTRL_GET_TIME_LENGTH,
- &tmp) == STREAM_UNSUPPORTED)
- goto dmx_seek;
- pts += tmp * rel_seek_secs;
- } else
- pts += rel_seek_secs;
-
- if (stream_control(demuxer->stream, STREAM_CTRL_SEEK_TO_TIME, &pts)
- != STREAM_UNSUPPORTED) {
- demux_control(demuxer, DEMUXER_CTRL_RESYNC, NULL);
- return 1;
- }
- }
+ if (!in->threading)
+ execute_seek(in);
- dmx_seek:
- if (demuxer->desc->seek)
- demuxer->desc->seek(demuxer, rel_seek_secs, flags);
+ pthread_cond_signal(&in->wakeup);
+ pthread_mutex_unlock(&in->lock);
return 1;
}
-static int demux_info_print(demuxer_t *demuxer)
-{
- struct mp_tags *info = demuxer->metadata;
- int n;
-
- if (!info || !info->num_keys)
- return 0;
-
- mp_info(demuxer->glog, "File tags:\n");
- for (n = 0; n < info->num_keys; n++) {
- mp_info(demuxer->glog, " %s: %s\n", info->keys[n], info->values[n]);
- }
-
- return 0;
-}
-
char *demux_info_get(demuxer_t *demuxer, const char *opt)
{
return mp_tags_get_str(demuxer->metadata, opt);
}
-bool demux_info_update(struct demuxer *demuxer)
-{
- struct mp_tags *tags = demuxer->metadata;
- // Take care of stream metadata as well
- char **meta;
- if (stream_control(demuxer->stream, STREAM_CTRL_GET_METADATA, &meta) > 0) {
- for (int n = 0; meta[n + 0]; n += 2)
- mp_tags_set_str(tags, meta[n + 0], meta[n + 1]);
- talloc_free(meta);
- }
- // Check for metadata changes the hard way.
- char *data = talloc_strdup(demuxer, "");
- for (int n = 0; n < tags->num_keys; n++) {
- data = talloc_asprintf_append_buffer(data, "%s=%s\n", tags->keys[n],
- tags->values[n]);
- }
- if (!demuxer->previous_metadata ||
- strcmp(demuxer->previous_metadata, data) != 0)
- {
- talloc_free(demuxer->previous_metadata);
- demuxer->previous_metadata = data;
- demux_info_print(demuxer);
- return true;
- } else {
- talloc_free(data);
- return false;
- }
-}
-
-int demux_control(demuxer_t *demuxer, int cmd, void *arg)
-{
-
- if (demuxer->desc->control)
- return demuxer->desc->control(demuxer, cmd, arg);
-
- return DEMUXER_CTRL_NOTIMPL;
-}
-
struct sh_stream *demuxer_stream_by_demuxer_id(struct demuxer *d,
enum stream_type t, int id)
{
@@ -830,21 +982,34 @@ void demuxer_select_track(struct demuxer *demuxer, struct sh_stream *stream,
bool selected)
{
// don't flush buffers if stream is already selected / unselected
+ pthread_mutex_lock(&demuxer->in->lock);
+ bool update = false;
if (stream->ds->selected != selected) {
stream->ds->selected = selected;
- ds_free_packs(stream->ds);
- demux_control(demuxer, DEMUXER_CTRL_SWITCHED_TRACKS, NULL);
+ stream->ds->active = false;
+ ds_flush(stream->ds);
+ update = true;
}
+ pthread_mutex_unlock(&demuxer->in->lock);
+ if (update)
+ demux_control(demuxer, DEMUXER_CTRL_SWITCHED_TRACKS, NULL);
}
-void demuxer_enable_autoselect(struct demuxer *demuxer)
+void demux_set_stream_autoselect(struct demuxer *demuxer, bool autoselect)
{
- demuxer->stream_autoselect = true;
+ assert(!demuxer->in->threading); // laziness
+ demuxer->in->autoselect = autoselect;
}
-bool demuxer_stream_is_selected(struct demuxer *d, struct sh_stream *stream)
+bool demux_stream_is_selected(struct sh_stream *stream)
{
- return stream && stream->ds->selected;
+ if (!stream)
+ return false;
+ bool r = false;
+ pthread_mutex_lock(&stream->ds->in->lock);
+ r = stream->ds->selected;
+ pthread_mutex_unlock(&stream->ds->in->lock);
+ return r;
}
int demuxer_add_attachment(demuxer_t *demuxer, struct bstr name,
@@ -878,7 +1043,7 @@ static int chapter_compare(const void *p1, const void *p2)
return c1->original_index > c2->original_index ? 1 :-1; // never equal
}
-void demuxer_sort_chapters(demuxer_t *demuxer)
+static void demuxer_sort_chapters(demuxer_t *demuxer)
{
qsort(demuxer->chapters, demuxer->num_chapters,
sizeof(struct demux_chapter), chapter_compare);
@@ -900,145 +1065,168 @@ int demuxer_add_chapter(demuxer_t *demuxer, struct bstr name,
return demuxer->num_chapters - 1;
}
-static void add_stream_chapters(struct demuxer *demuxer)
-{
- if (demuxer->num_chapters)
- return;
- int num_chapters = 0;
- if (stream_control(demuxer->stream, STREAM_CTRL_GET_NUM_CHAPTERS,
- &num_chapters) != STREAM_OK)
- return;
- for (int n = 0; n < num_chapters; n++) {
- double p = n;
- if (stream_control(demuxer->stream, STREAM_CTRL_GET_CHAPTER_TIME, &p)
- != STREAM_OK)
- return;
- demuxer_add_chapter(demuxer, bstr0(""), p * 1e9, 0, 0);
- }
-}
-
double demuxer_get_time_length(struct demuxer *demuxer)
{
double len;
- if (stream_control(demuxer->stream, STREAM_CTRL_GET_TIME_LENGTH, &len) > 0)
- return len;
- // <= 0 means DEMUXER_CTRL_NOTIMPL or DEMUXER_CTRL_DONTKNOW
if (demux_control(demuxer, DEMUXER_CTRL_GET_TIME_LENGTH, &len) > 0)
return len;
return -1;
}
-double demuxer_get_start_time(struct demuxer *demuxer)
+// must be called locked
+static void update_cache(struct demux_internal *in)
{
- double time;
- if (stream_control(demuxer->stream, STREAM_CTRL_GET_START_TIME, &time) > 0)
- return time;
- if (demux_control(demuxer, DEMUXER_CTRL_GET_START_TIME, &time) > 0)
- return time;
- return 0;
-}
+ struct demuxer *demuxer = in->d_thread;
+ struct stream *stream = demuxer->stream;
-int demuxer_angles_count(demuxer_t *demuxer)
-{
- int ris, angles = -1;
+ in->time_length = -1;
+ if (demuxer->desc->control) {
+ demuxer->desc->control(demuxer, DEMUXER_CTRL_GET_TIME_LENGTH,
+ &in->time_length);
+ }
- ris = stream_control(demuxer->stream, STREAM_CTRL_GET_NUM_ANGLES, &angles);
- if (ris == STREAM_UNSUPPORTED)
- return -1;
- return angles;
+ struct mp_tags *s_meta = NULL;
+ stream_control(stream, STREAM_CTRL_GET_METADATA, &s_meta);
+ if (s_meta) {
+ talloc_free(in->stream_metadata);
+ in->stream_metadata = talloc_steal(in, s_meta);
+ in->d_buffer->events |= DEMUX_EVENT_METADATA;
+ }
+
+ in->stream_size = -1;
+ stream_control(stream, STREAM_CTRL_GET_SIZE, &in->stream_size);
+ in->stream_cache_size = -1;
+ stream_control(stream, STREAM_CTRL_GET_CACHE_SIZE, &in->stream_cache_size);
+ in->stream_cache_fill = -1;
+ stream_control(stream, STREAM_CTRL_GET_CACHE_FILL, &in->stream_cache_fill);
+ in->stream_cache_idle = -1;
+ stream_control(stream, STREAM_CTRL_GET_CACHE_IDLE, &in->stream_cache_idle);
}
-int demuxer_get_current_angle(demuxer_t *demuxer)
+// must be called locked
+static int cached_stream_control(struct demux_internal *in, int cmd, void *arg)
{
- int ris, curr_angle = -1;
- ris = stream_control(demuxer->stream, STREAM_CTRL_GET_ANGLE, &curr_angle);
- if (ris == STREAM_UNSUPPORTED)
- return -1;
- return curr_angle;
+ switch (cmd) {
+ case STREAM_CTRL_GET_CACHE_SIZE:
+ if (in->stream_cache_size < 0)
+ return STREAM_UNSUPPORTED;
+ *(int64_t *)arg = in->stream_cache_size;
+ return STREAM_OK;
+ case STREAM_CTRL_GET_CACHE_FILL:
+ if (in->stream_cache_fill < 0)
+ return STREAM_UNSUPPORTED;
+ *(int64_t *)arg = in->stream_cache_fill;
+ return STREAM_OK;
+ case STREAM_CTRL_GET_CACHE_IDLE:
+ if (in->stream_cache_idle < 0)
+ return STREAM_UNSUPPORTED;
+ *(int *)arg = in->stream_cache_idle;
+ return STREAM_OK;
+ case STREAM_CTRL_GET_SIZE:
+ if (in->stream_size < 0)
+ return STREAM_UNSUPPORTED;
+ *(int64_t *)arg = in->stream_size;
+ return STREAM_OK;
+ }
+ return STREAM_ERROR;
}
-
-int demuxer_set_angle(demuxer_t *demuxer, int angle)
+// must be called locked
+static int cached_demux_control(struct demux_internal *in, int cmd, void *arg)
{
- int ris, angles = -1;
-
- angles = demuxer_angles_count(demuxer);
- if ((angles < 1) || (angle > angles))
- return -1;
-
- demux_flush(demuxer);
-
- ris = stream_control(demuxer->stream, STREAM_CTRL_SET_ANGLE, &angle);
- if (ris == STREAM_UNSUPPORTED)
- return -1;
-
- demux_control(demuxer, DEMUXER_CTRL_RESYNC, NULL);
-
- return angle;
+ switch (cmd) {
+ case DEMUXER_CTRL_GET_TIME_LENGTH:
+ if (in->time_length < 0)
+ return DEMUXER_CTRL_NOTIMPL;
+ *(double *)arg = in->time_length;
+ return DEMUXER_CTRL_OK;
+ case DEMUXER_CTRL_STREAM_CTRL: {
+ struct demux_ctrl_stream_ctrl *c = arg;
+ int r = cached_stream_control(in, c->ctrl, c->arg);
+ if (r == STREAM_ERROR)
+ break;
+ c->res = r;
+ return DEMUXER_CTRL_OK;
+ }
+ case DEMUXER_CTRL_SWITCHED_TRACKS:
+ in->tracks_switched = true;
+ return DEMUXER_CTRL_OK;
+ }
+ return DEMUXER_CTRL_DONTKNOW;
}
-static int packet_sort_compare(const void *p1, const void *p2)
+int demux_control(demuxer_t *demuxer, int cmd, void *arg)
{
- struct demux_packet *c1 = *(struct demux_packet **)p1;
- struct demux_packet *c2 = *(struct demux_packet **)p2;
+ struct demux_internal *in = demuxer->in;
+
+ if (in->threading) {
+ pthread_mutex_lock(&in->lock);
+ pthread_cond_signal(&in->wakeup);
+ int cr = cached_demux_control(in, cmd, arg);
+ pthread_mutex_unlock(&in->lock);
+ if (cr != DEMUXER_CTRL_DONTKNOW)
+ return cr;
+ }
- if (c1->pts > c2->pts)
- return 1;
- else if (c1->pts < c2->pts)
- return -1;
- return 0;
+ int r = DEMUXER_CTRL_NOTIMPL;
+ demux_pause(demuxer);
+ if (cmd == DEMUXER_CTRL_STREAM_CTRL) {
+ struct demux_ctrl_stream_ctrl *c = arg;
+ if (in->threading)
+ MP_VERBOSE(demuxer, "blocking for STREAM_CTRL %d\n", c->ctrl);
+ c->res = stream_control(demuxer->stream, c->ctrl, c->arg);
+ if (c->res != STREAM_UNSUPPORTED)
+ r = DEMUXER_CTRL_OK;
+ }
+ if (r != DEMUXER_CTRL_OK) {
+ if (in->threading)
+ MP_VERBOSE(demuxer, "blocking for DEMUXER_CTRL %d\n", cmd);
+ if (demuxer->desc->control)
+ r = demuxer->desc->control(demuxer->in->d_thread, cmd, arg);
+ }
+ demux_unpause(demuxer);
+ return r;
}
-void demux_packet_list_sort(struct demux_packet **pkts, int num_pkts)
+int demux_stream_control(demuxer_t *demuxer, int ctrl, void *arg)
{
- qsort(pkts, num_pkts, sizeof(struct demux_packet *), packet_sort_compare);
+ struct demux_ctrl_stream_ctrl c = {ctrl, arg, STREAM_UNSUPPORTED};
+ demux_control(demuxer, DEMUXER_CTRL_STREAM_CTRL, &c);
+ return c.res;
}
-void demux_packet_list_seek(struct demux_packet **pkts, int num_pkts,
- int *current, float rel_seek_secs, int flags)
+// Make the demuxer thread stop doing anything.
+// demux_unpause() wakes up the thread again.
+// Can be nested with other calls, but trying to read packets may deadlock.
+void demux_pause(demuxer_t *demuxer)
{
- double ref_time = 0;
- if (*current >= 0 && *current < num_pkts) {
- ref_time = pkts[*current]->pts;
- } else if (*current == num_pkts && num_pkts > 0) {
- ref_time = pkts[num_pkts - 1]->pts + pkts[num_pkts - 1]->duration;
- }
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
- if (flags & SEEK_ABSOLUTE)
- ref_time = 0;
+ if (!in->threading)
+ return;
- if (flags & SEEK_FACTOR) {
- ref_time += demux_packet_list_duration(pkts, num_pkts) * rel_seek_secs;
- } else {
- ref_time += rel_seek_secs;
- }
+ MP_VERBOSE(in, "pause demux thread\n");
- // Could do binary search, but it's probably not worth the complexity.
- int last_index = 0;
- for (int n = 0; n < num_pkts; n++) {
- if (pkts[n]->pts > ref_time)
- break;
- last_index = n;
- }
- *current = last_index;
+ pthread_mutex_lock(&in->lock);
+ in->thread_request_pause++;
+ pthread_cond_signal(&in->wakeup);
+ while (!in->thread_paused)
+ pthread_cond_wait(&in->wakeup, &in->lock);
+ pthread_mutex_unlock(&in->lock);
}
-double demux_packet_list_duration(struct demux_packet **pkts, int num_pkts)
+void demux_unpause(demuxer_t *demuxer)
{
- if (num_pkts > 0)
- return pkts[num_pkts - 1]->pts + pkts[num_pkts - 1]->duration;
- return 0;
-}
+ struct demux_internal *in = demuxer->in;
+ assert(demuxer == in->d_user);
-struct demux_packet *demux_packet_list_fill(struct demux_packet **pkts,
- int num_pkts, int *current)
-{
- if (*current < 0)
- *current = 0;
- if (*current >= num_pkts)
- return NULL;
- struct demux_packet *new = talloc(NULL, struct demux_packet);
- *new = *pkts[*current];
- *current += 1;
- return new;
+ if (!in->threading)
+ return;
+
+ pthread_mutex_lock(&in->lock);
+ assert(in->thread_request_pause > 0);
+ in->thread_request_pause--;
+ pthread_cond_signal(&in->wakeup);
+ pthread_mutex_unlock(&in->lock);
}