From: <wt...@fr...> - 2005-06-25 17:55:12
|
CVS Root: /cvs/gstreamer Module: gstreamer Changes by: wtay Date: Sat Jun 25 2005 10:55:10 PDT Log message: * gst/base/gstbasesink.c: (gst_basesink_set_property), (gst_basesink_preroll_queue_empty), (gst_basesink_preroll_queue_flush), (gst_basesink_handle_object), (gst_basesink_event), (gst_basesink_do_sync), (gst_basesink_handle_event), (gst_basesink_handle_buffer), (gst_basesink_chain), (gst_basesink_loop), (gst_basesink_activate), (gst_basesink_change_state): Reworked the base sink, handle event and buffer serialisation correctly and removed possible deadlock. Handle EOS correctly. Modified files: . : ChangeLog gst/base : gstbasesink.c Links: http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gstreamer/ChangeLog.diff?r1=1.1164&r2=1.1165 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gstreamer/gst/base/gstbasesink.c.diff?r1=1.20&r2=1.21 ====Begin Diffs==== Index: ChangeLog =================================================================== RCS file: /cvs/gstreamer/gstreamer/ChangeLog,v retrieving revision 1.1164 retrieving revision 1.1165 diff -u -d -r1.1164 -r1.1165 --- ChangeLog 25 Jun 2005 17:51:12 -0000 1.1164 +++ ChangeLog 25 Jun 2005 17:54:58 -0000 1.1165 @@ -1,5 +1,18 @@ 2005-06-25 Wim Taymans <wi...@fl...> + * gst/base/gstbasesink.c: (gst_basesink_set_property), + (gst_basesink_preroll_queue_empty), + (gst_basesink_preroll_queue_flush), (gst_basesink_handle_object), + (gst_basesink_event), (gst_basesink_do_sync), + (gst_basesink_handle_event), (gst_basesink_handle_buffer), + (gst_basesink_chain), (gst_basesink_loop), (gst_basesink_activate), + (gst_basesink_change_state): + Reworked the base sink, handle event and buffer serialisation + correctly and removed possible deadlock. + Handle EOS correctly. + +2005-06-25 Wim Taymans <wi...@fl...> * gst/gstpipeline.c: (is_eos), (pipeline_bus_handler), (gst_pipeline_change_state): * tools/gst-launch.c: (check_intr), (event_loop), (main): Index: gstbasesink.c RCS file: /cvs/gstreamer/gstreamer/gst/base/gstbasesink.c,v retrieving revision 1.20 retrieving revision 1.21 diff -u -d -r1.20 -r1.21 --- gstbasesink.c 17 Jun 2005 11:58:48 -0000 1.20 +++ gstbasesink.c 25 Jun 2005 17:54:58 -0000 1.21 @@ -29,13 +29,6 @@ GST_DEBUG_CATEGORY_STATIC (gst_basesink_debug); #define GST_CAT_DEFAULT gst_basesink_debug -/* #define DEBUGGING */ -#ifdef DEBUGGING -#define DEBUG(str,args...) g_print (str,##args) -#else -#define DEBUG(str,args...) -#endif - /* BaseSink signals and properties */ enum { @@ -104,14 +97,15 @@ static GstElementStateReturn gst_basesink_change_state (GstElement * element); -static GstFlowReturn gst_basesink_chain_unlocked (GstPad * pad, - GstBuffer * buffer); +static GstFlowReturn gst_basesink_chain (GstPad * pad, GstBuffer * buffer); static void gst_basesink_loop (GstPad * pad); static GstFlowReturn gst_basesink_chain (GstPad * pad, GstBuffer * buffer); static gboolean gst_basesink_activate (GstPad * pad, GstActivateMode mode); static gboolean gst_basesink_event (GstPad * pad, GstEvent * event); static inline GstFlowReturn gst_basesink_handle_buffer (GstBaseSink * basesink, GstBuffer * buf); +static inline gboolean gst_basesink_handle_event (GstBaseSink * basesink, + GstEvent * event); static void gst_basesink_base_init (gpointer g_class) @@ -304,15 +298,18 @@ sink = GST_BASESINK (object); - GST_LOCK (sink); switch (prop_id) { case PROP_HAS_LOOP: + GST_LOCK (sink); sink->has_loop = g_value_get_boolean (value); gst_basesink_set_all_pad_functions (sink); + GST_UNLOCK (sink); break; case PROP_HAS_CHAIN: sink->has_chain = g_value_get_boolean (value); case PROP_PREROLL_QUEUE_LEN: /* preroll lock necessary to serialize with finish_preroll */ @@ -324,7 +321,6 @@ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); } - GST_UNLOCK (sink); } @@ -374,59 +370,35 @@ /* with PREROLL_LOCK */ -static void -gst_basesink_preroll_queue_push (GstBaseSink * basesink, GstPad * pad, - GstBuffer * buffer) -{ - /* if we don't have a buffer we just start blocking */ - if (buffer == NULL) - goto block; - /* first buffer */ - if (basesink->preroll_queue->length == 0) { - GstBaseSinkClass *bclass = GST_BASESINK_GET_CLASS (basesink); - GST_DEBUG ("preroll buffer with TS: %" GST_TIME_FORMAT, - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buffer))); - if (bclass->preroll) - bclass->preroll (basesink, buffer); - } - if (basesink->preroll_queue->length < basesink->preroll_queue_max_len) { - DEBUG ("push %p %p\n", basesink, buffer); - g_queue_push_tail (basesink->preroll_queue, buffer); - return; -block: - /* block until the state changes, or we get a flush, or something */ - DEBUG ("block %p %p\n", basesink, buffer); - GST_DEBUG ("element %s waiting to finish preroll", - GST_ELEMENT_NAME (basesink)); - basesink->need_preroll = FALSE; - basesink->have_preroll = TRUE; - GST_PREROLL_WAIT (pad); - GST_DEBUG ("done preroll"); - basesink->have_preroll = FALSE; -} -/* with PREROLL_LOCK */ static GstFlowReturn gst_basesink_preroll_queue_empty (GstBaseSink * basesink, GstPad * pad) - GstBuffer *buf; + GstMiniObject *obj; GQueue *q = basesink->preroll_queue; GstFlowReturn ret; ret = GST_FLOW_OK; if (q) { - DEBUG ("empty queue\n"); - while ((buf = g_queue_pop_head (q))) { - DEBUG ("pop %p\n", buf); - ret = gst_basesink_handle_buffer (basesink, buf); + GST_DEBUG ("emptying queue"); + while ((obj = g_queue_pop_head (q))) { + /* we release the preroll lock while pushing so that we + * can still flush it while blocking on the clock or + * inside the element. */ + GST_PREROLL_UNLOCK (pad); + if (GST_IS_BUFFER (obj)) { + GST_DEBUG ("poped buffer %p", obj); + ret = gst_basesink_handle_buffer (basesink, GST_BUFFER (obj)); + } else { + GST_DEBUG ("poped event %p", obj); + gst_basesink_handle_event (basesink, GST_EVENT (obj)); + ret = GST_FLOW_OK; + } + GST_PREROLL_LOCK (pad); } - DEBUG ("queue len %p %d\n", basesink, q->length); + GST_DEBUG ("queue empty"); return ret; @@ -435,83 +407,126 @@ gst_basesink_preroll_queue_flush (GstBaseSink * basesink) - DEBUG ("flush %p\n", basesink); + GST_DEBUG ("flushing queue %p", basesink); - gst_buffer_unref (buf); + GST_DEBUG ("poped %p", obj); + gst_mini_object_unref (obj); + /* we can't have EOS anymore now */ + basesink->eos = FALSE; -typedef enum - PREROLL_QUEUEING, - PREROLL_PLAYING, - PREROLL_FLUSHING, - PREROLL_ERROR -} PrerollReturn; /* with STREAM_LOCK */ -PrerollReturn -gst_basesink_finish_preroll (GstBaseSink * basesink, GstPad * pad, +static GstFlowReturn +gst_basesink_handle_object (GstBaseSink * basesink, GstPad * pad, + GstMiniObject * obj) - gboolean flushing; + gint length; + gboolean have_event; - DEBUG ("finish preroll %p <\n", basesink); - /* lock order is important */ - GST_STATE_LOCK (basesink); GST_PREROLL_LOCK (pad); - DEBUG ("finish preroll %p >\n", basesink); + /* push object on the queue */ + GST_DEBUG ("push on queue %p %p", basesink, obj); + g_queue_push_tail (basesink->preroll_queue, obj); + have_event = GST_IS_EVENT (obj); + if (have_event && GST_EVENT_TYPE (obj) == GST_EVENT_EOS) { + basesink->eos = TRUE; + } + /* check if we are prerolling */ if (!basesink->need_preroll) goto no_preroll; + length = basesink->preroll_queue->length; + /* this is the first object we queued */ + if (length == 1) { + GST_DEBUG ("do preroll %p", obj); + /* if it's a buffer, we need to call the preroll method */ + if (GST_IS_BUFFER (obj)) { + GstBaseSinkClass *bclass; + bclass = GST_BASESINK_GET_CLASS (basesink); + if (bclass->preroll) + bclass->preroll (basesink, GST_BUFFER (obj)); + } + /* we are prerolling */ + GST_DEBUG ("finish preroll %p >", basesink); + GST_PREROLL_UNLOCK (pad); + /* have to release STREAM_LOCK as we cannot take the STATE_LOCK + * inside the STREAM_LOCK */ + GST_STREAM_UNLOCK (pad); + /* now we commit our state */ + GST_STATE_LOCK (basesink); + GST_DEBUG ("commit state %p >", basesink); gst_element_commit_state (GST_ELEMENT (basesink)); GST_STATE_UNLOCK (basesink); - gst_basesink_preroll_queue_push (basesink, pad, buffer); + /* reacquire stream lock, pad could be flushing now */ + GST_STREAM_LOCK (pad); GST_LOCK (pad); - flushing = GST_PAD_IS_FLUSHING (pad); - GST_UNLOCK (pad); - if (flushing) + if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) goto flushing; + GST_UNLOCK (pad); - if (basesink->need_preroll) - goto still_queueing; - gst_basesink_preroll_queue_empty (basesink, pad); + /* and wait if needed */ + GST_PREROLL_LOCK (pad); + /* it is possible that the application set the state to PLAYING + * now in which case we don't need to block anymore. */ + if (!basesink->need_preroll) + goto no_preroll; + GST_DEBUG ("prerolled length %d", length); + /* see if we need to block now. We cannot block on events, only + * on buffers, the reason is that events can be sent from the + * application thread and we don't want to block there. */ + if (length > basesink->preroll_queue_max_len && !have_event) { + /* block until the state changes, or we get a flush, or something */ + GST_DEBUG ("element %s waiting to finish preroll", + GST_ELEMENT_NAME (basesink)); + basesink->have_preroll = TRUE; + GST_PREROLL_WAIT (pad); + GST_DEBUG ("done preroll"); + basesink->have_preroll = FALSE; GST_PREROLL_UNLOCK (pad); - return PREROLL_PLAYING; + GST_LOCK (pad); + goto flushing; + return GST_FLOW_OK; no_preroll: { + GstFlowReturn ret; + GST_DEBUG ("no preroll needed"); /* maybe it was another sink that blocked in preroll, need to check for buffers to drain */ - if (basesink->preroll_queue->length) - gst_basesink_preroll_queue_empty (basesink, pad); + ret = gst_basesink_preroll_queue_empty (basesink, pad); GST_PREROLL_UNLOCK (pad); - GST_STATE_UNLOCK (basesink); - return PREROLL_PLAYING; + return ret; flushing: + GST_UNLOCK (pad); GST_DEBUG ("pad is flushing"); - GST_PREROLL_UNLOCK (pad); - return PREROLL_FLUSHING; -still_queueing: - { - return PREROLL_QUEUEING; + return GST_FLOW_WRONG_STATE; @@ -526,83 +541,66 @@ bclass = GST_BASESINK_GET_CLASS (basesink); - DEBUG ("event %p\n", basesink); - if (bclass->event) - bclass->event (basesink, event); + GST_DEBUG ("event %p", event); switch (GST_EVENT_TYPE (event)) { case GST_EVENT_EOS: { - PrerollReturn pre_ret; - gboolean need_eos; + GstFlowReturn ret; GST_STREAM_LOCK (pad); /* EOS also finishes the preroll */ - pre_ret = gst_basesink_finish_preroll (basesink, pad, NULL); - if (pre_ret == PREROLL_PLAYING) { - GST_LOCK (basesink); - need_eos = basesink->eos = TRUE; - if (basesink->clock) { - /* wait for last buffer to finish if we have a valid end time */ - if (GST_CLOCK_TIME_IS_VALID (basesink->end_time)) { - basesink->clock_id = gst_clock_new_single_shot_id (basesink->clock, - basesink->end_time + GST_ELEMENT (basesink)->base_time); - GST_UNLOCK (basesink); - gst_clock_id_wait (basesink->clock_id, NULL); - GST_LOCK (basesink); - if (basesink->clock_id) { - gst_clock_id_unref (basesink->clock_id); - basesink->clock_id = NULL; - } - basesink->end_time = GST_CLOCK_TIME_NONE; - need_eos = basesink->eos; - } - GST_UNLOCK (basesink); - /* if we are still EOS, we can post the EOS message */ - if (need_eos) { - /* ok, now we can post the message */ - gst_element_post_message (GST_ELEMENT (basesink), - gst_message_new_eos (GST_OBJECT (basesink))); - } - } + ret = gst_basesink_handle_object (basesink, pad, GST_MINI_OBJECT (event)); GST_STREAM_UNLOCK (pad); case GST_EVENT_DISCONTINUOUS: + { if (basesink->clock) { //gint64 value = GST_EVENT_DISCONT_OFFSET (event, 0).value; } case GST_EVENT_FLUSH: /* make sure we are not blocked on the clock also clear any pending * eos state. */ + if (bclass->event) + bclass->event (basesink, event); if (!GST_EVENT_FLUSH_DONE (event)) { + GST_PREROLL_LOCK (pad); + /* we need preroll after the flush */ + basesink->need_preroll = TRUE; + gst_basesink_preroll_queue_flush (basesink); + /* unlock from a possible state change/preroll */ + GST_PREROLL_SIGNAL (pad); GST_LOCK (basesink); - basesink->eos = FALSE; if (basesink->clock_id) { gst_clock_id_unschedule (basesink->clock_id); } GST_UNLOCK (basesink); - /* unlock from a possible state change/preroll */ - GST_PREROLL_LOCK (pad); - basesink->need_preroll = TRUE; - gst_basesink_preroll_queue_flush (basesink); - GST_PREROLL_SIGNAL (pad); GST_PREROLL_UNLOCK (pad); + /* and we need to commit our state again on the next + * prerolled buffer */ + GST_STATE_LOCK (basesink); + GST_STREAM_LOCK (pad); + gst_element_lost_state (GST_ELEMENT (basesink)); + GST_STREAM_UNLOCK (pad); + GST_STATE_UNLOCK (basesink); + /* now we are completely unblocked and the _chain method + * will return */ - /* now we are completely unblocked and the _chain method - * will return */ default: result = gst_pad_event_default (pad, event); @@ -640,11 +638,12 @@ * 4) wait on the clock, this blocks * 5) unref the clockid again */ +static gboolean gst_basesink_do_sync (GstBaseSink * basesink, GstBuffer * buffer) + gboolean result = TRUE; if (basesink->clock) { - GstClockReturn ret; GstClockTime start, end; GstBaseSinkClass *bclass; @@ -657,6 +656,8 @@ ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end)); if (GST_CLOCK_TIME_IS_VALID (start)) { + GstClockReturn ret; /* save clock id so that we can unlock it if needed */ GST_LOCK (basesink); basesink->clock_id = gst_clock_new_single_shot_id (basesink->clock, @@ -671,13 +672,79 @@ gst_clock_id_unref (basesink->clock_id); basesink->clock_id = NULL; - /* FIXME, don't mess with end_time here */ - basesink->end_time = GST_CLOCK_TIME_NONE; GST_UNLOCK (basesink); GST_LOG_OBJECT (basesink, "clock entry done: %d", ret); + if (ret == GST_CLOCK_UNSCHEDULED) + result = FALSE; + return result; +} +/* handle an event + * + * 2) render the event + * 3) unref the event + */ +static inline gboolean +gst_basesink_handle_event (GstBaseSink * basesink, GstEvent * event) +{ + GstBaseSinkClass *bclass; + gboolean ret; + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_EOS: + GST_LOCK (basesink); + if (basesink->clock) { + /* wait for last buffer to finish if we have a valid end time */ + if (GST_CLOCK_TIME_IS_VALID (basesink->end_time)) { + basesink->clock_id = gst_clock_new_single_shot_id (basesink->clock, + basesink->end_time + GST_ELEMENT (basesink)->base_time); + GST_UNLOCK (basesink); + gst_clock_id_wait (basesink->clock_id, NULL); + GST_LOCK (basesink); + if (basesink->clock_id) { + gst_clock_id_unref (basesink->clock_id); + basesink->clock_id = NULL; + } + basesink->end_time = GST_CLOCK_TIME_NONE; + } + GST_UNLOCK (basesink); + break; + default: + bclass = GST_BASESINK_GET_CLASS (basesink); + if (bclass->event) + ret = bclass->event (basesink, event); + else + ret = TRUE; + GST_PREROLL_LOCK (basesink->sinkpad); + /* if we are still EOS, we can post the EOS message */ + if (basesink->eos) { + /* ok, now we can post the message */ + gst_element_post_message (GST_ELEMENT (basesink), + gst_message_new_eos (GST_OBJECT (basesink))); + GST_PREROLL_UNLOCK (basesink->sinkpad); + GST_DEBUG ("event unref %p %p", basesink, event); + gst_event_unref (event); + return ret; /* handle a buffer @@ -700,46 +767,21 @@ else ret = GST_FLOW_OK; - DEBUG ("unref %p %p\n", basesink, buf); + GST_DEBUG ("buffer unref after render %p", basesink, buf); gst_buffer_unref (buf); -gst_basesink_chain_unlocked (GstPad * pad, GstBuffer * buf) +gst_basesink_chain (GstPad * pad, GstBuffer * buf) GstBaseSink *basesink; - PrerollReturn result; + GstFlowReturn result; basesink = GST_BASESINK (GST_OBJECT_PARENT (pad)); - DEBUG ("chain_unlocked %p\n", basesink); - result = gst_basesink_finish_preroll (basesink, pad, buf); - DEBUG ("chain_unlocked %p after, result %d\n", basesink, result); - switch (result) { - case PREROLL_QUEUEING: - return GST_FLOW_OK; - case PREROLL_PLAYING: - return gst_basesink_handle_buffer (basesink, buf); - case PREROLL_FLUSHING: - return GST_FLOW_UNEXPECTED; - default: - g_assert_not_reached (); - return GST_FLOW_ERROR; -static GstFlowReturn -gst_basesink_chain (GstPad * pad, GstBuffer * buf) - GstFlowReturn result; - result = gst_basesink_chain_unlocked (pad, buf); + result = gst_basesink_handle_object (basesink, pad, GST_MINI_OBJECT (buf)); return result; @@ -761,7 +803,7 @@ if (result != GST_FLOW_OK) goto paused; + result = gst_basesink_chain (pad, buf); @@ -797,6 +839,7 @@ case GST_ACTIVATE_NONE: /* step 1, unblock clock sync (if any) or any other blocking thing */ if (basesink->clock_id) { gst_clock_id_unschedule (basesink->clock_id); @@ -807,8 +850,9 @@ if (bclass->unlock) bclass->unlock (basesink); - /* unlock preroll */ - GST_PREROLL_LOCK (pad); + /* flush out the data thread if it's locked in finish_preroll */ + gst_basesink_preroll_queue_flush (basesink); + basesink->need_preroll = FALSE; GST_PREROLL_SIGNAL (pad); GST_PREROLL_UNLOCK (pad); @@ -828,8 +872,6 @@ GstBaseSink *basesink = GST_BASESINK (element); GstElementState transition = GST_STATE_TRANSITION (element); - DEBUG ("state change > %p %x\n", basesink, transition); switch (transition) { case GST_STATE_NULL_TO_READY: @@ -838,25 +880,31 @@ * is no data flow in READY so we can safely assume we need to preroll. */ basesink->offset = 0; GST_PREROLL_LOCK (basesink->sinkpad); - basesink->need_preroll = TRUE; basesink->have_preroll = FALSE; + basesink->need_preroll = TRUE; GST_PREROLL_UNLOCK (basesink->sinkpad); ret = GST_STATE_ASYNC; case GST_STATE_PAUSED_TO_PLAYING: + /* if we have EOS, we should empty the queue now as there will + * be no more data received in the chain function. + * FIXME, this could block the state change function too long when + * we are pushing and syncing the buffers, better start a new + * thread to do this. */ + gst_basesink_preroll_queue_empty (basesink, basesink->sinkpad); + /* don't need the preroll anymore */ if (basesink->have_preroll) { /* now let it play */ GST_PREROLL_SIGNAL (basesink->sinkpad); - } else { - /* FIXME. We do not have a preroll and we don't need it anymore - * now, this is a case we want to avoid. One way would be to make - * a 'lost state' function that makes get_state return PAUSED with - * ASYNC to indicate that we are prerolling again. */ - basesink->need_preroll = FALSE; @@ -866,20 +914,25 @@ case GST_STATE_PLAYING_TO_PAUSED: - gboolean eos; - /* unlock clock wait if any */ + /* unlock clock wait if any */ - eos = basesink->eos; - GST_PREROLL_LOCK (basesink->sinkpad); + /* unlock any subclasses */ + if (bclass->unlock) + bclass->unlock (basesink); /* if we don't have a preroll buffer and we have not received EOS, * we need to wait for a preroll */ - if (!basesink->have_preroll && !eos) { + if (!basesink->have_preroll && !basesink->eos) { basesink->need_preroll = TRUE; ret = GST_STATE_ASYNC; @@ -887,20 +940,6 @@ case GST_STATE_PAUSED_TO_READY: - /* flush out the data thread if it's locked in finish_preroll */ - gst_basesink_preroll_queue_flush (basesink); - if (basesink->have_preroll) - GST_PREROLL_SIGNAL (basesink->sinkpad); - basesink->need_preroll = FALSE; - basesink->have_preroll = FALSE; - GST_PREROLL_UNLOCK (basesink->sinkpad); - /* clear EOS state */ - basesink->eos = FALSE; case GST_STATE_READY_TO_NULL: @@ -908,6 +947,5 @@ - DEBUG ("state change < %p %x\n", basesink, transition); |