diff options
author | Alessandro Ghedini <alessandro@ghedini.me> | 2014-06-25 20:57:07 +0200 |
---|---|---|
committer | Alessandro Ghedini <alessandro@ghedini.me> | 2014-06-25 20:57:07 +0200 |
commit | a1af603a3309eea7728e11ce446a69b45c5a65c6 (patch) | |
tree | fd3a5f68c842ae2bb0fd1d4d42b9ee799b927900 /misc | |
parent | 1d44deb1c45b1a17cfb93649c516b6b5a725e039 (diff) |
Imported Upstream version 0.4.0
Diffstat (limited to 'misc')
-rw-r--r-- | misc/dispatch.c | 271 | ||||
-rw-r--r-- | misc/dispatch.h | 23 | ||||
-rw-r--r-- | misc/ring.c | 44 |
3 files changed, 312 insertions, 26 deletions
diff --git a/misc/dispatch.c b/misc/dispatch.c new file mode 100644 index 0000000..6391caa --- /dev/null +++ b/misc/dispatch.c @@ -0,0 +1,271 @@ +/* + * This file is part of mpv. + * + * mpv is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * mpv is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with mpv. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <stdbool.h> +#include <assert.h> + +#include "common/common.h" +#include "osdep/threads.h" +#include "osdep/timer.h" + +#include "dispatch.h" + +struct mp_dispatch_queue { + struct mp_dispatch_item *head, *tail; + pthread_mutex_t lock; + pthread_cond_t cond; + int suspend_requested; + bool suspended; + void (*wakeup_fn)(void *wakeup_ctx); + void *wakeup_ctx; + // This lock grant access to the target thread's state during suspend mode. + // During suspend mode, the target thread is blocked in the function + // mp_dispatch_queue_process(), however this function may be processing + // dispatch queue items. This lock serializes the dispatch queue processing + // and external mp_dispatch_lock() calls. + // Invariant: can be held only while suspended==true, and suspend_requested + // must be >0 (unless mp_dispatch_queue_process() locks it). In particular, + // suspend mode must not be left while the lock is held. + pthread_mutex_t exclusive_lock; +}; + +struct mp_dispatch_item { + mp_dispatch_fn fn; + void *fn_data; + bool asynchronous; + bool completed; + struct mp_dispatch_item *next; +}; + +static void queue_dtor(void *p) +{ + struct mp_dispatch_queue *queue = p; + assert(!queue->head); + assert(!queue->suspend_requested); + assert(!queue->suspended); + pthread_cond_destroy(&queue->cond); + pthread_mutex_destroy(&queue->lock); + pthread_mutex_destroy(&queue->exclusive_lock); +} + +// A dispatch queue lets other threads runs callbacks in a target thread. +// The target thread is the thread which created the queue and which calls +// mp_dispatch_queue_process(). +// Free the dispatch queue with talloc_free(). (It must be empty.) +struct mp_dispatch_queue *mp_dispatch_create(void *ta_parent) +{ + struct mp_dispatch_queue *queue = talloc_ptrtype(ta_parent, queue); + *queue = (struct mp_dispatch_queue){0}; + talloc_set_destructor(queue, queue_dtor); + pthread_mutex_init(&queue->exclusive_lock, NULL); + pthread_mutex_init(&queue->lock, NULL); + pthread_cond_init(&queue->cond, NULL); + return queue; +} + +// Set a custom function that should be called to guarantee that the target +// thread wakes up. This is intended for use with code that needs to block +// on non-pthread primitives, such as e.g. select(). In the case of select(), +// the wakeup_fn could for example write a byte into a "wakeup" pipe in order +// to unblock the select(). The wakeup_fn is called from the dispatch queue +// when there are new dispatch items, and the target thread should then enter +// mp_dispatch_queue_process() as soon as possible. Note that wakeup_fn is +// called under no lock, so you might have to do synchronization yourself. +void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue, + void (*wakeup_fn)(void *wakeup_ctx), + void *wakeup_ctx) +{ + queue->wakeup_fn = wakeup_fn; + queue->wakeup_ctx = wakeup_ctx; +} + +static void mp_dispatch_append(struct mp_dispatch_queue *queue, + struct mp_dispatch_item *item) +{ + pthread_mutex_lock(&queue->lock); + if (queue->tail) { + queue->tail->next = item; + } else { + queue->head = item; + } + queue->tail = item; + // Wake up the main thread; note that other threads might wait on this + // condition for reasons, so broadcast the condition. + pthread_cond_broadcast(&queue->cond); + pthread_mutex_unlock(&queue->lock); + if (queue->wakeup_fn) + queue->wakeup_fn(queue->wakeup_ctx); +} + +// Enqueue a callback to run it on the target thread asynchronously. The target +// thread will run fn(fn_data) as soon as it enter mp_dispatch_queue_process. +// Note that mp_dispatch_enqueue() will usually return long before that happens. +// It's up to the user to signal completion of the callback. It's also up to +// the user to guarantee that the context fn_data has correct lifetime, i.e. +// lives until the callback is run, and is freed after that. +void mp_dispatch_enqueue(struct mp_dispatch_queue *queue, + mp_dispatch_fn fn, void *fn_data) +{ + struct mp_dispatch_item *item = talloc_ptrtype(NULL, item); + *item = (struct mp_dispatch_item){ + .fn = fn, + .fn_data = fn_data, + .asynchronous = true, + }; + mp_dispatch_append(queue, item); +} + +// Like mp_dispatch_enqueue(), but the queue code will call talloc_free(fn_data) +// after the fn callback has been run. (The callback could trivially do that +// itself, but it makes it easier to implement synchronous and asynchronous +// requests with the same callback implementation.) +void mp_dispatch_enqueue_autofree(struct mp_dispatch_queue *queue, + mp_dispatch_fn fn, void *fn_data) +{ + struct mp_dispatch_item *item = talloc_ptrtype(NULL, item); + *item = (struct mp_dispatch_item){ + .fn = fn, + .fn_data = talloc_steal(item, fn_data), + .asynchronous = true, + }; + mp_dispatch_append(queue, item); +} + +// Run fn(fn_data) on the target thread synchronously. This function enqueues +// the callback and waits until the target thread is done doing this. +// This is redundant to calling the function inside mp_dispatch_[un]lock(), +// but can be helpful with code that relies on TLS (such as OpenGL). +void mp_dispatch_run(struct mp_dispatch_queue *queue, + mp_dispatch_fn fn, void *fn_data) +{ + struct mp_dispatch_item item = { + .fn = fn, + .fn_data = fn_data, + }; + mp_dispatch_append(queue, &item); + + pthread_mutex_lock(&queue->lock); + while (!item.completed) + pthread_cond_wait(&queue->cond, &queue->lock); + pthread_mutex_unlock(&queue->lock); +} + +// Process any outstanding dispatch items in the queue. This also handles +// suspending or locking the target thread. +// The timeout specifies the minimum wait time. The actual time spent in this +// function can be much higher if the suspending/locking functions are used, or +// if executing the dispatch items takes time. On the other hand, this function +// can return much earlier than the timeout due to sporadic wakeups. +// It is also guaranteed that if at least one queue item was processed, the +// function will return as soon as possible, ignoring the timeout. This +// simplifies users, such as re-checking conditions before waiting. (It will +// still process the remaining queue items, and wait for unsuspend.) +void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout) +{ + int64_t wait = timeout > 0 ? mp_add_timeout(mp_time_us(), timeout) : 0; + pthread_mutex_lock(&queue->lock); + queue->suspended = true; + // Wake up thread which called mp_dispatch_suspend(). + pthread_cond_broadcast(&queue->cond); + while (queue->head || queue->suspend_requested || wait > 0) { + if (queue->head) { + struct mp_dispatch_item *item = queue->head; + queue->head = item->next; + if (!queue->head) + queue->tail = NULL; + item->next = NULL; + // Unlock, because we want to allow other threads to queue items + // while the dispatch item is processed. + // At the same time, exclusive_lock must be held to protect the + // thread's user state. + pthread_mutex_unlock(&queue->lock); + pthread_mutex_lock(&queue->exclusive_lock); + item->fn(item->fn_data); + pthread_mutex_unlock(&queue->exclusive_lock); + pthread_mutex_lock(&queue->lock); + if (item->asynchronous) { + talloc_free(item); + } else { + item->completed = true; + // Wakeup mp_dispatch_run() + pthread_cond_broadcast(&queue->cond); + } + } else { + if (wait > 0) { + mpthread_cond_timedwait(&queue->cond, &queue->lock, wait); + } else { + pthread_cond_wait(&queue->cond, &queue->lock); + } + } + wait = 0; + } + queue->suspended = false; + pthread_mutex_unlock(&queue->lock); +} + +// Set the target thread into suspend mode: in this mode, the thread will enter +// mp_dispatch_queue_process(), process any outstanding dispatch items, and +// wait for new items when done (instead of exiting the process function). +// Multiple threads can enter suspend mode at the same time. Suspend mode is +// not a synchronization mechanism; it merely makes sure the target thread does +// not leave mp_dispatch_queue_process(), even if it's done. mp_dispatch_lock() +// can be used for exclusive access. +void mp_dispatch_suspend(struct mp_dispatch_queue *queue) +{ + pthread_mutex_lock(&queue->lock); + queue->suspend_requested++; + while (!queue->suspended) { + pthread_mutex_unlock(&queue->lock); + if (queue->wakeup_fn) + queue->wakeup_fn(queue->wakeup_ctx); + pthread_mutex_lock(&queue->lock); + if (queue->suspended) + break; + pthread_cond_wait(&queue->cond, &queue->lock); + } + pthread_mutex_unlock(&queue->lock); +} + +// Undo mp_dispatch_suspend(). +void mp_dispatch_resume(struct mp_dispatch_queue *queue) +{ + pthread_mutex_lock(&queue->lock); + assert(queue->suspended); + assert(queue->suspend_requested > 0); + queue->suspend_requested--; + if (queue->suspend_requested == 0) + pthread_cond_broadcast(&queue->cond); + pthread_mutex_unlock(&queue->lock); +} + +// Grant exclusive access to the target thread's state. While this is active, +// no other thread can return from mp_dispatch_lock() (i.e. it behaves like +// a pthread mutex), and no other thread can get dispatch items completed. +// Other threads can still queue asynchronous dispatch items without waiting, +// and the mutex behavior applies to this function only. +void mp_dispatch_lock(struct mp_dispatch_queue *queue) +{ + mp_dispatch_suspend(queue); + pthread_mutex_lock(&queue->exclusive_lock); +} + +// Undo mp_dispatch_lock(). +void mp_dispatch_unlock(struct mp_dispatch_queue *queue) +{ + pthread_mutex_unlock(&queue->exclusive_lock); + mp_dispatch_resume(queue); +} diff --git a/misc/dispatch.h b/misc/dispatch.h new file mode 100644 index 0000000..96a5d7c --- /dev/null +++ b/misc/dispatch.h @@ -0,0 +1,23 @@ +#ifndef MP_DISPATCH_H_ +#define MP_DISPATCH_H_ + +typedef void (*mp_dispatch_fn)(void *data); +struct mp_dispatch_queue; + +struct mp_dispatch_queue *mp_dispatch_create(void *talloc_parent); +void mp_dispatch_set_wakeup_fn(struct mp_dispatch_queue *queue, + void (*wakeup_fn)(void *wakeup_ctx), + void *wakeup_ctx); +void mp_dispatch_enqueue(struct mp_dispatch_queue *queue, + mp_dispatch_fn fn, void *fn_data); +void mp_dispatch_enqueue_autofree(struct mp_dispatch_queue *queue, + mp_dispatch_fn fn, void *fn_data); +void mp_dispatch_run(struct mp_dispatch_queue *queue, + mp_dispatch_fn fn, void *fn_data); +void mp_dispatch_queue_process(struct mp_dispatch_queue *queue, double timeout); +void mp_dispatch_suspend(struct mp_dispatch_queue *queue); +void mp_dispatch_resume(struct mp_dispatch_queue *queue); +void mp_dispatch_lock(struct mp_dispatch_queue *queue); +void mp_dispatch_unlock(struct mp_dispatch_queue *queue); + +#endif diff --git a/misc/ring.c b/misc/ring.c index eb139c2..804e633 100644 --- a/misc/ring.c +++ b/misc/ring.c @@ -30,19 +30,17 @@ struct mp_ring { /* Positions of the first readable/writeable chunks. Do not read this * fields but use the atomic private accessors `mp_ring_get_wpos` * and `mp_ring_get_rpos`. */ - uint32_t rpos, wpos; + atomic_ulong rpos, wpos; }; -static uint32_t mp_ring_get_wpos(struct mp_ring *buffer) +static unsigned long mp_ring_get_wpos(struct mp_ring *buffer) { - mp_memory_barrier(); - return buffer->wpos; + return atomic_load(&buffer->wpos); } -static uint32_t mp_ring_get_rpos(struct mp_ring *buffer) +static unsigned long mp_ring_get_rpos(struct mp_ring *buffer) { - mp_memory_barrier(); - return buffer->rpos; + return atomic_load(&buffer->rpos); } struct mp_ring *mp_ring_new(void *talloc_ctx, int size) @@ -57,19 +55,8 @@ struct mp_ring *mp_ring_new(void *talloc_ctx, int size) return ringbuffer; } -int mp_ring_drain(struct mp_ring *buffer, int len) -{ - int buffered = mp_ring_buffered(buffer); - int drain_len = FFMIN(len, buffered); - mp_atomic_add_and_fetch(&buffer->rpos, drain_len); - mp_memory_barrier(); - return drain_len; -} - int mp_ring_read(struct mp_ring *buffer, unsigned char *dest, int len) { - if (!dest) return mp_ring_drain(buffer, len); - int size = mp_ring_size(buffer); int buffered = mp_ring_buffered(buffer); int read_len = FFMIN(len, buffered); @@ -78,15 +65,21 @@ int mp_ring_read(struct mp_ring *buffer, unsigned char *dest, int len) int len1 = FFMIN(size - read_ptr, read_len); int len2 = read_len - len1; - memcpy(dest, buffer->buffer + read_ptr, len1); - memcpy(dest + len1, buffer->buffer, len2); + if (dest) { + memcpy(dest, buffer->buffer + read_ptr, len1); + memcpy(dest + len1, buffer->buffer, len2); + } - mp_atomic_add_and_fetch(&buffer->rpos, read_len); - mp_memory_barrier(); + atomic_fetch_add(&buffer->rpos, read_len); return read_len; } +int mp_ring_drain(struct mp_ring *buffer, int len) +{ + return mp_ring_read(buffer, NULL, len); +} + int mp_ring_write(struct mp_ring *buffer, unsigned char *src, int len) { int size = mp_ring_size(buffer); @@ -100,16 +93,15 @@ int mp_ring_write(struct mp_ring *buffer, unsigned char *src, int len) memcpy(buffer->buffer + write_ptr, src, len1); memcpy(buffer->buffer, src + len1, len2); - mp_atomic_add_and_fetch(&buffer->wpos, write_len); - mp_memory_barrier(); + atomic_fetch_add(&buffer->wpos, write_len); return write_len; } void mp_ring_reset(struct mp_ring *buffer) { - buffer->wpos = buffer->rpos = 0; - mp_memory_barrier(); + atomic_store(&buffer->wpos, 0); + atomic_store(&buffer->rpos, 0); } int mp_ring_available(struct mp_ring *buffer) |