From: David S. <dsc...@us...> - 2002-11-01 21:38:53
|
CVS Root: /cvsroot/gstreamer Module: gstreamer Changes by: dschleef Date: Fri Nov 01 2002 13:38:52 PST Log message: Change from pthreads to GThreads Modified files: gst : cothreads.c gstinfo.c gstqueue.c gstthread.c gstthread.h Links: http://cvs.sf.net/cgi-bin/viewcvs.cgi/gstreamer/gstreamer/gst/cothreads.c.diff?r1=1.82&r2=1.83 http://cvs.sf.net/cgi-bin/viewcvs.cgi/gstreamer/gstreamer/gst/gstinfo.c.diff?r1=1.40&r2=1.41 http://cvs.sf.net/cgi-bin/viewcvs.cgi/gstreamer/gstreamer/gst/gstqueue.c.diff?r1=1.54&r2=1.55 http://cvs.sf.net/cgi-bin/viewcvs.cgi/gstreamer/gstreamer/gst/gstthread.c.diff?r1=1.95&r2=1.96 http://cvs.sf.net/cgi-bin/viewcvs.cgi/gstreamer/gstreamer/gst/gstthread.h.diff?r1=1.17&r2=1.18 ====Begin Diffs==== Index: cothreads.c =================================================================== RCS file: /cvsroot/gstreamer/gstreamer/gst/cothreads.c,v retrieving revision 1.82 retrieving revision 1.83 diff -u -d -r1.82 -r1.83 --- cothreads.c 12 Sep 2002 18:56:30 -0000 1.82 +++ cothreads.c 1 Nov 2002 21:38:39 -0000 1.83 @@ -20,7 +20,8 @@ * Boston, MA 02111-1307, USA. */ -#include <pthread.h> +#include <glib.h> + #include <stdio.h> #include <stdlib.h> #include <signal.h> @@ -59,12 +60,12 @@ }; -/* this _cothread_ctx_key is used as a pthread key to the thread's context - * a pthread key is a "pointer" to memory space that is/can be different +/* this _cothread_ctx_key is used as a GThread key to the thread's context + * a GThread key is a "pointer" to memory space that is/can be different * (ie. private) for each thread. The key itself is shared among threads, * so it only needs to be initialized once. */ -static pthread_key_t _cothread_ctx_key = -1; +static GPrivate *_cothread_ctx_key; /* Disabling this define allows you to shut off a few checks in * cothread_switch. This likely will speed things up fractionally */ @@ -83,7 +84,14 @@ /* * initalize the whole of the cothreads context */ - cothread_context *ctx = (cothread_context *) g_malloc (sizeof (cothread_context)); + cothread_context *ctx; + + /* if there already is a cotread context for this thread, + * just return it */ + ctx = g_private_get (_cothread_ctx_key); + if(ctx) return ctx; + + ctx = (cothread_context *) g_malloc (sizeof (cothread_context)); /* we consider the initiating process to be cothread 0 */ ctx->ncothreads = 1; @@ -92,16 +100,18 @@ GST_INFO (GST_CAT_COTHREADS, "initializing cothreads"); - /* initialize the cothread key (for pthread space) if not done yet */ - if (_cothread_ctx_key == (pthread_key_t) -1) { - if (pthread_key_create (&_cothread_ctx_key, NULL) != 0) { - perror ("pthread_key_create"); + /* initialize the cothread key (for GThread space) if not done yet */ + /* FIXME this should be done in cothread_init() */ + if (_cothread_ctx_key == NULL) { + _cothread_ctx_key = g_private_new (NULL); + if (_cothread_ctx_key == NULL) { + perror ("g_private_new"); return NULL; } } /* set this thread's context pointer */ - pthread_setspecific (_cothread_ctx_key, ctx); + g_private_set (_cothread_ctx_key, ctx); /* clear the cothread data */ @@ -189,6 +199,7 @@ GST_DEBUG (GST_CAT_COTHREADS, "Found free cothread slot %d", slot); sp = CURRENT_STACK_FRAME; +printf("stack pointer %p\n",sp); /* FIXME this may not be 64bit clean * could use casts to uintptr_t from inttypes.h * if only all platforms had inttypes.h @@ -392,14 +403,14 @@ /** * cothread_current_main: * - * Get the main thread in the current pthread. + * Get the main thread in the current GThread. * - * Returns: the #cothread_state of the main (0th) thread in the current pthread + * Returns: the #cothread_state of the main (0th) thread in the current GThread */ cothread_state * cothread_current_main (void) { - cothread_context *ctx = pthread_getspecific (_cothread_ctx_key); + cothread_context *ctx = g_private_get (_cothread_ctx_key); return ctx->cothreads[0]; } @@ -414,7 +425,7 @@ cothread_state * cothread_current (void) { - cothread_context *ctx = pthread_getspecific (_cothread_ctx_key); + cothread_context *ctx = g_private_get (_cothread_ctx_key); return ctx->cothreads[ctx->current]; } @@ -422,7 +433,7 @@ static void cothread_stub (void) { - cothread_context *ctx = pthread_getspecific (_cothread_ctx_key); + cothread_context *ctx = g_private_get (_cothread_ctx_key); register cothread_state *thread = ctx->cothreads[ctx->current]; GST_DEBUG_ENTER (""); @@ -448,7 +459,7 @@ int cothread_getcurrent (void) { - cothread_context *ctx = pthread_getspecific (_cothread_ctx_key); + cothread_context *ctx = g_private_get (_cothread_ctx_key); if (!ctx) return -1; @@ -479,7 +490,7 @@ void cothread_context_set_data (cothread_state *thread, gchar *key, gpointer data) { - cothread_context *ctx = pthread_getspecific (_cothread_ctx_key); + cothread_context *ctx = g_private_get (_cothread_ctx_key); g_hash_table_insert (ctx->data, key, data); } @@ -510,7 +521,7 @@ gpointer cothread_context_get_data (cothread_state * thread, gchar * key) { - cothread_context *ctx = pthread_getspecific (_cothread_ctx_key); + cothread_context *ctx = g_private_get (_cothread_ctx_key); return g_hash_table_lookup (ctx->data, key); } Index: gstinfo.c =================================================================== RCS file: /cvsroot/gstreamer/gstreamer/gst/gstinfo.c,v retrieving revision 1.40 retrieving revision 1.41 diff -u -d -r1.40 -r1.41 --- gstinfo.c 11 Apr 2002 20:35:14 -0000 1.40 +++ gstinfo.c 1 Nov 2002 21:38:39 -0000 1.41 @@ -188,10 +188,10 @@ { gchar *empty = ""; gchar *elementname = empty,*location = empty; - int pthread_id = getpid(); + int pid = getpid(); int cothread_id = 0; /*FIXME*/ #ifdef GST_DEBUG_COLOR - int pthread_color = pthread_id%6 + 31; + int pid_color = pid%6 + 31; int cothread_color = (cothread_id < 0) ? 37 : (cothread_id%6 + 31); #endif @@ -208,11 +208,11 @@ #ifdef GST_DEBUG_COLOR fprintf(stderr,"DEBUG(\033[00;%dm%5d\033[00m:\033[00;%dm%2d\033[00m)\033[" "%s;%sm%s%s\033[00m %s\n", - pthread_color,pthread_id,cothread_color,cothread_id,incore?"00":"01", + pid_color,pid,cothread_color,cothread_id,incore?"00":"01", _gst_category_colors[category],location,elementname,string); #else fprintf(stderr,"DEBUG(%5d:%2d)%s%s %s\n", - pthread_id,cothread_id,location,elementname,string); + pid,cothread_id,location,elementname,string); #endif /* GST_DEBUG_COLOR */ if (location != empty) g_free(location); @@ -300,10 +300,10 @@ { gchar *empty = ""; gchar *elementname = empty,*location = empty; - int pthread_id = getpid(); + int pid = getpid(); int cothread_id = 0; /*FIXME*/ #ifdef GST_DEBUG_COLOR - int pthread_color = pthread_id%6 + 31; + int pid_color = pid%6 + 31; int cothread_color = (cothread_id < 0) ? 37 : (cothread_id%6 + 31); #endif @@ -319,11 +319,11 @@ #ifdef GST_DEBUG_COLOR fprintf(stderr,"\033[01mINFO\033[00m (\033[00;%dm%5d\033[00m:\033[00;%dm%2d\033[00m)\033[" GST_DEBUG_CHAR_MODE ";%sm%s%s\033[00m %s\n", - pthread_color,pthread_id,cothread_color,cothread_id, + pid_color,pid,cothread_color,cothread_id, _gst_category_colors[category],location,elementname,string); #else fprintf(stderr,"INFO (%5d:%2d)%s%s %s\n", - pthread_id,cothread_id,location,elementname,string); + pid,cothread_id,location,elementname,string); #endif /* GST_DEBUG_COLOR */ /* #else Index: gstqueue.c =================================================================== RCS file: /cvsroot/gstreamer/gstreamer/gst/gstqueue.c,v retrieving revision 1.54 retrieving revision 1.55 diff -u -d -r1.54 -r1.55 --- gstqueue.c 25 Oct 2002 19:14:57 -0000 1.54 +++ gstqueue.c 1 Nov 2002 21:38:39 -0000 1.55 @@ -29,7 +29,6 @@ #define STATUS(A) #endif -#include <pthread.h> #include "config.h" #include "gst_private.h" @@ -320,9 +319,9 @@ restart: /* we have to lock the queue since we span threads */ - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld", pthread_self ()); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ()); g_mutex_lock (queue->qlock); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld", pthread_self ()); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%p", g_thread_self ()); /* assume don't need to flush this buffer when the queue is filled */ queue->flush = FALSE; @@ -472,9 +471,9 @@ restart: /* have to lock for thread-safety */ - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld", pthread_self ()); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%p", g_thread_self ()); g_mutex_lock (queue->qlock); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p", pthread_self (), queue->not_empty); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%p %p", g_thread_self (), queue->not_empty); GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d", queue->level_buffers, queue->size_buffers); while (queue->level_buffers == 0) { Index: gstthread.c =================================================================== RCS file: /cvsroot/gstreamer/gstreamer/gst/gstthread.c,v retrieving revision 1.95 retrieving revision 1.96 diff -u -d -r1.95 -r1.96 --- gstthread.c 29 Sep 2002 18:12:50 -0000 1.95 +++ gstthread.c 1 Nov 2002 21:38:39 -0000 1.96 @@ -30,6 +30,10 @@ #include "gstscheduler.h" #include "gstqueue.h" +#define STACK_SIZE 0x200000 + +#define g_thread_equal(a,b) ((a) == (b)) + GstElementDetails gst_thread_details = { "Threaded container", "Generic/Bin", @@ -84,9 +88,10 @@ gst_thread_schedpolicy_get_type(void) { static GType thread_schedpolicy_type = 0; static GEnumValue thread_schedpolicy[] = { - {SCHED_OTHER, "SCHED_OTHER", "Normal Scheduling"}, - {SCHED_FIFO, "SCHED_FIFO", "FIFO Scheduling (requires root)"}, - {SCHED_RR, "SCHED_RR", "Round-Robin Scheduling (requires root)"}, + {G_THREAD_PRIORITY_LOW, "LOW", "Low Priority Scheduling"}, + {G_THREAD_PRIORITY_NORMAL, "NORMAL", "Normal Scheduling"}, + {G_THREAD_PRIORITY_HIGH, "HIGH", "High Priority Scheduling"}, + {G_THREAD_PRIORITY_URGENT, "URGENT", "Urgent Scheduling"}, {0, NULL, NULL}, }; if (!thread_schedpolicy_type) { @@ -137,7 +142,7 @@ g_object_class_install_property(G_OBJECT_CLASS (klass), ARG_SCHEDPOLICY, g_param_spec_enum("schedpolicy", "Scheduling Policy", "The scheduling policy of the thread", - GST_TYPE_THREAD_SCHEDPOLICY, SCHED_OTHER, G_PARAM_READWRITE)); + GST_TYPE_THREAD_SCHEDPOLICY, G_THREAD_PRIORITY_NORMAL, G_PARAM_READWRITE)); g_object_class_install_property(G_OBJECT_CLASS (klass), ARG_PRIORITY, g_param_spec_int("priority", "Scheduling Priority", "The scheduling priority of the thread", 0, 99, 0, G_PARAM_READWRITE)); @@ -179,8 +184,8 @@ thread->cond = g_cond_new (); thread->ppid = getpid (); - thread->thread_id = (pthread_t) -1; - thread->sched_policy = SCHED_OTHER; + thread->thread_id = (GThread *) NULL; + thread->sched_policy = G_THREAD_PRIORITY_NORMAL; thread->priority = 0; thread->stack = NULL; } @@ -309,8 +314,8 @@ GstThread *thread; gboolean stateset = GST_STATE_SUCCESS; gint transition; - pthread_t self = pthread_self (); - glong stacksize; + GThread * self = g_thread_self (); + GError * error = NULL; g_return_val_if_fail (GST_IS_THREAD (element), GST_STATE_FAILURE); g_return_val_if_fail (gst_has_threads (), GST_STATE_FAILURE); @@ -323,7 +328,7 @@ gst_element_state_get_name (GST_STATE (element)), gst_element_state_get_name (GST_STATE_PENDING (element))); - if (pthread_equal (self, thread->thread_id)) { + if (g_thread_equal (self, thread->thread_id)) { GST_DEBUG (GST_CAT_THREAD, "no sync(" GST_DEBUG_THREAD_FORMAT "): setting own thread's state to spinning", GST_DEBUG_THREAD_ARGS (thread->pid)); @@ -337,70 +342,29 @@ THR_DEBUG ("creating thread \"%s\"", GST_ELEMENT_NAME (element)); - /* this bit of code handles creation of pthreads + /* this bit of code handles creation of GThreads * this is therefor tricky code * compare it with the block of code that handles the destruction * in GST_STATE_READY_TO_NULL below */ g_mutex_lock (thread->lock); - /* create attribute struct for pthread - * and assign stack pointer and size to it - * - * the default state of a pthread is PTHREAD_CREATE_JOINABLE - * (see man pthread_attr_init) - * - other thread can sync on termination - * - thread resources are kept allocated until other thread performs - * pthread_join - */ - - if (pthread_attr_init (&thread->attr) != 0) - g_warning ("pthread_attr_init returned an error !"); - - /* this function should return a newly allocated stack - * (using whatever method) - * which we can initiate the pthreads with - * the stack should be freed in - */ - if (gst_scheduler_get_preferred_stack (GST_ELEMENT_SCHED (element), - &thread->stack, &stacksize)) { -#ifdef HAVE_PTHREAD_ATTR_SETSTACK - if (pthread_attr_setstack (&thread->attr, - thread->stack, stacksize) != 0) { - g_warning ("pthread_attr_setstack failed\n"); - return GST_STATE_FAILURE; - } -#else - if (pthread_attr_setstackaddr (&thread->attr, thread->stack) != 0) { - g_warning ("pthread_attr_setstackaddr failed\n"); - return GST_STATE_FAILURE; - } - if (pthread_attr_setstacksize (&thread->attr, stacksize) != 0) { - g_warning ("pthread_attr_setstacksize failed\n"); - return GST_STATE_FAILURE; - } -#endif - GST_DEBUG (GST_CAT_THREAD, "pthread attr set stack at %p of size %ld", - thread->stack, stacksize); - } - else { - g_warning ("scheduler did not return a preferred stack"); - } - - /* create a new pthread + /* create a new GThread * use the specified attributes * make it execute gst_thread_main_loop (thread) */ - GST_DEBUG (GST_CAT_THREAD, "going to pthread_create..."); - if (pthread_create (&thread->thread_id, &thread->attr, - gst_thread_main_loop, thread) != 0) { - GST_DEBUG (GST_CAT_THREAD, "pthread_create failed"); + GST_DEBUG (GST_CAT_THREAD, "going to g_thread_create_full..."); + thread->thread_id = g_thread_create_full(gst_thread_main_loop, + thread, STACK_SIZE, TRUE, TRUE, G_THREAD_PRIORITY_NORMAL, + &error); + if (!thread->thread_id){ + GST_DEBUG (GST_CAT_THREAD, "g_thread_create_full failed"); g_mutex_unlock (thread->lock); GST_DEBUG (GST_CAT_THREAD, "could not create thread \"%s\"", GST_ELEMENT_NAME (element)); return GST_STATE_FAILURE; } - GST_DEBUG (GST_CAT_THREAD, "pthread created"); + GST_DEBUG (GST_CAT_THREAD, "GThread created"); /* wait for it to 'spin up' */ THR_DEBUG ("waiting for child thread spinup"); @@ -530,17 +494,10 @@ * compare this block to the block */ - - /* in glibc 2.2.5, pthread_attr_destroy does nothing more - * than return 0 */ - if (pthread_attr_destroy (&thread->attr) != 0) - g_warning ("pthread_attr_destroy has failed !"); - - GST_DEBUG (GST_CAT_THREAD, "joining pthread %ld", thread->thread_id); - if (pthread_join (thread->thread_id, NULL) != 0) - g_warning ("pthread_join has failed !\n"); + GST_DEBUG (GST_CAT_THREAD, "joining GThread %p", thread->thread_id); + g_thread_join (thread->thread_id); - thread->thread_id = -1; + thread->thread_id = NULL; /* the stack was allocated when we created the thread * using scheduler->get_preferred_stack */ @@ -590,32 +547,35 @@ { GstThread *thread = NULL; gint stateset; + glong page_size; + gpointer stack_pointer; + gulong stack_offset; GST_DEBUG (GST_CAT_THREAD, "gst_thread_main_loop started"); thread = GST_THREAD (arg); g_mutex_lock (thread->lock); - /* handle scheduler policy; do stuff if not the normal scheduler */ - if (thread->sched_policy != SCHED_OTHER) { - struct sched_param sched_param; - - memset (&sched_param, 0, sizeof (sched_param)); - if (thread->priority == 0) { - thread->priority = sched_get_priority_max (thread->sched_policy); - } - sched_param.sched_priority = thread->priority; - - if (sched_setscheduler (0, thread->sched_policy, &sched_param) != 0) { - GST_DEBUG (GST_CAT_THREAD, "not running with real-time priority"); - } - } - /* set up the element's scheduler */ gst_scheduler_setup (GST_ELEMENT_SCHED (thread)); GST_FLAG_UNSET (thread, GST_THREAD_STATE_REAPING); thread->pid = getpid(); THR_INFO_MAIN ("thread is running"); + + page_size = sysconf(_SC_PAGESIZE); + stack_pointer = (gpointer) &stack_pointer; + + if(((gulong)stack_pointer & (page_size-1)) < (page_size>>1)){ + /* stack grows up, I think */ + /* FIXME this is probably not true for the main thread */ + stack_offset = (gulong)stack_pointer & (page_size - 1); + }else{ + /* stack grows down, I think */ + stack_offset = STACK_SIZE - ((gulong)stack_pointer & (page_size - 1)); + } + /* note the subtlety with pointer arithmetic */ + thread->stack = stack_pointer - stack_offset; + thread->stack_size = STACK_SIZE; /* first we need to change the state of all the children */ if (GST_ELEMENT_CLASS (parent_class)->change_state) { Index: gstthread.h =================================================================== RCS file: /cvsroot/gstreamer/gstreamer/gst/gstthread.h,v retrieving revision 1.17 retrieving revision 1.18 diff -u -d -r1.17 -r1.18 --- gstthread.h 9 Jul 2002 10:27:21 -0000 1.17 +++ gstthread.h 1 Nov 2002 21:38:39 -0000 1.18 @@ -24,8 +24,7 @@ #ifndef __GST_THREAD_H__ #define __GST_THREAD_H__ -#include <unistd.h> -#include <pthread.h> +#include <glib.h> #include <gst/gstbin.h> @@ -62,11 +61,11 @@ struct _GstThread { GstBin bin; - pthread_t thread_id; /* id of the thread, if any */ - pthread_attr_t attr; /* attributes for the stack space */ + GThread *thread_id; /* id of the thread, if any */ int sched_policy; int priority; - void *stack; /* set with gst_scheduler_get_preferred_stack */ + gpointer *stack; /* set with gst_scheduler_get_preferred_stack */ + guint stack_size; /* stack size */ gint pid; /* the pid of the thread */ gint ppid; /* the pid of the thread's parent process */ GMutex *lock; /* thread lock/condititon pair ... */ |