From: <wt...@ke...> - 2007-09-03 21:19:50
|
CVS Root: /cvs/gstreamer Module: gst-plugins-bad Changes by: wtay Date: Mon Sep 03 2007 21:19:49 UTC Log message: * gst/rtpmanager/gstrtpbin-marshal.list: * gst/rtpmanager/gstrtpbin.c: (gst_rtp_bin_get_client), (gst_rtp_bin_associate), (gst_rtp_bin_sync_chain), (create_stream), (gst_rtp_bin_init), (caps_changed), (new_ssrc_pad_found), (create_recv_rtp), (create_recv_rtcp), (create_send_rtp): * gst/rtpmanager/gstrtpbin.h: Updated example pipelines in docs. Handle sync_rtcp buffers from the SSRC demuxer to perform lip-sync. Set the default latency correctly. Add some more points where we can get caps. * gst/rtpmanager/gstrtpjitterbuffer.c: (gst_rtp_jitter_buffer_class_init), (gst_jitter_buffer_sink_parse_caps), (gst_rtp_jitter_buffer_loop), (gst_rtp_jitter_buffer_query), (gst_rtp_jitter_buffer_set_property), (gst_rtp_jitter_buffer_get_property): Add ts-offset property to control timestamping. * gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_class_init), (gst_rtp_session_init), (gst_rtp_session_set_property), (gst_rtp_session_get_property), (get_current_ntp_ns_time), (rtcp_thread), (stop_rtcp_thread), (gst_rtp_session_change_state), (gst_rtp_session_send_rtcp), (gst_rtp_session_sync_rtcp), (gst_rtp_session_cache_caps), (gst_rtp_session_clock_rate), (gst_rtp_session_sink_setcaps), (gst_rtp_session_chain_recv_rtp), (gst_rtp_session_event_send_rtp_sink), (gst_rtp_session_chain_send_rtp), (create_recv_rtp_sink), (create_recv_rtcp_sink), (create_send_rtp_sink), (create_send_rtcp_src): Various cleanups. Feed rtpsession manager with NTP time based on pipeline clock when handling RTP packets and RTCP timeouts. Perform all RTCP with the system clock. Set caps on RTCP outgoing buffers. * gst/rtpmanager/gstrtpssrcdemux.c: (find_demux_pad_for_ssrc), (create_demux_pad_for_ssrc), (gst_rtp_ssrc_demux_base_init), (gst_rtp_ssrc_demux_init), (gst_rtp_ssrc_demux_sink_event), (gst_rtp_ssrc_demux_rtcp_sink_event), (gst_rtp_ssrc_demux_chain), (gst_rtp_ssrc_demux_rtcp_chain): * gst/rtpmanager/gstrtpssrcdemux.h: Also demux RTCP messages. * gst/rtpmanager/rtpsession.c: (rtp_session_set_callbacks), (update_arrival_stats), (rtp_session_process_rtp), (rtp_session_process_rb), (rtp_session_process_sr), (rtp_session_process_rr), (rtp_session_process_rtcp), (rtp_session_send_rtp), (rtp_session_send_bye), (session_start_rtcp), (session_report_blocks), (session_cleanup), (rtp_session_on_timeout): * gst/rtpmanager/rtpsession.h: Remove the get_time callback, the GStreamer part will feed us with enough timing information. Split sync timing and RTCP timing information. Factor out common RB handling for SR and RR. Send out SR RTCP packets for lip-sync. Move SR and RR packet info generation to the source. * gst/rtpmanager/rtpsource.c: (rtp_source_init), (rtp_source_update_caps), (get_clock_rate), (calculate_jitter), (rtp_source_process_rtp), (rtp_source_send_rtp), (rtp_source_process_sr), (rtp_source_process_rb), (rtp_source_get_new_sr), (rtp_source_get_new_rb), (rtp_source_get_last_sr): * gst/rtpmanager/rtpsource.h: * gst/rtpmanager/rtpstats.h: Use caps on incomming buffers to get timing information when they are there. Calculate clock scew of the receiver compared to the sender and adjust the rtp timestamps. Calculate the round trip in sources. Do SR and RR calculations in the source. Modified files: . : ChangeLog gst/rtpmanager : gstrtpbin-marshal.list gstrtpbin.c gstrtpbin.h gstrtpjitterbuffer.c gstrtpsession.c gstrtpssrcdemux.c gstrtpssrcdemux.h rtpsession.c rtpsession.h rtpsource.c rtpsource.h rtpstats.h Links: http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins-bad/ChangeLog.diff?r1=1.2731&r2=1.2732 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins-bad/gst/rtpmanager/gstrtpbin-marshal.list.diff?r1=1.3&r2=1.4 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins-bad/gst/rtpmanager/gstrtpbin.c.diff?r1=1.20&r2=1.21 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins-bad/gst/rtpmanager/gstrtpbin.h.diff?r1=1.12&r2=1.13 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins-bad/gst/rtpmanager/gstrtpjitterbuffer.c.diff?r1=1.20&r2=1.21 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins-bad/gst/rtpmanager/gstrtpsession.c.diff?r1=1.18&r2=1.19 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins-bad/gst/rtpmanager/gstrtpssrcdemux.c.diff?r1=1.6&r2=1.7 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins-bad/gst/rtpmanager/gstrtpssrcdemux.h.diff?r1=1.3&r2=1.4 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins-bad/gst/rtpmanager/rtpsession.c.diff?r1=1.14&r2=1.15 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins-bad/gst/rtpmanager/rtpsession.h.diff?r1=1.6&r2=1.7 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins-bad/gst/rtpmanager/rtpsource.c.diff?r1=1.9&r2=1.10 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins-bad/gst/rtpmanager/rtpsource.h.diff?r1=1.5&r2=1.6 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins-bad/gst/rtpmanager/rtpstats.h.diff?r1=1.4&r2=1.5 ====Begin Diffs==== Index: ChangeLog =================================================================== RCS file: /cvs/gstreamer/gst-plugins-bad/ChangeLog,v retrieving revision 1.2731 retrieving revision 1.2732 diff -u -d -r1.2731 -r1.2732 --- ChangeLog 3 Sep 2007 20:07:16 -0000 1.2731 +++ ChangeLog 3 Sep 2007 21:19:32 -0000 1.2732 @@ -1,3 +1,79 @@ +2007-09-03 Wim Taymans <wim...@gm...> + + * gst/rtpmanager/gstrtpbin-marshal.list: + * gst/rtpmanager/gstrtpbin.c: (gst_rtp_bin_get_client), + (gst_rtp_bin_associate), (gst_rtp_bin_sync_chain), (create_stream), + (gst_rtp_bin_init), (caps_changed), (new_ssrc_pad_found), + (create_recv_rtp), (create_recv_rtcp), (create_send_rtp): + * gst/rtpmanager/gstrtpbin.h: + Updated example pipelines in docs. + Handle sync_rtcp buffers from the SSRC demuxer to perform lip-sync. + Set the default latency correctly. + Add some more points where we can get caps. + * gst/rtpmanager/gstrtpjitterbuffer.c: + (gst_rtp_jitter_buffer_class_init), + (gst_jitter_buffer_sink_parse_caps), (gst_rtp_jitter_buffer_loop), + (gst_rtp_jitter_buffer_query), + (gst_rtp_jitter_buffer_set_property), + (gst_rtp_jitter_buffer_get_property): + Add ts-offset property to control timestamping. + * gst/rtpmanager/gstrtpsession.c: (gst_rtp_session_class_init), + (gst_rtp_session_init), (gst_rtp_session_set_property), + (gst_rtp_session_get_property), (get_current_ntp_ns_time), + (rtcp_thread), (stop_rtcp_thread), (gst_rtp_session_change_state), + (gst_rtp_session_send_rtcp), (gst_rtp_session_sync_rtcp), + (gst_rtp_session_cache_caps), (gst_rtp_session_clock_rate), + (gst_rtp_session_sink_setcaps), (gst_rtp_session_chain_recv_rtp), + (gst_rtp_session_event_send_rtp_sink), + (gst_rtp_session_chain_send_rtp), (create_recv_rtp_sink), + (create_recv_rtcp_sink), (create_send_rtp_sink), + (create_send_rtcp_src): + Various cleanups. + Feed rtpsession manager with NTP time based on pipeline clock when + handling RTP packets and RTCP timeouts. + Perform all RTCP with the system clock. + Set caps on RTCP outgoing buffers. + * gst/rtpmanager/gstrtpssrcdemux.c: (find_demux_pad_for_ssrc), + (create_demux_pad_for_ssrc), (gst_rtp_ssrc_demux_base_init), + (gst_rtp_ssrc_demux_init), (gst_rtp_ssrc_demux_sink_event), + (gst_rtp_ssrc_demux_rtcp_sink_event), (gst_rtp_ssrc_demux_chain), + (gst_rtp_ssrc_demux_rtcp_chain): + * gst/rtpmanager/gstrtpssrcdemux.h: + Also demux RTCP messages. + * gst/rtpmanager/rtpsession.c: (rtp_session_set_callbacks), + (update_arrival_stats), (rtp_session_process_rtp), + (rtp_session_process_rb), (rtp_session_process_sr), + (rtp_session_process_rr), (rtp_session_process_rtcp), + (rtp_session_send_rtp), (rtp_session_send_bye), + (session_start_rtcp), (session_report_blocks), (session_cleanup), + (rtp_session_on_timeout): + * gst/rtpmanager/rtpsession.h: + Remove the get_time callback, the GStreamer part will feed us with + enough timing information. + Split sync timing and RTCP timing information. + Factor out common RB handling for SR and RR. + Send out SR RTCP packets for lip-sync. + Move SR and RR packet info generation to the source. + * gst/rtpmanager/rtpsource.c: (rtp_source_init), + (rtp_source_update_caps), (get_clock_rate), (calculate_jitter), + (rtp_source_process_rtp), (rtp_source_send_rtp), + (rtp_source_process_sr), (rtp_source_process_rb), + (rtp_source_get_new_sr), (rtp_source_get_new_rb), + (rtp_source_get_last_sr): + * gst/rtpmanager/rtpsource.h: + * gst/rtpmanager/rtpstats.h: + Use caps on incomming buffers to get timing information when they are + there. + Calculate clock scew of the receiver compared to the sender and adjust + the rtp timestamps. + Calculate the round trip in sources. + Do SR and RR calculations in the source. 2007-09-03 Renato Filho <ren...@in...> * configure.ac: Index: gstrtpbin-marshal.list RCS file: /cvs/gstreamer/gst-plugins-bad/gst/rtpmanager/gstrtpbin-marshal.list,v retrieving revision 1.3 retrieving revision 1.4 diff -u -d -r1.3 -r1.4 --- gstrtpbin-marshal.list 10 Aug 2007 17:16:53 -0000 1.3 +++ gstrtpbin-marshal.list 3 Sep 2007 21:19:33 -0000 1.4 @@ -3,3 +3,4 @@ BOXED:UINT,UINT VOID:UINT,OBJECT VOID:UINT,UINT +VOID:OBJECT,OBJECT Index: gstrtpbin.c RCS file: /cvs/gstreamer/gst-plugins-bad/gst/rtpmanager/gstrtpbin.c,v retrieving revision 1.20 retrieving revision 1.21 diff -u -d -r1.20 -r1.21 --- gstrtpbin.c 28 Aug 2007 20:30:15 -0000 1.20 +++ gstrtpbin.c 3 Sep 2007 21:19:33 -0000 1.21 @@ -79,12 +79,12 @@ * <programlisting> * gst-launch gstrtpbin name=rtpbin \ * v4l2src ! ffmpegcolorspace ! ffenc_h263 ! rtph263ppay ! rtpbin.send_rtp_sink_0 \ - * rtpbin.send_rtp_src_0 ! udpsink port=5000 \ - * rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false \ - * udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0 \ - * audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1 \ - * rtpbin.send_rtp_src_1 ! udpsink port=5002 \ - * rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false \ + * rtpbin.send_rtp_src_0 ! udpsink port=5000 \ + * rtpbin.send_rtcp_src_0 ! udpsink port=5001 sync=false async=false \ + * udpsrc port=5005 ! rtpbin.recv_rtcp_sink_0 \ + * audiotestsrc ! amrnbenc ! rtpamrpay ! rtpbin.send_rtp_sink_1 \ + * rtpbin.send_rtp_src_1 ! udpsink port=5002 \ + * rtpbin.send_rtcp_src_1 ! udpsink port=5003 sync=false async=false \ * udpsrc port=5007 ! rtpbin.recv_rtcp_sink_1 * </programlisting> * Encode and payload H263 video captured from a v4l2src. Encode and payload AMR @@ -94,21 +94,22 @@ * on port 5001 and the audio RTCP packets for session 0 are sent on port 5003. * RTCP packets for session 0 are received on port 5005 and RTCP for session 1 * is received on port 5007. Since RTCP packets from the sender should be sent - * as soon as possible, sync=false is configured on udpsink. + * as soon as possible and do not participate in preroll, sync=false and + * async=false is configured on udpsink * </para> * <para> - * gst-launch -v gstrtpbin name=rtpbin \ + * gst-launch -v gstrtpbin name=rtpbin \ * udpsrc caps="application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H263-1998" \ - * port=5000 ! rtpbin.recv_rtp_sink_0 \ - * rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink \ - * udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0 \ - * rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false \ - * udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \ - * port=5002 ! rtpbin.recv_rtp_sink_1 \ - * rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink \ - * udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1 \ - * rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false + * port=5000 ! rtpbin.recv_rtp_sink_0 \ + * rtpbin. ! rtph263pdepay ! ffdec_h263 ! xvimagesink \ + * udpsrc port=5001 ! rtpbin.recv_rtcp_sink_0 \ + * rtpbin.send_rtcp_src_0 ! udpsink port=5005 sync=false async=false \ + * udpsrc caps="application/x-rtp,media=(string)audio,clock-rate=(int)8000,encoding-name=(string)AMR,encoding-params=(string)1,octet-align=(string)1" \ + * port=5002 ! rtpbin.recv_rtp_sink_1 \ + * rtpbin. ! rtpamrdepay ! amrnbdec ! alsasink \ + * udpsrc port=5003 ! rtpbin.recv_rtcp_sink_1 \ + * rtpbin.send_rtcp_src_1 ! udpsink port=5007 sync=false async=false * Receive H263 on port 5000, send it through rtpbin in session 0, depayload, * decode and display the video. @@ -122,7 +123,7 @@ * </refsect2> * - * Last reviewed on 2007-08-28 (0.10.6) + * Last reviewed on 2007-08-30 (0.10.6) */ #ifdef HAVE_CONFIG_H @@ -130,6 +131,9 @@ #endif #include <string.h> +#include <gst/rtp/gstrtpbuffer.h> +#include <gst/rtp/gstrtcpbuffer.h> #include "gstrtpbin-marshal.h" #include "gstrtpbin.h" @@ -187,6 +191,14 @@ GST_STATIC_CAPS ("application/x-rtp") ); +/* padtemplate for the internal pad */ +static GstStaticPadTemplate rtpbin_sync_sink_template = +GST_STATIC_PAD_TEMPLATE ("sink_%d", + GST_PAD_SINK, + GST_PAD_SOMETIMES, + GST_STATIC_CAPS ("application/x-rtcp") + ); #define GST_RTP_BIN_GET_PRIVATE(obj) \ (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_RTP_BIN, GstRtpBinPrivate)) @@ -242,16 +254,37 @@ { /* the SSRC of this stream */ guint32 ssrc; /* parent bin */ GstRtpBin *bin; /* the session this SSRC belongs to */ GstRtpBinSession *session; /* the jitterbuffer of the SSRC */ GstElement *buffer; /* the PT demuxer of the SSRC */ GstElement *demux; gulong demux_newpad_sig; gulong demux_ptreq_sig; + /* the internal pad we use to get RTCP sync messages */ + GstPad *sync_pad; + gboolean have_sync; + guint64 last_unix; + guint64 last_extrtptime; + /* mapping to local RTP and NTP time */ + guint64 local_rtp; + guint64 local_unix; + gint64 unix_delta; + /* for lip-sync */ + guint64 clock_base; + gint clock_rate; + gint64 ts_offset; + gint64 prev_ts_offset; }; #define GST_RTP_SESSION_LOCK(sess) g_mutex_lock ((sess)->lock) @@ -289,12 +322,28 @@ GstPad *recv_rtp_sink; GstPad *recv_rtp_src; GstPad *recv_rtcp_sink; - GstPad *recv_rtcp_src; + GstPad *sync_src; GstPad *send_rtp_sink; GstPad *send_rtp_src; GstPad *send_rtcp_src; +/* Manages the RTP streams that come from one client and should therefore be + * synchronized. + */ +struct _GstRtpBinClient +{ + /* the common CNAME for the streams */ + gchar *cname; + guint cname_len; + /* the streams */ + guint nstreams; + GSList *streams; + gint64 min_delta; +}; /* find a session with the given id. Must be called with RTP_BIN_LOCK */ static GstRtpBinSession * find_session_by_id (GstRtpBin * rtpbin, gint id) @@ -513,6 +562,271 @@ GST_RTP_BIN_UNLOCK (bin); } +static GstRtpBinClient * +gst_rtp_bin_get_client (GstRtpBin * bin, guint8 len, guint8 * data, + gboolean * created) + GstRtpBinClient *result = NULL; + GSList *walk; + for (walk = bin->clients; walk; walk = g_slist_next (walk)) { + GstRtpBinClient *client = (GstRtpBinClient *) walk->data; + if (len != client->cname_len) + continue; + if (!strncmp ((gchar *) data, client->cname, client->cname_len)) { + GST_DEBUG_OBJECT (bin, "found existing client %p with CNAME %s", client, + client->cname); + result = client; + break; + } + } + /* nothing found, create one */ + if (result == NULL) { + result = g_new0 (GstRtpBinClient, 1); + result->cname = g_strndup ((gchar *) data, len); + result->cname_len = len; + result->min_delta = G_MAXINT64; + bin->clients = g_slist_prepend (bin->clients, result); + GST_DEBUG_OBJECT (bin, "created new client %p with CNAME %s", result, + result->cname); + return result; +} +/* associate a stream to the given CNAME. This will make sure all streams for + * that CNAME are synchronized together. */ +static void +gst_rtp_bin_associate (GstRtpBin * bin, GstRtpBinStream * stream, guint8 len, + guint8 * data) + GstRtpBinClient *client; + gboolean created; + /* first find or create the CNAME */ + client = gst_rtp_bin_get_client (bin, len, data, &created); + /* find stream in the client */ + for (walk = client->streams; walk; walk = g_slist_next (walk)) { + GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data; + if (ostream == stream) + /* not found, add it to the list */ + if (walk == NULL) { + GST_DEBUG_OBJECT (bin, + "new association of SSRC %08x with client %p with CNAME %s", + stream->ssrc, client, client->cname); + client->streams = g_slist_prepend (client->streams, stream); + client->nstreams++; + } else { + "found association of SSRC %08x with client %p with CNAME %s", + /* we can only continue if we know the local clock-base and clock-rate */ + if (stream->clock_base == -1) + goto no_clock_base; + if (stream->clock_rate <= 0) + goto no_clock_rate; + /* map last RTP time to local timeline using our clock-base */ + stream->local_rtp = stream->last_extrtptime - stream->clock_base; + GST_DEBUG_OBJECT (bin, + "base %" G_GUINT64_FORMAT ", extrtptime %" G_GUINT64_FORMAT + ", local RTP %" G_GUINT64_FORMAT ", clock-rate %d", stream->clock_base, + stream->last_extrtptime, stream->local_rtp, stream->clock_rate); + /* calculate local NTP time in gstreamer timestamp */ + stream->local_unix = + gst_util_uint64_scale_int (stream->local_rtp, GST_SECOND, + stream->clock_rate); + /* calculate delta between server and receiver */ + stream->unix_delta = stream->last_unix - stream->local_unix; + "local UNIX %" G_GUINT64_FORMAT ", remote UNIX %" G_GUINT64_FORMAT + ", delta %" G_GINT64_FORMAT, stream->local_unix, stream->last_unix, + stream->unix_delta); + /* recalc inter stream playout offset, but only if there are more than one + * stream. */ + if (client->nstreams > 1) { + gint64 min; + /* calculate the min of all deltas */ + min = G_MAXINT64; + for (walk = client->streams; walk; walk = g_slist_next (walk)) { + GstRtpBinStream *ostream = (GstRtpBinStream *) walk->data; + if (ostream->unix_delta < min) + min = ostream->unix_delta; + GST_DEBUG_OBJECT (bin, "client %p min delta %" G_GINT64_FORMAT, client, + min); + /* calculate offsets for each stream */ + ostream->ts_offset = ostream->unix_delta - min; + /* delta changed, see how much */ + if (ostream->prev_ts_offset != ostream->ts_offset) { + gint64 diff; + if (ostream->prev_ts_offset > ostream->ts_offset) + diff = ostream->prev_ts_offset - ostream->ts_offset; + else + diff = ostream->ts_offset - ostream->prev_ts_offset; + /* only change diff when it changed more than 1 millisecond. This + * compensates for rounding errors in NTP to RTP timestamp + * conversions */ + if (diff > GST_MSECOND) + g_object_set (ostream->buffer, "ts-offset", ostream->ts_offset, NULL); + ostream->prev_ts_offset = ostream->ts_offset; + } + GST_DEBUG_OBJECT (bin, "stream SSRC %08x, delta %" G_GINT64_FORMAT, + ostream->ssrc, ostream->ts_offset); + return; +no_clock_base: + { + GST_WARNING_OBJECT (bin, "we have no clock-base"); + return; +no_clock_rate: + GST_WARNING_OBJECT (bin, "we have no clock-rate"); +#define GST_RTCP_BUFFER_FOR_PACKETS(b,buffer,packet) \ + for ((b) = gst_rtcp_buffer_get_first_packet ((buffer), (packet)); (b); \ + (b) = gst_rtcp_packet_move_to_next ((packet))) +#define GST_RTCP_SDES_FOR_ITEMS(b,packet) \ + for ((b) = gst_rtcp_packet_sdes_first_item ((packet)); (b); \ + (b) = gst_rtcp_packet_sdes_next_item ((packet))) +#define GST_RTCP_SDES_FOR_ENTRIES(b,packet) \ + for ((b) = gst_rtcp_packet_sdes_first_entry ((packet)); (b); \ + (b) = gst_rtcp_packet_sdes_next_entry ((packet))) +static GstFlowReturn +gst_rtp_bin_sync_chain (GstPad * pad, GstBuffer * buffer) + GstFlowReturn ret = GST_FLOW_OK; + GstRtpBinStream *stream; + GstRtpBin *bin; + GstRTCPPacket packet; + guint32 ssrc; + guint64 ntptime; + guint32 rtptime; + gboolean have_sr, have_sdes; + gboolean more; + stream = gst_pad_get_element_private (pad); + bin = stream->bin; + GST_DEBUG_OBJECT (bin, "received sync packet"); + if (!gst_rtcp_buffer_validate (buffer)) + goto invalid_rtcp; + have_sr = FALSE; + have_sdes = FALSE; + GST_RTCP_BUFFER_FOR_PACKETS (more, buffer, &packet) { + /* first packet must be SR or RR or else the validate would have failed */ + switch (gst_rtcp_packet_get_type (&packet)) { + case GST_RTCP_TYPE_SR: + /* only parse first. There is only supposed to be one SR in the packet + * but we will deal with malformed packets gracefully */ + if (have_sr) + break; + /* get NTP and RTP times */ + gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, &ntptime, &rtptime, + NULL, NULL); + GST_DEBUG_OBJECT (bin, "received sync packet from SSRC %08x", ssrc); + /* ignore SR that is not ours */ + if (ssrc != stream->ssrc) + continue; + have_sr = TRUE; + /* store values in the stream */ + stream->have_sync = TRUE; + stream->last_unix = gst_rtcp_ntp_to_unix (ntptime); + /* use extended timestamp */ + gst_rtp_buffer_ext_timestamp (&stream->last_extrtptime, rtptime); + break; + case GST_RTCP_TYPE_SDES: + { + gboolean more_items, more_entries; + /* only deal with first SDES, there is only supposed to be one SDES in + * the RTCP packet but we deal with bad packets gracefully. Also bail + * out if we have not seen an SR item yet. */ + if (have_sdes || !have_sr) + GST_RTCP_SDES_FOR_ITEMS (more_items, &packet) { + /* skip items that are not about the SSRC of the sender */ + if (gst_rtcp_packet_sdes_get_ssrc (&packet) != ssrc) + continue; + /* find the CNAME entry */ + GST_RTCP_SDES_FOR_ENTRIES (more_entries, &packet) { + GstRTCPSDESType type; + guint8 len; + guint8 *data; + gst_rtcp_packet_sdes_get_entry (&packet, &type, &len, &data); + if (type == GST_RTCP_SDES_CNAME) { + stream->clock_base = GST_BUFFER_OFFSET (buffer); + /* associate the stream to CNAME */ + gst_rtp_bin_associate (bin, stream, len, data); + } + } + } + have_sdes = TRUE; + default: + /* we can ignore these packets */ + gst_buffer_unref (buffer); + return ret; + /* ERRORS */ +invalid_rtcp: + /* this is fatal and should be filtered earlier */ + GST_ELEMENT_ERROR (bin, STREAM, DECODE, (NULL), + ("invalid RTCP packet received")); + gst_buffer_unref (buffer); + return GST_FLOW_ERROR; /* create a new stream with @ssrc in @session. Must be called with * RTP_SESSION_LOCK. */ static GstRtpBinStream * @@ -520,6 +834,8 @@ GstElement *buffer, *demux; GstRtpBinStream *stream; + GstPadTemplate *templ; + gchar *padname; if (!(buffer = gst_element_factory_make ("gstrtpjitterbuffer", NULL))) goto no_jitterbuffer; @@ -533,8 +849,22 @@ stream->session = session; stream->buffer = buffer; stream->demux = demux; + stream->last_extrtptime = -1; + stream->have_sync = FALSE; session->streams = g_slist_prepend (session->streams, stream); + /* make an internal sinkpad for RTCP sync packets. Take ownership of the + * pad. We will link this pad later. */ + padname = g_strdup_printf ("sync_%d", ssrc); + templ = gst_static_pad_template_get (&rtpbin_sync_sink_template); + stream->sync_pad = gst_pad_new_from_template (templ, padname); + gst_object_unref (templ); + gst_object_ref (stream->sync_pad); + gst_object_sink (stream->sync_pad); + gst_pad_set_element_private (stream->sync_pad, stream); + gst_pad_set_chain_function (stream->sync_pad, gst_rtp_bin_sync_chain); + gst_pad_set_active (stream->sync_pad, TRUE); /* provide clock_rate to the jitterbuffer when needed */ g_signal_connect (buffer, "request-pt-map", (GCallback) pt_map_requested, session); @@ -566,17 +896,6 @@ } -/* Manages the RTP streams that come from one client and should therefore be - * synchronized. - */ -struct _GstRtpBinClient -{ - /* the common CNAME for the streams */ - gchar *cname; - /* the streams */ - GSList *streams; -}; - /* GObject vmethods */ static void gst_rtp_bin_finalize (GObject * object); static void gst_rtp_bin_set_property (GObject * object, guint prop_id, @@ -762,6 +1081,7 @@ rtpbin->priv = GST_RTP_BIN_GET_PRIVATE (rtpbin); rtpbin->priv->bin_lock = g_mutex_new (); rtpbin->provided_clock = gst_system_clock_obtain (); + rtpbin->latency = DEFAULT_LATENCY_MS; static void @@ -908,13 +1228,45 @@ +/* emited when caps changed for the session */ +caps_changed (GstPad * pad, GParamSpec * pspec, GstRtpBinSession * session) + GstCaps *caps; + gint payload; + const GstStructure *s; + bin = session->bin; + g_object_get (pad, "caps", &caps, NULL); + if (caps == NULL) + GST_DEBUG_OBJECT (bin, "got caps %" GST_PTR_FORMAT, caps); + s = gst_caps_get_structure (caps, 0); + /* get payload, finish when it's not there */ + if (!gst_structure_get_int (s, "payload", &payload)) + GST_RTP_SESSION_LOCK (session); + GST_DEBUG_OBJECT (bin, "insert caps for payload %d", payload); + g_hash_table_insert (session->ptmap, GINT_TO_POINTER (payload), caps); + GST_RTP_SESSION_UNLOCK (session); /* a new pad (SSRC) was created in @session */ new_ssrc_pad_found (GstElement * element, guint ssrc, GstPad * pad, GstRtpBinSession * session) - GstPad *sinkpad; + GstPad *sinkpad, *srcpad; GST_DEBUG_OBJECT (session->bin, "new SSRC pad %08x", ssrc); @@ -925,12 +1277,38 @@ if (!stream) goto no_stream; + /* get the caps of the pad, we need the clock-rate and base_time if any. */ + if ((caps = gst_pad_get_caps (pad))) { + const GstStructure *s; + guint val; + GST_DEBUG_OBJECT (session->bin, "pad has caps %" GST_PTR_FORMAT, caps); + s = gst_caps_get_structure (caps, 0); + if (!gst_structure_get_int (s, "clock-rate", &stream->clock_rate)) + stream->clock_rate = -1; + if (gst_structure_get_uint (s, "clock-base", &val)) + stream->clock_base = val; + else + stream->clock_base = -1; /* get pad and link */ GST_DEBUG_OBJECT (session->bin, "linking jitterbuffer"); sinkpad = gst_element_get_static_pad (stream->buffer, "sink"); gst_pad_link (pad, sinkpad); gst_object_unref (sinkpad); + /* get the RTCP sync pad */ + GST_DEBUG_OBJECT (session->bin, "linking sync pad"); + padname = g_strdup_printf ("rtcp_src_%d", ssrc); + srcpad = gst_element_get_pad (element, padname); + g_free (padname); + gst_pad_link (srcpad, stream->sync_pad); + gst_object_unref (srcpad); /* connect to the new-pad signal of the payload demuxer, this will expose the * new pad by ghosting it. */ stream->demux_newpad_sig = g_signal_connect (stream->demux, @@ -992,6 +1370,9 @@ if (session->recv_rtp_sink == NULL) goto pad_failed; + g_signal_connect (session->recv_rtp_sink, "notify::caps", + (GCallback) caps_changed, session); GST_DEBUG_OBJECT (rtpbin, "getting RTP src pad"); /* get srcpad, link to SSRCDemux */ session->recv_rtp_src = @@ -999,8 +1380,9 @@ if (session->recv_rtp_src == NULL) - GST_DEBUG_OBJECT (rtpbin, "getting demuxer sink pad"); + GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTP sink pad"); sinkdpad = gst_element_get_static_pad (session->demux, "sink"); + GST_DEBUG_OBJECT (rtpbin, "linking demuxer RTP sink pad"); lres = gst_pad_link (session->recv_rtp_src, sinkdpad); gst_object_unref (sinkdpad); if (lres != GST_PAD_LINK_OK) @@ -1057,11 +1439,8 @@ GstPad *result; guint sessid; -#if 0 GstPad *sinkdpad; GstPadLinkReturn lres; -#endif /* first get the session number */ if (name == NULL || sscanf (name, "recv_rtcp_sink_%d", &sessid) != 1) @@ -1083,29 +1462,25 @@ if (session->recv_rtcp_sink != NULL) goto existed; - GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad"); /* get recv_rtp pad and store */ + GST_DEBUG_OBJECT (rtpbin, "getting RTCP sink pad"); session->recv_rtcp_sink = gst_element_get_request_pad (session->session, "recv_rtcp_sink"); if (session->recv_rtcp_sink == NULL) GST_DEBUG_OBJECT (rtpbin, "getting sync src pad"); - session->recv_rtcp_src = - gst_element_get_static_pad (session->session, "sync_src"); - if (session->recv_rtcp_src == NULL) + session->sync_src = gst_element_get_static_pad (session->session, "sync_src"); + if (session->sync_src == NULL) - GST_DEBUG_OBJECT (rtpbin, "linking sync to demux"); - sinkdpad = gst_element_get_static_pad (session->demux, "sink"); - lres = gst_pad_link (session->recv_rtcp_src, sinkdpad); + GST_DEBUG_OBJECT (rtpbin, "getting demuxer RTCP sink pad"); + sinkdpad = gst_element_get_static_pad (session->demux, "rtcp_sink"); + lres = gst_pad_link (session->sync_src, sinkdpad); goto link_failed; result = gst_ghost_pad_new_from_template (name, session->recv_rtcp_sink, templ); @@ -1136,13 +1511,11 @@ g_warning ("gstrtpbin: failed to get session pad"); return NULL; link_failed: { g_warning ("gstrtpbin: failed to link pads"); /* Create a pad for sending RTP for the session in @name. Must be called with @@ -1180,6 +1553,9 @@ if (session->send_rtp_sink == NULL) + g_signal_connect (session->send_rtp_sink, "notify::caps", gst_ghost_pad_new_from_template (name, session->send_rtp_sink, templ); gst_pad_set_active (result, TRUE); Index: gstrtpbin.h RCS file: /cvs/gstreamer/gst-plugins-bad/gst/rtpmanager/gstrtpbin.h,v retrieving revision 1.12 retrieving revision 1.13 diff -u -d -r1.12 -r1.13 --- gstrtpbin.h 23 Aug 2007 21:39:58 -0000 1.12 +++ gstrtpbin.h 3 Sep 2007 21:19:33 -0000 1.13 @@ -48,6 +48,9 @@ /* clock we provide */ GstClock *provided_clock; + /* a list of clients, these are streams with the same CNAME */ + GSList *clients; /*< private >*/ GstRtpBinPrivate *priv; Index: gstrtpjitterbuffer.c RCS file: /cvs/gstreamer/gst-plugins-bad/gst/rtpmanager/gstrtpjitterbuffer.c,v --- gstrtpjitterbuffer.c 31 Aug 2007 15:26:14 -0000 1.20 +++ gstrtpjitterbuffer.c 3 Sep 2007 21:19:34 -0000 1.21 @@ -99,12 +99,14 @@ #define DEFAULT_LATENCY_MS 200 #define DEFAULT_DROP_ON_LATENCY FALSE +#define DEFAULT_TS_OFFSET 0 enum PROP_0, PROP_LATENCY, - PROP_DROP_ON_LATENCY + PROP_DROP_ON_LATENCY, + PROP_TS_OFFSET #define JBUF_LOCK(priv) (g_mutex_lock ((priv)->jbuf_lock)) @@ -137,6 +139,7 @@ /* properties */ guint latency_ms; gboolean drop_on_latency; /* the last seqnum we pushed out */ guint32 last_popped_seqnum; @@ -150,6 +153,7 @@ gint32 clock_rate; gint64 clock_base; guint64 exttimestamp; /* when we are shutting down */ GstFlowReturn srcresult; @@ -278,6 +282,16 @@ "Tells the jitterbuffer to never exceed the given latency in size", DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE)); /** + * GstRtpJitterBuffer::ts-offset: + * + * Adjust RTP timestamps in the jitterbuffer with offset. + */ + g_object_class_install_property (gobject_class, PROP_TS_OFFSET, + g_param_spec_int64 ("ts-offset", + "Timestamp Offset", + "Adjust buffer RTP timestamps with offset in nanoseconds", G_MININT64, + G_MAXINT64, DEFAULT_TS_OFFSET, G_PARAM_READWRITE)); + /** * GstRtpJitterBuffer::request-pt-map: * @buffer: the object which received the signal * @pt: the pt @@ -421,7 +435,7 @@ GstRtpJitterBufferPrivate *priv; GstStructure *caps_struct; - const GValue *value; + guint val; priv = jitterbuffer->priv; @@ -443,22 +457,22 @@ /* gah, clock-base is uint. If we don't have a base, we will use the first * buffer timestamp as the base time. This will screw up sync but it's better * than nothing. */ - value = gst_structure_get_value (caps_struct, "clock-base"); - if (value && G_VALUE_HOLDS_UINT (value)) { - priv->clock_base = g_value_get_uint (value); - GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT, - priv->clock_base); - } else + if (gst_structure_get_uint (caps_struct, "clock-base", &val)) + priv->clock_base = val; + else priv->clock_base = -1; + GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT, + priv->clock_base); /* first expected seqnum */ - value = gst_structure_get_value (caps_struct, "seqnum-base"); - priv->next_seqnum = g_value_get_uint (value); - GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_seqnum); + if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) + priv->next_seqnum = val; priv->next_seqnum = -1; + GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_seqnum); return TRUE; /* ERRORS */ @@ -929,6 +943,7 @@ GstClockTime timestamp; gint64 running_time; + gint ts_offset_rtp; @@ -996,8 +1011,11 @@ exttimestamp, priv->clock_base); /* if no clock_base was given, take first ts as base */ - if (priv->clock_base == -1) + if (priv->clock_base == -1) { + GST_DEBUG_OBJECT (jitterbuffer, + "no clock base, using exttimestamp %" G_GUINT64_FORMAT, exttimestamp); priv->clock_base = exttimestamp; /* take rtp timestamp offset into account, this can wrap around */ exttimestamp -= priv->clock_base; @@ -1089,6 +1107,34 @@ outbuf = gst_buffer_make_metadata_writable (outbuf); GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT); + /* apply the timestamp offset */ + if (priv->ts_offset > 0) + ts_offset_rtp = + gst_util_uint64_scale_int (priv->ts_offset, priv->clock_rate, + GST_SECOND); + else if (priv->ts_offset < 0) + -gst_util_uint64_scale_int (-priv->ts_offset, priv->clock_rate, + ts_offset_rtp = 0; + if (ts_offset_rtp != 0) { + guint32 timestamp; + /* if the offset changed, mark with discont */ + if (priv->ts_offset != priv->prev_ts_offset) { + GST_DEBUG_OBJECT (jitterbuffer, "changing offset to %d", ts_offset_rtp); + GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT); + priv->prev_ts_offset = priv->ts_offset; + timestamp = gst_rtp_buffer_get_timestamp (outbuf); + timestamp += ts_offset_rtp; + gst_rtp_buffer_set_timestamp (outbuf, timestamp); /* now we are ready to push the buffer. Save the seqnum and release the lock * so the other end can push stuff in the queue again. */ priv->last_popped_seqnum = seqnum; @@ -1158,6 +1204,7 @@ GstClockTime min_latency, max_latency; gboolean us_live; GstPad *peer; + GstClockTime our_latency; if ((peer = gst_pad_get_peer (priv->sinkpad))) { if ((res = gst_pad_query (peer, query))) { @@ -1172,11 +1219,16 @@ priv->peer_latency = min_latency; JBUF_UNLOCK (priv); - min_latency += priv->latency_ms * GST_MSECOND; + our_latency = ((guint64) priv->latency_ms) * GST_MSECOND; + GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT, + GST_TIME_ARGS (our_latency)); + min_latency += our_latency; /* max_latency can be -1, meaning there is no upper limit for the * latency. */ if (max_latency != -1) - max_latency += priv->latency_ms * GST_MSECOND; + max_latency += our_latency * GST_MSECOND; GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %" GST_TIME_FORMAT " max %" GST_TIME_FORMAT, @@ -1199,7 +1251,11 @@ gst_rtp_jitter_buffer_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec) - GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (object); + GstRtpJitterBuffer *jitterbuffer; + GstRtpJitterBufferPrivate *priv; + jitterbuffer = GST_RTP_JITTER_BUFFER (object); + priv = jitterbuffer->priv; switch (prop_id) { case PROP_LATENCY: @@ -1208,23 +1264,29 @@ /* FIXME, not threadsafe */ new_latency = g_value_get_uint (value); - old_latency = jitterbuffer->priv->latency_ms; + old_latency = priv->latency_ms; - jitterbuffer->priv->latency_ms = new_latency; + priv->latency_ms = new_latency; /* post message if latency changed, this will inform the parent pipeline * that a latency reconfiguration is possible/needed. */ if (new_latency != old_latency) { + GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT, + GST_TIME_ARGS (new_latency * GST_MSECOND)); gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer))); } break; } case PROP_DROP_ON_LATENCY: - { - jitterbuffer->priv->drop_on_latency = g_value_get_boolean (value); + priv->drop_on_latency = g_value_get_boolean (value); + case PROP_TS_OFFSET: + JBUF_LOCK (priv); + priv->ts_offset = g_value_get_int64 (value); + JBUF_UNLOCK (priv); - } default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -1235,14 +1297,23 @@ gst_rtp_jitter_buffer_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) - g_value_set_uint (value, jitterbuffer->priv->latency_ms); + g_value_set_uint (value, priv->latency_ms); - g_value_set_boolean (value, jitterbuffer->priv->drop_on_latency); + g_value_set_boolean (value, priv->drop_on_latency); + g_value_set_int64 (value, priv->ts_offset); Index: gstrtpsession.c RCS file: /cvs/gstreamer/gst-plugins-bad/gst/rtpmanager/gstrtpsession.c,v retrieving revision 1.18 retrieving revision 1.19 diff -u -d -r1.18 -r1.19 --- gstrtpsession.c 29 Aug 2007 16:56:27 -0000 1.18 +++ gstrtpsession.c 3 Sep 2007 21:19:34 -0000 1.19 @@ -132,6 +132,8 @@ #include "config.h" #include "gstrtpsession.h" #include "rtpsession.h" @@ -214,7 +216,8 @@ - PROP_0 + PROP_0, + PROP_NTP_NS_BASE #define GST_RTP_SESSION_GET_PRIVATE(obj) \ @@ -234,8 +237,10 @@ GThread *thread; /* caps mapping */ - guint8 pt; - gint clock_rate; + GHashTable *ptmap; + /* NTP base time */ + guint64 ntpnsbase; /* callbacks to handle actions from the session manager */ @@ -245,18 +250,18 @@ RTPSource * src, GstBuffer * buffer, gpointer user_data); static GstFlowReturn gst_rtp_session_send_rtcp (RTPSession * sess, +static GstFlowReturn gst_rtp_session_sync_rtcp (RTPSession * sess, + RTPSource * src, GstBuffer * buffer, gpointer user_data); static gint gst_rtp_session_clock_rate (RTPSession * sess, guint8 payload, gpointer user_data); -static GstClockTime gst_rtp_session_get_time (RTPSession * sess, - gpointer user_data); static void gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data); static RTPSessionCallbacks callbacks = { gst_rtp_session_process_rtp, gst_rtp_session_send_rtp, gst_rtp_session_send_rtcp, + gst_rtp_session_sync_rtcp, gst_rtp_session_clock_rate, - gst_rtp_session_get_time, gst_rtp_session_reconsider @@ -363,6 +368,7 @@ gobject_class->set_property = gst_rtp_session_set_property; gobject_class->get_property = gst_rtp_session_get_property; * GstRtpSession::request-pt-map: * @sess: the object which received the signal @@ -490,6 +496,7 @@ (GCallback) on_bye_timeout, rtpsession); g_signal_connect (rtpsession->priv->session, "on-timeout", (GCallback) on_timeout, rtpsession); + rtpsession->priv->ptmap = g_hash_table_new (NULL, NULL); @@ -513,6 +520,9 @@ rtpsession = GST_RTP_SESSION (object); + case PROP_NTP_NS_BASE: + rtpsession->priv->ntpnsbase = g_value_get_uint64 (value); @@ -528,26 +538,51 @@ + g_value_set_uint64 (value, rtpsession->priv->ntpnsbase); +static guint64 +get_current_ntp_ns_time (GstRtpSession * rtpsession, GstClock * clock) + guint64 ntpnstime; + if (clock) { + /* get current NTP time */ + ntpnstime = gst_clock_get_time (clock); + /* convert to running time */ + ntpnstime -= gst_element_get_base_time (GST_ELEMENT_CAST (rtpsession)); + /* add NTP base offset */ + ntpnstime += rtpsession->priv->ntpnsbase; + } else + ntpnstime = -1; + return ntpnstime; rtcp_thread (GstRtpSession * rtpsession) - GstClock *clock; + GstClock *sysclock, *clock; GstClockID id; GstClockTime current_time; GstClockTime next_timeout; - /* RTCP timeouts we use the system clock */ - clock = gst_system_clock_obtain (); - if (clock == NULL) - goto no_clock; + /* for RTCP timeouts we use the system clock */ + sysclock = gst_system_clock_obtain (); + if (sysclock == NULL) + goto no_sysclock; - current_time = gst_clock_get_time (clock); + /* to get the current NTP time, we use the pipeline clock */ + clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession)); + current_time = gst_clock_get_time (sysclock); GST_DEBUG_OBJECT (rtpsession, "entering RTCP thread"); @@ -568,7 +603,7 @@ id = rtpsession->priv->id = - gst_clock_new_single_shot_id (clock, next_timeout); + gst_clock_new_single_shot_id (sysclock, next_timeout); GST_RTP_SESSION_UNLOCK (rtpsession); res = gst_clock_id_wait (id, NULL); @@ -581,7 +616,10 @@ /* update current time */ - current_time = gst_clock_get_time (clock); + current_time = gst_clock_get_time (sysclock); + ntpnstime = get_current_ntp_ns_time (rtpsession, clock); /* we get unlocked because we need to perform reconsideration, don't perform * the timeout but get a new reporting estimate. */ @@ -590,18 +628,18 @@ /* perform actions, we ignore result. Release lock because it might push. */ - rtp_session_on_timeout (rtpsession->priv->session, current_time); + rtp_session_on_timeout (rtpsession->priv->session, current_time, ntpnstime); GST_RTP_SESSION_LOCK (rtpsession); GST_RTP_SESSION_UNLOCK (rtpsession); - gst_object_unref (clock); + gst_object_unref (sysclock); GST_DEBUG_OBJECT (rtpsession, "leaving RTCP thread"); return; -no_clock: +no_sysclock: GST_ELEMENT_ERROR (rtpsession, CORE, CLOCK, (NULL), ("Could not get system clock")); @@ -662,7 +700,6 @@ case GST_STATE_CHANGE_NULL_TO_READY: case GST_STATE_CHANGE_READY_TO_PAUSED: - priv->clock_rate = -1; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: @@ -677,17 +714,9 @@ switch (transition) { - GstClockTime base_time; - base_time = GST_ELEMENT_CAST (rtpsession)->base_time; - rtp_session_set_base_time (priv->session, base_time); if (!start_rtcp_thread (rtpsession)) goto failed_thread; case GST_STATE_CHANGE_PLAYING_TO_PAUSED: case GST_STATE_CHANGE_PAUSED_TO_READY: @@ -774,6 +803,15 @@ priv = rtpsession->priv; if (rtpsession->send_rtcp_src) { + GstCaps *caps; + /* set rtcp caps on output pad */ + if (!(caps = GST_PAD_CAPS (rtpsession->send_rtcp_src))) { + caps = gst_caps_new_simple ("application/x-rtcp", NULL); + gst_pad_set_caps (rtpsession->send_rtcp_src, caps); + gst_caps_unref (caps); + gst_buffer_set_caps (buffer, caps); GST_DEBUG_OBJECT (rtpsession, "sending RTCP"); result = gst_pad_push (rtpsession->send_rtcp_src, buffer); } else { @@ -784,30 +822,59 @@ return result; -static gboolean -gst_rtp_session_parse_caps (GstRtpSession * rtpsession, GstCaps * caps) +/* called when the session manager has an SR RTCP packet ready for handling + * inter stream synchronisation */ +gst_rtp_session_sync_rtcp (RTPSession * sess, + RTPSource * src, GstBuffer * buffer, gpointer user_data) + GstFlowReturn result; + GstRtpSession *rtpsession; GstRtpSessionPrivate *priv; - const GstStructure *caps_struct; + rtpsession = GST_RTP_SESSION (user_data); - GST_DEBUG_OBJECT (rtpsession, "parsing caps"); + if (rtpsession->sync_src) { - caps_struct = gst_caps_get_structure (caps, 0); - if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate)) - goto no_clock_rate; + if (!(caps = GST_PAD_CAPS (rtpsession->sync_src))) { + gst_pad_set_caps (rtpsession->sync_src, caps); + GST_DEBUG_OBJECT (rtpsession, "sending Sync RTCP"); + result = gst_pad_push (rtpsession->sync_src, buffer); + GST_DEBUG_OBJECT (rtpsession, "not sending Sync RTCP, no output pad"); + result = GST_FLOW_OK; - GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", priv->clock_rate); +gst_rtp_session_cache_caps (GstRtpSession * rtpsession, GstCaps * caps) + GstRtpSessionPrivate *priv; - return TRUE; + priv = rtpsession->priv; - /* ERRORS */ -no_clock_rate: - { - GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!"); - return FALSE; - } + GST_DEBUG_OBJECT (rtpsession, "parsing caps"); + caps = g_hash_table_lookup (priv->ptmap, GINT_TO_POINTER (payload)); + if (caps) + g_hash_table_insert (priv->ptmap, GINT_TO_POINTER (payload), caps); /* called when the session manager needs the clock rate */ @@ -821,13 +888,15 @@ GValue ret = { 0 }; GValue args[2] = { {0}, {0} }; GstCaps *caps; rtpsession = GST_RTP_SESSION_CAST (user_data); - /* if we have it, return it */ - if (priv->clock_rate != -1) - return priv->clock_rate; + GST_RTP_SESSION_LOCK (rtpsession); + goto done; g_value_init (&args[0], GST_TYPE_ELEMENT); g_value_set_object (&args[0], rtpsession); @@ -844,10 +913,16 @@ if (!caps) goto no_caps; - if (!gst_rtp_session_parse_caps (rtpsession, caps)) - goto parse_failed; + gst_rtp_session_cache_caps (rtpsession, caps); - result = priv->clock_rate; + if (!gst_structure_get_int (s, "clock-rate", &result)) + GST_DEBUG_OBJECT (rtpsession, "parsed clock-rate %d", result); +done: + GST_RTP_SESSION_UNLOCK (rtpsession); @@ -855,35 +930,15 @@ no_caps: GST_DEBUG_OBJECT (rtpsession, "could not get caps"); - return -1; -parse_failed: - GST_DEBUG_OBJECT (rtpsession, "failed to parse caps"); + GST_DEBUG_OBJECT (rtpsession, "No clock-rate in caps!"); -/* called when the session manager needs the time of clock */ -static GstClockTime -gst_rtp_session_get_time (RTPSession * sess, gpointer user_data) - GstClockTime result; - GstRtpSession *rtpsession; - rtpsession = GST_RTP_SESSION_CAST (user_data); - clock = gst_element_get_clock (GST_ELEMENT_CAST (rtpsession)); - if (clock) { - result = gst_clock_get_time (clock); - gst_object_unref (clock); - result = GST_CLOCK_TIME_NONE; - return result; -} /* called when the session manager asks us to reconsider the timeout */ gst_rtp_session_reconsider (RTPSession * sess, gpointer user_data) @@ -925,18 +980,19 @@ static gboolean gst_rtp_session_sink_setcaps (GstPad * pad, GstCaps * caps) - gboolean res; GstRtpSession *rtpsession; rtpsession = GST_RTP_SESSION (gst_pad_get_parent (pad)); - res = gst_rtp_session_parse_caps (rtpsession, caps); gst_object_unref (rtpsession); - return res; + return TRUE; /* receive a packet from a sender, send it to the RTP session manager and @@ -948,13 +1004,17 @@ GstFlowReturn ret; GST_DEBUG_OBJECT (rtpsession, "received RTP packet"); - ret = rtp_session_process_rtp (priv->session, buffer); + ntpnstime = + get_current_ntp_ns_time (rtpsession, GST_ELEMENT_CLOCK (rtpsession)); + ret = rtp_session_process_rtp (priv->session, buffer, ntpnstime); @@ -1051,8 +1111,6 @@ gst_segment_set_newsegment_full (segment, update, rate, arate, format, start, stop, time); - rtp_session_set_timestamp_sync (priv->session, start); /* push event forward */ ret = gst_pad_push_event (rtpsession->send_rtp_src, event); @@ -1075,13 +1133,24 @@ + GstClockTime timestamp; - ret = rtp_session_send_rtp (priv->session, buffer); + /* get NTP time when this packet was captured, this depends on the timestamp. */ + timestamp = GST_BUFFER_TIMESTAMP (buffer); + if (GST_CLOCK_TIME_IS_VALID (timestamp)) { + /* convert to running time using the segment start value. */ + ntpnstime = timestamp - rtpsession->send_rtp_seg.start; + ntpnstime += priv->ntpnsbase; + ret = rtp_session_send_rtp (priv->session, buffer, ntpnstime); @@ -1113,6 +1182,7 @@ rtpsession->recv_rtp_src = gst_pad_new_from_static_template (&rtpsession_recv_rtp_src_template, "recv_rtp_src"); + gst_pad_use_fixed_caps (rtpsession->recv_rtp_src); gst_pad_set_active (rtpsession->recv_rtp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->recv_rtp_src); @@ -1142,6 +1212,7 @@ rtpsession->sync_src = gst_pad_new_from_static_template (&rtpsession_sync_src_template, "sync_src"); + gst_pad_use_fixed_caps (rtpsession->sync_src); gst_pad_set_active (rtpsession->sync_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->sync_src); @@ -1172,6 +1243,7 @@ rtpsession->send_rtp_src = gst_pad_new_from_static_template (&rtpsession_send_rtp_src_template, "send_rtp_src"); + gst_pad_use_fixed_caps (rtpsession->send_rtp_src); gst_pad_set_active (rtpsession->send_rtp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtp_src); @@ -1190,6 +1262,7 @@ rtpsession->send_rtcp_src = gst_pad_new_from_static_template (&rtpsession_send_rtcp_src_template, "send_rtcp_src"); + gst_pad_use_fixed_caps (rtpsession->send_rtcp_src); gst_pad_set_active (rtpsession->send_rtcp_src, TRUE); gst_element_add_pad (GST_ELEMENT_CAST (rtpsession), rtpsession->send_rtcp_src); Index: gstrtpssrcdemux.c RCS file: /cvs/gstreamer/gst-plugins-bad/gst/rtpmanager/gstrtpssrcdemux.c,v retrieving revision 1.6 retrieving revision 1.7 diff -u -d -r1.6 -r1.7 --- gstrtpssrcdemux.c 23 Aug 2007 21:39:58 -0000 1.6 +++ gstrtpssrcdemux.c 3 Sep 2007 21:19:34 -0000 1.7 @@ -52,6 +52,7 @@ #include <gst/rtp/gstrtpbuffer.h> #include "gstrtpssrcdemux.h" @@ -67,6 +68,13 @@ +static GstStaticPadTemplate rtp_ssrc_demux_rtcp_sink_template = +GST_STATIC_PAD_TEMPLATE ("rtcp_sink", + GST_PAD_ALWAYS, static GstStaticPadTemplate rtp_ssrc_demux_src_template = GST_STATIC_PAD_TEMPLATE ("src_%d", GST_PAD_SRC, @@ -74,6 +82,13 @@ +static GstStaticPadTemplate rtp_ssrc_demux_rtcp_src_template = +GST_STATIC_PAD_TEMPLATE ("rtcp_src_%d", + GST_PAD_SRC, static GstElementDetails gst_rtp_ssrc_demux_details = { "RTP SSRC Demux", "Demux/Network/RTP", @@ -103,6 +118,11 @@ static GstFlowReturn gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf); static gboolean gst_rtp_ssrc_demux_sink_event (GstPad * pad, GstEvent * event); +static GstFlowReturn gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, + GstBuffer * buf); +static gboolean gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, + GstEvent * event); /* srcpad stuff */ static gboolean gst_rtp_ssrc_demux_src_event (GstPad * pad, GstEvent * event); @@ -113,59 +133,78 @@ struct _GstRtpSsrcDemuxPad - GstPad *pad; + GstPad *rtp_pad; + GstPad *rtcp_pad; /* find a src pad for a given SSRC, returns NULL if the SSRC was not found -static GstPad * -find_rtp_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) +static GstRtpSsrcDemuxPad * +find_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) GSList *walk; - for (walk = demux->rtp_srcpads; walk; walk = g_slist_next (walk)) { + for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) { GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data; if (pad->ssrc == ssrc) - return pad->pad; + return pad; return NULL; -create_rtp_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) +create_demux_pad_for_ssrc (GstRtpSsrcDemux * demux, guint32 ssrc) - GstPad *result; + GstPad *rtp_pad, *rtcp_pad; GstElementClass *klass; GstPadTemplate *templ; gchar *padname; GstRtpSsrcDemuxPad *demuxpad; + GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc); klass = GST_ELEMENT_GET_CLASS (demux); templ = gst_element_class_get_pad_template (klass, "src_%d"); padname = g_strdup_printf ("src_%d", ssrc); - result = gst_pad_new_from_template (templ, padname); + rtp_pad = gst_pad_new_from_template (templ, padname); + templ = gst_element_class_get_pad_template (klass, "rtcp_src_%d"); + rtcp_pad = gst_pad_new_from_template (templ, padname); g_free (padname); /* wrap in structure and add to list */ demuxpad = g_new0 (GstRtpSsrcDemuxPad, 1); demuxpad->ssrc = ssrc; - demuxpad->pad = result; - demux->rtp_srcpads = g_slist_prepend (demux->rtp_srcpads, demuxpad); + demuxpad->rtp_pad = rtp_pad; + demuxpad->rtcp_pad = rtcp_pad; + demux->srcpads = g_slist_prepend (demux->srcpads, demuxpad); + GST_OBJECT_UNLOCK (demux); /* copy caps from input */ - gst_pad_set_caps (result, GST_PAD_CAPS (demux->rtp_sink)); + gst_pad_set_caps (rtp_pad, GST_PAD_CAPS (demux->rtp_sink)); + gst_pad_use_fixed_caps (rtp_pad); + gst_pad_set_caps (rtcp_pad, GST_PAD_CAPS (demux->rtcp_sink)); + gst_pad_use_fixed_caps (rtcp_pad); - gst_pad_set_event_function (result, gst_rtp_ssrc_demux_src_event); - gst_pad_set_active (result, TRUE); - gst_element_add_pad (GST_ELEMENT_CAST (demux), result); + gst_pad_set_event_function (rtp_pad, gst_rtp_ssrc_demux_src_event); + gst_pad_set_active (rtp_pad, TRUE); + gst_pad_set_active (rtcp_pad, TRUE); + gst_element_add_pad (GST_ELEMENT_CAST (demux), rtp_pad); + gst_element_add_pad (GST_ELEMENT_CAST (demux), rtcp_pad); g_signal_emit (G_OBJECT (demux), - gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, result); + gst_rtp_ssrc_demux_signals[SIGNAL_NEW_SSRC_PAD], 0, ssrc, rtp_pad); + GST_OBJECT_LOCK (demux); + return demuxpad; @@ -176,7 +215,11 @@ gst_element_class_add_pad_template (gstelement_klass, gst_static_pad_template_get (&rtp_ssrc_demux_sink_template)); + gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_sink_template)); + gst_element_class_add_pad_template (gstelement_klass, gst_static_pad_template_get (&rtp_ssrc_demux_src_template)); + gst_static_pad_template_get (&rtp_ssrc_demux_rtcp_src_template)); gst_element_class_set_details (gstelement_klass, &gst_rtp_ssrc_demux_details); @@ -226,6 +269,14 @@ gst_pad_set_chain_function (demux->rtp_sink, gst_rtp_ssrc_demux_chain); gst_pad_set_event_function (demux->rtp_sink, gst_rtp_ssrc_demux_sink_event); gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtp_sink); + demux->rtcp_sink = + gst_pad_new_from_template (gst_element_class_get_pad_template (klass, + "rtcp_sink"), "rtcp_sink"); + gst_pad_set_chain_function (demux->rtcp_sink, gst_rtp_ssrc_demux_rtcp_chain); + gst_pad_set_event_function (demux->rtcp_sink, + gst_rtp_ssrc_demux_rtcp_sink_event); + gst_element_add_pad (GST_ELEMENT_CAST (demux), demux->rtcp_sink); @@ -249,21 +300,63 @@ switch (GST_EVENT_TYPE (event)) { case GST_EVENT_NEWSEGMENT: - res = gst_pad_event_default (pad, event); + { + GSList *walk; + res = TRUE; + GST_OBJECT_LOCK (demux); + for (walk = demux->srcpads; walk; walk = g_slist_next (walk)) { + GstRtpSsrcDemuxPad *pad = (GstRtpSsrcDemuxPad *) walk->data; + gst_event_ref (event); + res &= gst_pad_push_event (pad->rtp_pad, event); + GST_OBJECT_UNLOCK (demux); + gst_event_unref (event); gst_object_unref (demux); return res; +static gboolean +gst_rtp_ssrc_demux_rtcp_sink_event (GstPad * pad, GstEvent * event) + GstRtpSsrcDemux *demux; + gboolean res = FALSE; + demux = GST_RTP_SSRC_DEMUX (gst_pad_get_parent (pad)); + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_NEWSEGMENT: + default: + res &= gst_pad_push_event (pad->rtcp_pad, event); + gst_object_unref (demux); + return res; static GstFlowReturn gst_rtp_ssrc_demux_chain (GstPad * pad, GstBuffer * buf) GstRtpSsrcDemux *demux; - GstPad *srcpad; + GstRtpSsrcDemuxPad *dpad; demux = GST_RTP_SSRC_DEMUX (GST_OBJECT_PARENT (pad)); @@ -274,16 +367,16 @@ GST_DEBUG_OBJECT (demux, "received buffer of SSRC %08x", ssrc); - srcpad = find_rtp_pad_for_ssrc (demux, ssrc); - if (srcpad == NULL) { - GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc); - srcpad = create_rtp_pad_for_ssrc (demux, ssrc); - if (!srcpad) + dpad = find_demux_pad_for_ssrc (demux, ssrc); + if (dpad == NULL) { + if (!(dpad = create_demux_pad_for_ssrc (demux, ssrc))) goto create_failed; /* push to srcpad */ - ret = gst_pad_push (srcpad, buf); + ret = gst_pad_push (dpad->rtp_pad, buf); return ret; @@ -298,9 +391,74 @@ create_failed: - /* this is not fatal yet */ GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), ("Could not create new pad")); + GST_OBJECT_UNLOCK (demux); + gst_buffer_unref (buf); +gst_rtp_ssrc_demux_rtcp_chain (GstPad * pad, GstBuffer * buf) + GstFlowReturn ret; + demux = GST_RTP_SSRC_DEMUX (GST_OBJECT_PARENT (pad)); + if (!gst_rtcp_buffer_validate (buf)) + if (!gst_rtcp_buffer_get_first_packet (buf, &packet)) + /* first packet must be SR or RR or else the validate would have failed */ + switch (gst_rtcp_packet_get_type (&packet)) { + case GST_RTCP_TYPE_SR: + gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, NULL, NULL, + NULL); + case GST_RTCP_TYPE_RR: + ssrc = gst_rtcp_packet_rr_get_ssrc (&packet); + goto invalid_rtcp; + GST_DEBUG_OBJECT (demux, "received RTCP of SSRC %08x", ssrc); + GST_DEBUG_OBJECT (demux, "creating pad for SSRC %08x", ssrc); + goto create_failed; + /* push to srcpad */ + ret = gst_pad_push (dpad->rtcp_pad, buf); + GST_ELEMENT_ERROR (demux, STREAM, DECODE, (NULL), + ("Dropping invalid RTCP packet")); +create_failed: + ("Could not create new pad")); gst_buffer_unref (buf); return GST_FLOW_ERROR; Index: gstrtpssrcdemux.h RCS file: /cvs/gstreamer/gst-plugins-bad/gst/rtpmanager/gstrtpssrcdemux.h,v --- gstrtpssrcdemux.h 23 Aug 2007 21:39:58 -0000 1.3 +++ gstrtpssrcdemux.h 3 Sep 2007 21:19:34 -0000 1.4 @@ -37,7 +37,8 @@ GstElement parent; GstPad *rtp_sink; - GSList *rtp_srcpads; + GstPad *rtcp_sink; + GSList *srcpads; struct _GstRtpSsrcDemuxClass Index: rtpsession.c RCS file: /cvs/gstreamer/gst-plugins-bad/gst/rtpmanager/rtpsession.c,v retrieving revision 1.14 retrieving revision 1.15 diff -u -d -r1.14 -r1.15 --- rtpsession.c 29 Aug 2007 01:22:42 -0000 1.14 +++ rtpsession.c 3 Sep 2007 21:19:34 -0000 1.15 @@ -23,6 +23,8 @@ #include <gst/rtp/gstrtcpbuffer.h> #include <gst/netbuffer/gstnetbuffer.h> +#include "gstrtpbin-marshal.h" GST_DEBUG_CATEGORY_STATIC (rtp_session_debug); @@ -332,8 +334,8 @@ sess->callbacks.process_rtp = callbacks->process_rtp; sess->callbacks.send_rtp = callbacks->send_rtp; sess->callbacks.send_rtcp = callbacks->send_rtcp; + sess->callbacks.sync_rtcp = callbacks->sync_rtcp; sess->callbacks.clock_rate = callbacks->clock_rate; - sess->callbacks.get_time = callbacks->get_time; sess->callbacks.reconsider = callbacks->reconsider; sess->user_data = user_data; @@ -911,13 +913,14 @@ update_arrival_stats (RTPSession * sess, RTPArrivalStats * arrival, - gboolean rtp, GstBuffer * buffer) + gboolean rtp, GstBuffer * buffer, guint64 ntpnstime) - /* get time or arrival */ - if (sess->callbacks.get_time) - arrival->time = sess->callbacks.get_time (sess, sess->user_data); - else - arrival->time = GST_CLOCK_TIME_NONE; + GTimeVal current; + /* get time of arrival */ + g_get_current_time (¤t); + arrival->time = GST_TIMEVAL_TO_TIME (current); + arrival->ntpnstime = ntpnstime; /* get packet size including header overhead */ arrival->bytes = GST_BUFFER_SIZE (buffer) + sess->header_len; @@ -941,6 +944,7 @@ ... [truncated message content] |