|
From: Torsten J. <t....@gm...> - 2018-01-19 17:42:39
|
# HG changeset patch
# User Torsten Jager <t....@gm...>
# Date 1516383725 -3600
# Node ID 87dfc40e92325df77884e15d0ad30cdc25d69286
# Branch default
# Parent a10bdb3e18d1c00ebb38b856f8aba991606f81a8
Optimize tickets 2.
- Move to separate header.
- Add revoke and reissue when already holding that ticket.
- Add revoke callbacks.
- Add rewire revoke mode, saves some extra calls.
- Map timeout zero to trylock.
diff --git a/include/Makefile.am b/include/Makefile.am
--- a/include/Makefile.am
+++ b/include/Makefile.am
@@ -32,6 +32,7 @@
xine/sorted_array.h \
xine/spu.h \
xine/spu_decoder.h \
+ xine/tickets.h \
xine/vdr.h \
xine/version.h \
xine/video_decoder.h \
diff --git a/include/xine/tickets.h b/include/xine/tickets.h
new file mode 100644
--- /dev/null
+++ b/include/xine/tickets.h
@@ -0,0 +1,158 @@
+/*
+ * Copyright (C) 2000-2018 the xine project
+ *
+ * This file is part of xine, a free video player.
+ *
+ * xine is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * xine is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110, USA
+ */
+
+#ifndef HAVE_XINE_TICKETS_H
+#define HAVE_XINE_TICKETS_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <xine.h>
+
+/*
+ * xine thread tickets.
+ * This is similar to a rw lock in "prefer writers" mode.
+ * - Shared recursive read lock: acquire () / release ()
+ * - Recursive write lock: revoke () / issue ()
+ * plus there are a few special hacks.
+ * - Low overhead temporary read unlock: renew ()
+ * (better than release () + acquire ()).
+ * Mutex less check ticket_revoked whether it is time to do so.
+ * Yes this may delay writers a bit due to multicore data cache
+ * sync issues (and your check interval, of course) but since
+ * writing is rare we accept that.
+ * Alternatively, register a revoke notify callback that stops
+ * waiting for something else.
+ * - Read lock with priority over writers: "irrevocable"
+ * (use for known quick actions only).
+ * - Top level uninterruptible write lock: "atomic".
+ * Note that the write side is for engine internal use.
+ * It now also supports turning an already held read into write
+ * temporarily.
+ */
+
+typedef void xine_ticket_revoke_cb_t (void *user_data, int flags);
+typedef struct xine_ticket_s xine_ticket_t;
+
+struct xine_ticket_s {
+
+ /* the ticket owner must assure to check for ticket revocation in
+ * intervals of finite length; this means that you must release
+ * the ticket before any operation that might block
+ *
+ * you must never write to this member directly
+ *
+ * nonzero while a revoke () waits for ticket holders. the engine also
+ * sets the XINE_TICKET_FLAG_REWIRE bit here while a rewire revoke is
+ * pending, and resets the value to 0 when revoke () has finished waiting.
+ */
+ int ticket_revoked;
+
+ /* apply for a ticket; between acquire and release of an irrevocable
+ * ticket (be sure to pair them properly!), it is guaranteed that you
+ * will never be blocked by ticket revocation */
+ void (*acquire)(xine_ticket_t *self, int irrevocable);
+
+ /* give a ticket back */
+ void (*release)(xine_ticket_t *self, int irrevocable);
+
+ /* renew a ticket, when it has been revoked, see ticket_revoked above;
+ * irrevocable must be set to one, if your thread might have acquired
+ * irrevocable tickets you don't know of; set it to zero only when
+ * you know that this is impossible.
+ * engine internal callers may additionally OR in XINE_TICKET_FLAG_REWIRE
+ * (see below) to tell that a port rewire is acceptable at this point.
+ * without that flag, renew () returns early when a rewire revoke is
+ * pending, and expects to be called again with the flag set. */
+ void (*renew)(xine_ticket_t *self, int irrevocable);
+
+#ifdef XINE_ENGINE_INTERNAL
+ /* XXX: port rewiring in the middle of a decoder loop iteration is a bad idea.
+ * We could make that halfway safe by (un)referencing post ports consistently.
+ * Unfortunately, post plugins intercept our calls at will.
+ * Even worse, we ourselves told them to inline _x_post_rewire () then
+ * still use the previous port :-/
+ * KLUDGE:
+ * - We now have 2 levels of revocation, plain and rewire.
+ * - If there is a pending rewire, ticket_revoked has the rewire flag set.
+ * - A pending or incoming rewire revoke repels plain renewers, so they
+ * can advance to a code location more suitable for a rewire, then
+ * renew again with rewire flag set.
+ * - Output layer get funcs return an emergency buffer/frame if engine is paused.
+ * Why the f**k didnt we allow NULL there??
+ * - Decoder loops let the full thing go right before next iteration. */
+#define XINE_TICKET_FLAG_ATOMIC 1
+#define XINE_TICKET_FLAG_REWIRE 2
+
+ /* allow handing out new tickets. atomic needs to be same as below.
+ * flags is a combination of the XINE_TICKET_FLAG_* values above. */
+ void (*issue)(xine_ticket_t *self, int flags);
+
+ /* revoke all tickets and deny new ones;
+ * a pair of atomic revoke and issue cannot be interrupted by another
+ * revocation or by other threads acquiring tickets.
+ * set rewire flag to also do that lock.
+ * with the atomic or rewire flag, revoke () returns with revoke_lock
+ * (and, for rewire, port_rewiring_lock) still held. */
+ void (*revoke)(xine_ticket_t *self, int flags);
+
+ /* behaves like acquire() but doesn't block the calling thread; when
+ * the thread would have been blocked, 0 is returned otherwise 1
+ * this function acquires a ticket even if ticket revocation is active */
+ int (*acquire_nonblocking)(xine_ticket_t *self, int irrevocable);
+
+ /* behaves like release() but doesn't block the calling thread; should
+ * be used in combination with acquire_nonblocking() */
+ void (*release_nonblocking)(xine_ticket_t *self, int irrevocable);
+
+ /* forbid port rewiring for critical code sections.
+ * a ms_timeout of 0 just tries the lock without waiting. */
+ int (*lock_port_rewiring)(xine_ticket_t *self, int ms_timeout);
+ void (*unlock_port_rewiring)(xine_ticket_t *self);
+
+ void (*dispose)(xine_ticket_t *self);
+
+ pthread_mutex_t lock; /* held while touching the counters and the callback list below */
+ pthread_mutex_t revoke_lock; /* taken by revoke (); kept until issue () for atomic / rewire revokes */
+ pthread_cond_t issued; /* broadcast by issue () when no revocation is pending, and by a
+ * rewire revoke () to wake plain renewers */
+ pthread_cond_t revoked; /* broadcast when tickets_granted drops to 0, e.g. by renew () */
+ int tickets_granted; /* revoke () waits until this is 0 */
+ int irrevocable_tickets;
+ int pending_revocations; /* incremented by revoke (), decremented by issue () */
+ int atomic_revoke; /* flags of the latest revoke (), reset to 0 by issue () */
+ pthread_t atomic_revoker_thread; /* set by revoke () with atomic or rewire flag */
+ pthread_mutex_t port_rewiring_lock; /* taken by revoke () with rewire flag, dropped by the matching issue () */
+ struct {
+ int count; /* grants of this thread; revoke () adds 0x80000 (bit 19) for the calling thread */
+ pthread_t holder;
+ } *holder_threads;
+ unsigned holder_thread_count;
+
+ int plain_renewers; /* threads currently inside renew () without the rewire flag */
+ int rewirers; /* incremented by a rewire revoke (), decremented by a rewire issue () */
+
+#define XINE_TICKET_MAX_CB 15
+ /* (un)register a revoke notify callback, telling the current revoke flags.
+ * Return from it does not imply anything about the ticket itself,
+ * it just shall shorten wait.
+ * Note that unregister needs the same data pointer as well to
+ * handle multiple instances of the same object.
+ * Callbacks are called from inside revoke () while the ticket lock is
+ * held, so they should not call back into the ticket.
+ * At most XINE_TICKET_MAX_CB callbacks are kept, registering more is
+ * silently ignored. */
+ void (*revoke_cb_register) (xine_ticket_t *self, xine_ticket_revoke_cb_t *cb, void *user_data);
+ void (*revoke_cb_unregister)(xine_ticket_t *self, xine_ticket_revoke_cb_t *cb, void *user_data);
+
+ xine_ticket_revoke_cb_t *revoke_callbacks[XINE_TICKET_MAX_CB + 1]; /* compact list, first NULL entry ends it */
+ void *revoke_cb_data[XINE_TICKET_MAX_CB + 1]; /* user_data for the callback at the same index */
+#endif
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/include/xine/xine_internal.h b/include/xine/xine_internal.h
--- a/include/xine/xine_internal.h
+++ b/include/xine/xine_internal.h
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2000-2017 the xine project
+ * Copyright (C) 2000-2018 the xine project
*
* This file is part of xine, a free video player.
*
@@ -30,6 +30,7 @@
*/
#include <xine.h>
+#include <xine/tickets.h>
#include <xine/refcounter.h>
#include <xine/input_plugin.h>
#include <xine/demux.h>
@@ -67,8 +68,6 @@
#define XINE_STREAM_INFO_MAX 99
-typedef struct xine_ticket_s xine_ticket_t;
-
/*
* the "big" xine struct, holding everything together
*/
@@ -119,75 +118,6 @@
};
/*
- * xine thread tickets
- */
-
-struct xine_ticket_s {
-
- /* the ticket owner must assure to check for ticket revocation in
- * intervals of finite length; this means that you must release
- * the ticket before any operation that might block
- *
- * you must never write to this member directly
- */
- int ticket_revoked;
-
- /* apply for a ticket; between acquire and relese of an irrevocable
- * ticket (be sure to pair them properly!), it is guaranteed that you
- * will never be blocked by ticket revocation */
- void (*acquire)(xine_ticket_t *self, int irrevocable);
-
- /* give a ticket back */
- void (*release)(xine_ticket_t *self, int irrevocable);
-
- /* renew a ticket, when it has been revoked, see ticket_revoked above;
- * irrevocable must be set to one, if your thread might have acquired
- * irrevocable tickets you don't know of; set it to zero only when
- * you know that this is impossible */
- void (*renew)(xine_ticket_t *self, int irrevocable);
-
-#ifdef XINE_ENGINE_INTERNAL
- /* allow handing out new tickets */
- void (*issue)(xine_ticket_t *self, int atomic);
-
- /* revoke all tickets and deny new ones;
- * a pair of atomic revoke and issue cannot be interrupted by another
- * revocation or by other threads acquiring tickets */
- void (*revoke)(xine_ticket_t *self, int atomic);
-
- /* behaves like acquire() but doesn't block the calling thread; when
- * the thread would have been blocked, 0 is returned otherwise 1
- * this function acquires a ticket even if ticket revocation is active */
- int (*acquire_nonblocking)(xine_ticket_t *self, int irrevocable);
-
- /* behaves like release() but doesn't block the calling thread; should
- * be used in combination with acquire_nonblocking() */
- void (*release_nonblocking)(xine_ticket_t *self, int irrevocable);
-
- int (*lock_port_rewiring)(xine_ticket_t *self, int ms_timeout);
- void (*unlock_port_rewiring)(xine_ticket_t *self);
-
- void (*dispose)(xine_ticket_t *self);
-
- pthread_mutex_t lock;
- pthread_mutex_t revoke_lock;
- pthread_cond_t issued;
- pthread_cond_t revoked;
- int tickets_granted;
- int irrevocable_tickets;
- int pending_revocations;
- int atomic_revoke;
- pthread_t atomic_revoker_thread;
- pthread_mutex_t port_rewiring_lock;
- struct {
- int count;
- pthread_t holder;
- } *holder_threads;
- unsigned holder_thread_count;
-#endif
-};
-
-/*
* xine event queue
*/
@@ -218,14 +148,14 @@
/* demuxers use input_plugin to read data */
input_plugin_t *input_plugin;
- /* used by video decoders */
- xine_video_port_t *video_out;
+ /* used by video decoders, may change by port rewire */
+ xine_video_port_t * volatile video_out;
/* demuxers send data to video decoders using this fifo */
fifo_buffer_t *video_fifo;
- /* used by audio decoders */
- xine_audio_port_t *audio_out;
+ /* used by audio decoders, may change by port rewire */
+ xine_audio_port_t * volatile audio_out;
/* demuxers send data to audio decoders using this fifo */
fifo_buffer_t *audio_fifo;
diff --git a/src/xine-engine/xine.c b/src/xine-engine/xine.c
--- a/src/xine-engine/xine.c
+++ b/src/xine-engine/xine.c
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2000-2017 the xine project
+ * Copyright (C) 2000-2018 the xine project
*
* This file is part of xine, a free video player.
*
@@ -136,6 +136,41 @@
}
}
+/* Add a revoke notify callback.
+ * A cb / user_data pair that is already listed is not added twice, and a
+ * full list (XINE_TICKET_MAX_CB entries) silently drops the request. */
+static void ticket_revoke_cb_register (xine_ticket_t *this, xine_ticket_revoke_cb_t *cb, void *user_data) {
+  int slot = 0;
+
+  pthread_mutex_lock (&this->lock);
+  while (slot < XINE_TICKET_MAX_CB) {
+    xine_ticket_revoke_cb_t *used = this->revoke_callbacks[slot];
+
+    if (!used) {
+      /* first free slot reached, so the pair is new: append it. */
+      this->revoke_callbacks[slot] = cb;
+      this->revoke_cb_data[slot] = user_data;
+      break;
+    }
+    if ((used == cb) && (this->revoke_cb_data[slot] == user_data))
+      break; /* already registered. */
+    slot++;
+  }
+  pthread_mutex_unlock (&this->lock);
+}
+
+/* Remove a revoke notify callback that was registered with the same
+ * cb / user_data pair. The list stays compact: the last used entry is
+ * moved into the freed slot. Pairs that are not registered are ignored. */
+static void ticket_revoke_cb_unregister (xine_ticket_t *this, xine_ticket_revoke_cb_t *cb, void *user_data) {
+  int f, l;
+
+  pthread_mutex_lock (&this->lock);
+  /* look for the entry to remove; the list ends at the first NULL slot. */
+  for (f = 0; f < XINE_TICKET_MAX_CB; f++) {
+    if (!this->revoke_callbacks[f])
+      break;
+    if ((this->revoke_callbacks[f] == cb) && (this->revoke_cb_data[f] == user_data))
+      break;
+  }
+  /* no match (or cb == NULL): leave the list alone. going on from here
+   * would drop the last registered entry, or write in front of the
+   * array when the list is empty. */
+  if ((f >= XINE_TICKET_MAX_CB) || !this->revoke_callbacks[f]) {
+    pthread_mutex_unlock (&this->lock);
+    return;
+  }
+  /* find the last used entry, without relying on a NULL sentinel slot. */
+  for (l = f; (l + 1 < XINE_TICKET_MAX_CB) && this->revoke_callbacks[l + 1]; l++) ;
+  /* fill the gap with the last entry, then shorten the list by one. */
+  if (f < l) {
+    this->revoke_callbacks[f] = this->revoke_callbacks[l];
+    this->revoke_cb_data[f] = this->revoke_cb_data[l];
+  }
+  this->revoke_callbacks[l] = NULL;
+  this->revoke_cb_data[l] = NULL;
+  pthread_mutex_unlock (&this->lock);
+}
+
static int ticket_acquire_internal(xine_ticket_t *this, int irrevocable, int nonblocking) {
pthread_t self = pthread_self ();
int wait, i;
@@ -249,64 +284,147 @@
}
static void ticket_renew(xine_ticket_t *this, int irrevocable) {
-
- pthread_mutex_lock(&this->lock);
-
- this->tickets_granted--;
-
- _x_assert(this->ticket_revoked);
- if (!this->tickets_granted)
- pthread_cond_broadcast(&this->revoked);
- if (!this->irrevocable_tickets || !irrevocable)
- pthread_cond_wait(&this->issued, &this->lock);
-
- this->tickets_granted++;
-
- pthread_mutex_unlock(&this->lock);
+ pthread_mutex_lock (&this->lock);
+ _x_assert (this->ticket_revoked);
+
+ if (irrevocable & XINE_TICKET_FLAG_REWIRE) {
+ /* allow everything: the caller accepts a port rewire at this point.
+ * give the ticket back, wake a waiting revoker if it was the last one,
+ * then sleep on "issued" while a revocation is pending. there is no
+ * sleep when the caller passed irrevocable (apart from the rewire bit)
+ * and this->irrevocable_tickets is nonzero. finally take the ticket
+ * again. */
+ this->tickets_granted--;
+ if (!this->tickets_granted)
+ pthread_cond_broadcast (&this->revoked);
+ while ((this->pending_revocations > 0)
+ && (!this->irrevocable_tickets || !(irrevocable & ~XINE_TICKET_FLAG_REWIRE)))
+ pthread_cond_wait (&this->issued, &this->lock);
+ this->tickets_granted++;
+ } else {
+ /* fair weather (not rewire) mode: same give back / wait / retake
+ * sequence, but count this thread in plain_renewers while it is in
+ * here, and stop waiting as soon as a rewire revoke is pending
+ * (this->rewirers). the caller then gets its ticket back although the
+ * revocation is still pending, and is expected to renew again with
+ * XINE_TICKET_FLAG_REWIRE from a place that is safe for rewiring. */
+ this->plain_renewers++;
+ this->tickets_granted--;
+ if (!this->tickets_granted)
+ pthread_cond_broadcast (&this->revoked);
+ while ((this->pending_revocations > 0)
+ && (!this->irrevocable_tickets || !irrevocable)
+ && !this->rewirers)
+ pthread_cond_wait (&this->issued, &this->lock);
+ this->tickets_granted++;
+ this->plain_renewers--;
+ }
+
+ pthread_mutex_unlock (&this->lock);
}
-static void ticket_issue(xine_ticket_t *this, int atomic) {
-
- if (!atomic)
- pthread_mutex_lock(&this->revoke_lock);
- pthread_mutex_lock(&this->lock);
-
- this->pending_revocations--;
- if (this->pending_revocations < 0)
- this->pending_revocations = 0;
+static void ticket_issue (xine_ticket_t *this, int flags) {
+ pthread_t self = pthread_self ();
+ int i;
+
+ if (!(flags & (XINE_TICKET_FLAG_REWIRE | XINE_TICKET_FLAG_ATOMIC)))
+ pthread_mutex_lock (&this->revoke_lock);
+ pthread_mutex_lock (&this->lock);
+
+ if (flags & XINE_TICKET_FLAG_REWIRE) {
+ if (this->rewirers > 0)
+ this->rewirers--;
+ }
+ if (this->pending_revocations > 0)
+ this->pending_revocations--;
+
if (!this->pending_revocations)
- pthread_cond_broadcast(&this->issued);
+ pthread_cond_broadcast (&this->issued);
+
+ /* HACK: restore self grants. */
[... 10 lines omitted ...]
+ while (this->pending_revocations)
+ pthread_cond_wait (&this->issued, &this->lock);
+ this->tickets_granted += n;
+ pthread_mutex_unlock (&this->lock);
+ return;
+ }
+ this->tickets_granted += n;
+ }
+ }
+ break;
+ }
+ }
this->atomic_revoke = 0;
pthread_mutex_unlock(&this->lock);
pthread_mutex_unlock(&this->revoke_lock);
+ if (flags & XINE_TICKET_FLAG_REWIRE)
+ pthread_mutex_unlock (&this->port_rewiring_lock);
}
-static void ticket_revoke(xine_ticket_t *this, int atomic) {
-
+static void ticket_revoke(xine_ticket_t *this, int flags) { /* Revoke all
+ * tickets and block until the other holders have released or renewed.
+ * flags: 0, XINE_TICKET_FLAG_ATOMIC and/or XINE_TICKET_FLAG_REWIRE.
+ * With either flag set, revoke_lock is still held on return; with the
+ * rewire flag, port_rewiring_lock is held as well. */
+ pthread_t self = pthread_self ();
+
+ if (flags & XINE_TICKET_FLAG_REWIRE)
+ pthread_mutex_lock (&this->port_rewiring_lock);
pthread_mutex_lock(&this->revoke_lock);
pthread_mutex_lock(&this->lock);
+ /* HACK: dont freeze on self grants, save them.
+ * Also, nest revokes at bit 19.
+ * That is: tickets held by the calling thread itself are taken out of
+ * tickets_granted, so the wait below does not wait for ourselves.
+ * Adding 0x80000 to the per thread count marks them as saved. A count
+ * that already is >= 0x80000 means a nested revoke, whose grants have
+ * been subtracted before. */
+ {
+ int i;
+ for (i = 0; i < this->holder_thread_count; i++) {
+ if (pthread_equal (this->holder_threads[i].holder, self)) {
+ int n = this->holder_threads[i].count;
+ if (n < 0x80000)
+ this->tickets_granted -= n;
+ this->holder_threads[i].count = n + 0x80000;
+ break;
+ }
+ }
+ }
+
+ /* set these early so release () / renew () see them. */
this->pending_revocations++;
-
- while (this->tickets_granted) {
- this->ticket_revoked = 1;
- pthread_cond_wait (&this->revoked, &this->lock);
+ this->atomic_revoke = flags;
+ /* no joke - see renew (): waking "issued" here lets plain renewers
+ * leave their wait loop, because rewirers is nonzero now. */
+ if (flags & XINE_TICKET_FLAG_REWIRE) {
+ this->rewirers++;
+ if (this->plain_renewers)
+ pthread_cond_broadcast (&this->issued);
}
+
+ if (this->tickets_granted || (this->rewirers && this->plain_renewers)) {
+ int i, m = (this->rewirers ? (XINE_TICKET_FLAG_REWIRE | 1) : 1);
+ /* Notify other holders. The callbacks run with this->lock held and
+ * get the flags of this revoke. */
+ this->ticket_revoked = m;
+ for (i = 0; i < XINE_TICKET_MAX_CB; i++) {
+ if (!this->revoke_callbacks[i])
+ break;
+ this->revoke_callbacks[i] (this->revoke_cb_data[i], this->atomic_revoke);
+ }
+ /* Wait for them to release/renew. ticket_revoked is set again on
+ * each round. While a rewire is pending, also wait until no plain
+ * renewer is left inside renew (). */
+ do {
+ this->ticket_revoked = m;
+ pthread_cond_wait (&this->revoked, &this->lock);
+ } while (this->tickets_granted || (this->rewirers && this->plain_renewers));
+ }
+
this->ticket_revoked = 0;
- if (atomic) {
- this->atomic_revoke = 1;
+ if (flags & (XINE_TICKET_FLAG_REWIRE | XINE_TICKET_FLAG_ATOMIC))
this->atomic_revoker_thread = pthread_self();
- }
pthread_mutex_unlock(&this->lock);
- if (!atomic)
+ if (!(flags & (XINE_TICKET_FLAG_REWIRE | XINE_TICKET_FLAG_ATOMIC)))
pthread_mutex_unlock(&this->revoke_lock);
}
static int lock_timeout (pthread_mutex_t *mutex, int ms_timeout) {
+ if (ms_timeout == 0)
+ return (0 == pthread_mutex_trylock (mutex));
if (ms_timeout >= 0) {
struct timespec abstime;
struct timeval now;
@@ -353,17 +471,30 @@
if (!port_ticket)
return NULL;
+ port_ticket->ticket_revoked = 0;
+ port_ticket->holder_thread_count = 0;
+ port_ticket->plain_renewers = 0;
+ port_ticket->rewirers = 0;
+ {
+ int i;
+ for (i = 0; i < XINE_TICKET_MAX_CB; i++) {
+ port_ticket->revoke_callbacks[i] = NULL;
+ port_ticket->revoke_cb_data[i] = NULL;
+ }
+ }
+
port_ticket->acquire_nonblocking = ticket_acquire_nonblocking;
port_ticket->acquire = ticket_acquire;
port_ticket->release_nonblocking = ticket_release_nonblocking;
port_ticket->release = ticket_release;
port_ticket->renew = ticket_renew;
+ port_ticket->revoke_cb_register = ticket_revoke_cb_register;
+ port_ticket->revoke_cb_unregister = ticket_revoke_cb_unregister;
port_ticket->issue = ticket_issue;
port_ticket->revoke = ticket_revoke;
port_ticket->lock_port_rewiring = ticket_lock_port_rewiring;
port_ticket->unlock_port_rewiring = ticket_unlock_port_rewiring;
port_ticket->dispose = ticket_dispose;
- port_ticket->holder_thread_count = 0;
port_ticket->holder_threads = malloc (32 * sizeof (*port_ticket->holder_threads));
if (!port_ticket->holder_threads) {
free (port_ticket);
|