From: <wt...@ke...> - 2007-09-14 20:24:35
|
CVS Root: /cvs/gstreamer Module: gstreamer Changes by: wtay Date: Fri Sep 14 2007 20:24:36 UTC Log message: * plugins/elements/gstqueue.c: (gst_queue_locked_enqueue), (gst_queue_handle_sink_event), (gst_queue_chain), (gst_queue_push_one), (gst_queue_handle_src_query), (gst_queue_sink_activate_push), (gst_queue_src_activate_push): * plugins/elements/gstqueue.h: When downstream returns UNEXPECTED from pushing a buffer, don't try to push more buffers but allow pushing of EOS and NEWSEGMENT. Add some more debug info here and there. Fixes #476514. Modified files: . : ChangeLog plugins/elements: gstqueue.c gstqueue.h Links: http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gstreamer/ChangeLog.diff?r1=1.3413&r2=1.3414 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gstreamer/plugins/elements/gstqueue.c.diff?r1=1.199&r2=1.200 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gstreamer/plugins/elements/gstqueue.h.diff?r1=1.42&r2=1.43 ====Begin Diffs==== Index: ChangeLog =================================================================== RCS file: /cvs/gstreamer/gstreamer/ChangeLog,v retrieving revision 1.3413 retrieving revision 1.3414 diff -u -d -r1.3413 -r1.3414 --- ChangeLog 14 Sep 2007 15:52:24 -0000 1.3413 +++ ChangeLog 14 Sep 2007 20:24:20 -0000 1.3414 @@ -1,5 +1,16 @@ 2007-09-14 Wim Taymans <wim...@gm...> + * plugins/elements/gstqueue.c: (gst_queue_locked_enqueue), + (gst_queue_handle_sink_event), (gst_queue_chain), + (gst_queue_push_one), (gst_queue_handle_src_query), + (gst_queue_sink_activate_push), (gst_queue_src_activate_push): + * plugins/elements/gstqueue.h: + When downstream returns UNEXPECTED from pushing a buffer, don't try to + push more buffers but allow pushing of EOS and NEWSEGMENT. + Add some more debug info here and there. Fixes #476514. 
+ +2007-09-14 Wim Taymans <wim...@gm...> * libs/gst/base/gstbasesink.c: (gst_base_sink_init), (gst_base_sink_preroll_queue_flush), (gst_base_sink_commit_state), (gst_base_sink_wait_preroll), (gst_base_sink_needs_preroll), Index: gstqueue.c RCS file: /cvs/gstreamer/gstreamer/plugins/elements/gstqueue.c,v retrieving revision 1.199 retrieving revision 1.200 diff -u -d -r1.199 -r1.200 --- gstqueue.c 13 Sep 2007 17:15:38 -0000 1.199 +++ gstqueue.c 14 Sep 2007 20:24:22 -0000 1.200 @@ -616,7 +616,7 @@ GST_QUEUE_SIGNAL_DEL (queue); } -/* enqueue an item an update the level stats */ +/* enqueue an item an update the level stats, with QUEUE_LOCK */ static void gst_queue_locked_enqueue (GstQueue * queue, gpointer item) { @@ -636,9 +636,15 @@ /* Zero the thresholds, this makes sure the queue is completely * filled and we can read all data from the queue. */ GST_QUEUE_CLEAR_LEVEL (queue->min_threshold); + /* mark the queue as EOS. This prevents us from accepting more data. */ + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "got EOS from upstream"); + queue->eos = TRUE; break; case GST_EVENT_NEWSEGMENT: apply_segment (queue, event, &queue->sink_segment); + /* a new segment allows us to accept more buffers if we got UNEXPECTED + * from downstream */ + queue->unexpected = FALSE; default: @@ -747,6 +753,8 @@ GST_QUEUE_MUTEX_LOCK (queue); gst_queue_locked_flush (queue); queue->srcresult = GST_FLOW_OK; + queue->eos = FALSE; + queue->unexpected = FALSE; if (gst_pad_is_linked (queue->srcpad)) { gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop, queue->srcpad); @@ -762,6 +770,9 @@ if (GST_EVENT_IS_SERIALIZED (event)) { /* serialized events go in the queue */ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + /* refuse more events on EOS */ + if (queue->eos) + goto out_eos; gst_queue_locked_enqueue (queue, event); GST_QUEUE_MUTEX_UNLOCK (queue); } else { @@ -776,6 +787,15 @@ /* ERRORS */ out_flushing: { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "refusing event, we 
are flushing"); + GST_QUEUE_MUTEX_UNLOCK (queue); + gst_buffer_unref (event); + return FALSE; + } +out_eos: + { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "refusing event, we are EOS"); GST_QUEUE_MUTEX_UNLOCK (queue); gst_buffer_unref (event); return FALSE; @@ -815,6 +835,11 @@ /* we have to lock the queue since we span threads */ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + /* when we received EOS, we refuse any more data */ + if (queue->eos) + goto out_eos; + if (queue->unexpected) + goto out_unexpected; timestamp = GST_BUFFER_TIMESTAMP (buffer); duration = GST_BUFFER_DURATION (buffer); @@ -910,6 +935,25 @@ return ret; } + GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS"); + gst_buffer_unref (buffer); + return GST_FLOW_UNEXPECTED; +out_unexpected: + "exit because we received UNEXPECTED"); /* dequeue an item from the queue an push it downstream. This functions returns @@ -924,6 +968,7 @@ if (data == NULL) goto no_item; +next: if (GST_IS_BUFFER (data)) { GstBuffer *buffer = GST_BUFFER_CAST (data); @@ -933,6 +978,42 @@ /* need to check for srcresult here as well */ GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); + if (result == GST_FLOW_UNEXPECTED) { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "got UNEXPECTED from downstream"); + /* stop pushing buffers, we dequeue all items until we see an item that we + * can push again, which is EOS or NEWSEGMENT. If there is nothing in the + * queue we can push, we set a flag to make the sinkpad refuse more + * buffers with an UNEXPECTED return value. 
*/ + while ((data = gst_queue_locked_dequeue (queue))) { + if (GST_IS_BUFFER (data)) { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "dropping UNEXPECTED buffer %p", data); + gst_buffer_unref (GST_BUFFER_CAST (data)); + } else if (GST_IS_EVENT (data)) { + GstEvent *event = GST_EVENT_CAST (data); + GstEventType type = GST_EVENT_TYPE (event); + if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) { + /* we found a pushable item in the queue, push it out */ + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "pushing pushable event %s after UNEXPECTED %p", + GST_EVENT_TYPE_NAME (event)); + goto next; + } + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "dropping UNEXPECTED event %p", event); + gst_event_unref (event); + } + } + /* no more items in the queue. Set the unexpected flag so that upstream + * make us refuse any more buffers on the sinkpad. Since we will still + * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the + * task function does not shut down. */ + queue->unexpected = TRUE; + result = GST_FLOW_OK; + } } else if (GST_IS_EVENT (data)) { GstEvent *event = GST_EVENT_CAST (data); GstEventType type = GST_EVENT_TYPE (event); @@ -943,8 +1024,11 @@ /* if we're EOS, return UNEXPECTED so that the task pauses. 
*/ - if (type == GST_EVENT_EOS) + if (type == GST_EVENT_EOS) { + GST_CAT_LOG_OBJECT (queue_dataflow, queue, + "pushed EOS event %p, return UNEXPECTED", event); result = GST_FLOW_UNEXPECTED; + } return result; @@ -1106,6 +1190,8 @@ if (active) { GST_QUEUE_MUTEX_LOCK (queue); queue->srcresult = GST_FLOW_OK; + queue->eos = FALSE; + queue->unexpected = FALSE; } else { /* step 1, unblock chain function */ @@ -1131,6 +1217,8 @@ /* we do not start the task yet if the pad is not connected */ if (gst_pad_is_linked (pad)) result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad); Index: gstqueue.h RCS file: /cvs/gstreamer/gstreamer/plugins/elements/gstqueue.h,v retrieving revision 1.42 retrieving revision 1.43 diff -u -d -r1.42 -r1.43 --- gstqueue.h 24 May 2007 09:41:51 -0000 1.42 +++ gstqueue.h 14 Sep 2007 20:24:22 -0000 1.43 @@ -87,6 +87,8 @@ /* flowreturn when srcpad is paused */ GstFlowReturn srcresult; + gboolean unexpected; + gboolean eos; /* the queue of data we're keeping our grubby hands on */ GQueue *queue; |