From: <wt...@fr...> - 2004-08-11 15:59:03
|
CVS Root: /cvs/gstreamer Module: gst-plugins Changes by: wtay Date: Wed Aug 11 2004 08:59:00 PDT Log message: * gst/tcp/Makefile.am: * gst/tcp/gstfdset.c: (gst_fdset_mode_get_type), (nearest_pow), (ensure_size), (gst_fdset_new), (gst_fdset_free), (gst_fdset_set_mode), (gst_fdset_get_mode), (gst_fdset_add_fd), (gst_fdset_remove_fd), (gst_fdset_fd_ctl_write), (gst_fdset_fd_ctl_read), (gst_fdset_fd_has_closed), (gst_fdset_fd_has_error), (gst_fdset_fd_can_read), (gst_fdset_fd_can_write), (gst_fdset_wait): * gst/tcp/gstfdset.h: * gst/tcp/gstmultifdsink.c: (gst_unit_type_get_type), (gst_multifdsink_class_init), (gst_multifdsink_init), (gst_multifdsink_add), (gst_multifdsink_remove), (gst_multifdsink_clear), (gst_multifdsink_get_stats), (gst_multifdsink_remove_client_link), (gst_multifdsink_handle_client_read), (gst_multifdsink_client_queue_data), (gst_multifdsink_client_queue_caps), (gst_multifdsink_client_queue_buffer), (gst_multifdsink_handle_client_write), (gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer), (gst_multifdsink_handle_clients), (gst_multifdsink_set_property), (gst_multifdsink_get_property), (gst_multifdsink_init_send), (gst_multifdsink_close): * gst/tcp/gstmultifdsink.h: * gst/tcp/gsttcpserversink.c: (gst_tcpserversink_class_init), (gst_tcpserversink_init), (gst_tcpserversink_handle_server_read), (gst_tcpserversink_handle_wait), (gst_tcpserversink_init_send), (gst_tcpserversink_close): * gst/tcp/gsttcpserversink.h: Abstracted away the select call, implemented poll (yes we ran into the 1024 limit in production). Modified files: . : ChangeLog gst/tcp : Makefile.am gstmultifdsink.c gstmultifdsink.h gsttcpserversink.c gsttcpserversink.h Added files: gst/tcp : gstfdset.c gstfdset.h Links: http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins/ChangeLog.diff?r1=1.986&r2=1.987 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins/gst/tcp/Makefile.am.diff?r1=1.8&r2=1.9 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins/gst/tcp/gstfdset.c?rev=1.1&content-type=text/vnd.viewcvs-markup http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins/gst/tcp/gstfdset.h?rev=1.1&content-type=text/vnd.viewcvs-markup http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins/gst/tcp/gstmultifdsink.c.diff?r1=1.17&r2=1.18 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins/gst/tcp/gstmultifdsink.h.diff?r1=1.8&r2=1.9 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins/gst/tcp/gsttcpserversink.c.diff?r1=1.13&r2=1.14 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins/gst/tcp/gsttcpserversink.h.diff?r1=1.7&r2=1.8 ====Begin Diffs==== Index: ChangeLog =================================================================== RCS file: /cvs/gstreamer/gst-plugins/ChangeLog,v retrieving revision 1.986 retrieving revision 1.987 diff -u -d -r1.986 -r1.987 --- ChangeLog 11 Aug 2004 11:42:49 -0000 1.986 +++ ChangeLog 11 Aug 2004 15:58:48 -0000 1.987 @@ -1,3 +1,37 @@ +2004-08-11 Wim Taymans <wi...@fl...> + + * gst/tcp/Makefile.am: + * gst/tcp/gstfdset.c: (gst_fdset_mode_get_type), (nearest_pow), + (ensure_size), (gst_fdset_new), (gst_fdset_free), + (gst_fdset_set_mode), (gst_fdset_get_mode), (gst_fdset_add_fd), + (gst_fdset_remove_fd), (gst_fdset_fd_ctl_write), + (gst_fdset_fd_ctl_read), (gst_fdset_fd_has_closed), + (gst_fdset_fd_has_error), (gst_fdset_fd_can_read), + (gst_fdset_fd_can_write), (gst_fdset_wait): + * gst/tcp/gstfdset.h: + * gst/tcp/gstmultifdsink.c: (gst_unit_type_get_type), + (gst_multifdsink_class_init), (gst_multifdsink_init), + (gst_multifdsink_add), (gst_multifdsink_remove), + (gst_multifdsink_clear), (gst_multifdsink_get_stats), + (gst_multifdsink_remove_client_link), + (gst_multifdsink_handle_client_read), + (gst_multifdsink_client_queue_data), + (gst_multifdsink_client_queue_caps), + (gst_multifdsink_client_queue_buffer), + (gst_multifdsink_handle_client_write), + (gst_multifdsink_recover_client), (gst_multifdsink_queue_buffer), + (gst_multifdsink_handle_clients), (gst_multifdsink_set_property), + (gst_multifdsink_get_property), (gst_multifdsink_init_send), + (gst_multifdsink_close): + * gst/tcp/gstmultifdsink.h: + * gst/tcp/gsttcpserversink.c: (gst_tcpserversink_class_init), + (gst_tcpserversink_init), (gst_tcpserversink_handle_server_read), + (gst_tcpserversink_handle_wait), (gst_tcpserversink_init_send), + (gst_tcpserversink_close): + * gst/tcp/gsttcpserversink.h: + Abstracted away the select call, implemented poll (yes we ran into + the 1024 limit in production). 2004-08-11 Thomas Vander Stichele <thomas at apestaart dot org> * gst/tcp/gsttcp.c: Index: Makefile.am RCS file: /cvs/gstreamer/gst-plugins/gst/tcp/Makefile.am,v retrieving revision 1.8 retrieving revision 1.9 diff -u -d -r1.8 -r1.9 --- Makefile.am 26 Jun 2004 16:49:42 -0000 1.8 +++ Makefile.am 11 Aug 2004 15:58:48 -0000 1.9 @@ -16,6 +16,7 @@ gsttcpplugin.c \ gsttcpsrc.c gsttcpsink.c \ gsttcp.c \ + gstfdset.c \ gstmultifdsink.c \ gsttcpclientsrc.c gsttcpclientsink.c \ gsttcpserversrc.c gsttcpserversink.c @@ -32,6 +33,7 @@ gsttcpplugin.h \ gsttcpsrc.h gsttcpsink.h \ gsttcp.h \ + gstfdset.h \ gstmultifdsink.h \ gsttcpclientsrc.h gsttcpclientsink.h \ gsttcpserversrc.h gsttcpserversink.h --- NEW FILE: gstfdset.c --- /* GStreamer * Copyright (C) <1999> Erik Walthinsen <om...@cs...> * Copyright (C) <2004> Wim Taymans <wi...@fl...> * * gsttcpfdset.h: fdset datastructure * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public * License as published by the Free Software Foundation; either * version 2 of the License, or (at your option) any later version. * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Library General Public License for more details. * You should have received a copy of the GNU Library General Public * License along with this library; if not, write to the * Free Software Foundation, Inc., 59 Temple Place - Suite 330, * Boston, MA 02111-1307, USA. */ #define MIN_POLLFDS 64 #define INIT_POLLFDS MIN_POLLFDS #include <sys/poll.h> #include "gstfdset.h" GType gst_fdset_mode_get_type (void) { static GType fdset_mode_type = 0; static GEnumValue fdset_mode[] = { {GST_FDSET_MODE_SELECT, "GST_FDSET_MODE_SELECT", "Select"}, {GST_FDSET_MODE_POLL, "GST_FDSET_MODE_POLL", "Poll"}, {GST_FDSET_MODE_EPOLL, "GST_FDSET_MODE_EPOLL", "EPoll"}, {0, NULL, NULL}, }; if (!fdset_mode_type) { fdset_mode_type = g_enum_register_static ("GstFDSetModeType", fdset_mode); } return fdset_mode_type; } struct _GstFDSet GstFDSetMode mode; /* for poll */ struct pollfd *pollfds; gint last_pollfds; gint size; gint free; /* for select */ fd_set readfds, writefds; /* input */ fd_set testreadfds, testwritefds; /* output */ }; static gint nearest_pow (gint num) gint n = 1; while (n < num) n <<= 1; return n; static void ensure_size (GstFDSet * set, gint len) guint need = len * sizeof (struct pollfd); if (need > set->size) { need = nearest_pow (need); need = MAX (need, MIN_POLLFDS * sizeof (struct pollfd)); set->pollfds = g_realloc (set->pollfds, need); set->size = need; GstFDSet * gst_fdset_new (GstFDSetMode mode) GstFDSet *nset; nset = g_new0 (GstFDSet, 1); nset->mode = mode; switch (mode) { case GST_FDSET_MODE_SELECT: FD_ZERO (&nset->readfds); FD_ZERO (&nset->writefds); break; case GST_FDSET_MODE_POLL: nset->pollfds = NULL; nset->free = 0; nset->last_pollfds = 0; ensure_size (nset, MIN_POLLFDS); case GST_FDSET_MODE_EPOLL: g_warning ("implement me"); default: return nset; void gst_fdset_free (GstFDSet * set) switch (set->mode) { g_free (set->pollfds); g_free (set); gst_fdset_set_mode (GstFDSet * set, GstFDSetMode mode) g_warning ("implement me"); GstFDSetMode gst_fdset_get_mode (GstFDSet * set) return set->mode; gst_fdset_add_fd (GstFDSet * set, GstFD * fd) /* nothing */ { struct pollfd *nfd; gint idx; ensure_size (set, set->last_pollfds + 1); idx = set->free; if (idx == -1) { /* find free space */ while (idx < set->last_pollfds) { idx++; if (set->pollfds[idx].fd == -1) break; } } nfd = &set->pollfds[idx]; nfd->fd = fd->fd; nfd->events = 0; nfd->revents = 0; /* see if we have one fd more */ set->last_pollfds = MAX (idx + 1, set->last_pollfds); fd->idx = idx; set->free = -1; } gst_fdset_remove_fd (GstFDSet * set, GstFD * fd) FD_CLR (fd->fd, &set->writefds); FD_CLR (fd->fd, &set->readfds); set->pollfds[fd->idx].fd = -1; set->pollfds[fd->idx].events = 0; set->pollfds[fd->idx].revents = 0; /* if we removed the last fd, we can lower the last_pollfds */ if (fd->idx + 1 == set->last_pollfds) { set->last_pollfds--; fd->idx = -1; if (set->free == -1) { set->free = fd->idx; } else { set->free = MIN (set->free, fd->idx); gst_fdset_fd_ctl_write (GstFDSet * set, GstFD * fd, gboolean active) if (active) FD_SET (fd->fd, &set->writefds); else FD_CLR (fd->fd, &set->writefds); set->pollfds[fd->idx].events = (active ? POLLOUT : 0); gst_fdset_fd_ctl_read (GstFDSet * set, GstFD * fd, gboolean active) FD_SET (fd->fd, &set->readfds); FD_CLR (fd->fd, &set->readfds); set->pollfds[fd->idx].events = (active ? (POLLIN | POLLPRI) : 0); gboolean gst_fdset_fd_has_closed (GstFDSet * set, GstFD * fd) gboolean res = FALSE; res = FALSE; res = (set->pollfds[fd->idx].revents & POLLHUP) != 0; return res; gst_fdset_fd_has_error (GstFDSet * set, GstFD * fd) res = (set->pollfds[fd->idx].revents & (POLLERR | POLLNVAL)) != 0; gst_fdset_fd_can_read (GstFDSet * set, GstFD * fd) res = FD_ISSET (fd->fd, &set->testreadfds); res = (set->pollfds[fd->idx].revents & (POLLIN | POLLPRI)) != 0; gst_fdset_fd_can_write (GstFDSet * set, GstFD * fd) res = FD_ISSET (fd->fd, &set->testwritefds); res = (set->pollfds[fd->idx].revents & POLLOUT) != 0; int gst_fdset_wait (GstFDSet * set, int timeout) int res = -1; struct timeval tv; struct timeval *tvptr = NULL; set->testreadfds = set->readfds; set->testwritefds = set->writefds; if (timeout > 0) { tv.tv_sec = timeout / 1000; tv.tv_usec = timeout % 1000; tvptr = &tv; res = select (FD_SETSIZE, &set->testreadfds, &set->testwritefds, (fd_set *) 0, tvptr); /* we do not make a copy here. The polfds could change while * executing this call but even if this should happen and cause * problems, we can recover from it */ res = poll (set->pollfds, set->last_pollfds, timeout); --- NEW FILE: gstfdset.h --- * gstfdset.h: fdset datastructure #ifndef __GST_FDSET_H__ #define __GST_FDSET_H__ #include <gst/gst.h> G_BEGIN_DECLS typedef struct _GstFDSet GstFDSet; typedef struct { int fd; gint idx; gpointer data; } GstFD; typedef enum { GST_FDSET_MODE_SELECT, GST_FDSET_MODE_POLL, GST_FDSET_MODE_EPOLL } GstFDSetMode; #define GST_TYPE_FDSET_MODE (gst_fdset_mode_get_type()) GType gst_fdset_mode_get_type (void); GstFDSet* gst_fdset_new (GstFDSetMode mode); void gst_fdset_free (GstFDSet *set); void gst_fdset_set_mode (GstFDSet *set, GstFDSetMode mode); GstFDSetMode gst_fdset_get_mode (GstFDSet *set); void gst_fdset_add_fd (GstFDSet *set, GstFD *fd); void gst_fdset_remove_fd (GstFDSet *set, GstFD *fd); void gst_fdset_fd_ctl_write (GstFDSet *set, GstFD *fd, gboolean active); void gst_fdset_fd_ctl_read (GstFDSet *set, GstFD *fd, gboolean active); gboolean gst_fdset_fd_has_closed (GstFDSet *set, GstFD *fd); gboolean gst_fdset_fd_has_error (GstFDSet *set, GstFD *fd); gboolean gst_fdset_fd_can_read (GstFDSet *set, GstFD *fd); gboolean gst_fdset_fd_can_write (GstFDSet *set, GstFD *fd); int gst_fdset_wait (GstFDSet *set, int timeout); G_END_DECLS #endif /* __GST_FDSET_H__ */ Index: gstmultifdsink.c RCS file: /cvs/gstreamer/gst-plugins/gst/tcp/gstmultifdsink.c,v retrieving revision 1.17 retrieving revision 1.18 diff -u -d -r1.17 -r1.18 --- gstmultifdsink.c 10 Aug 2004 15:23:19 -0000 1.17 +++ gstmultifdsink.c 11 Aug 2004 15:58:48 -0000 1.18 @@ -24,6 +24,10 @@ #include <gst/gst-i18n-plugin.h> #include <sys/ioctl.h> +#include <unistd.h> +#include <fcntl.h> +#include <sys/types.h> +#include <sys/socket.h> #ifdef HAVE_FIONREAD_IN_SYS_FILIO #include <sys/filio.h> @@ -32,6 +36,8 @@ #include "gstmultifdsink.h" #include "gsttcp-marshal.h" +#define NOT_IMPLEMENTED 0 /* the select call is also performed on the control sockets, that way * we can send special commands to unblock or restart the select call */ #define CONTROL_RESTART 'R' /* restart the select call */ @@ -39,15 +45,16 @@ #define CONTROL_SOCKETS(sink) sink->control_sock #define WRITE_SOCKET(sink) sink->control_sock[1] #define READ_SOCKET(sink) sink->control_sock[0] #define SEND_COMMAND(sink, command) \ G_STMT_START { \ unsigned char c; c = command; \ - write (WRITE_SOCKET(sink), &c, 1); \ + write (WRITE_SOCKET(sink).fd, &c, 1); \ } G_STMT_END #define READ_COMMAND(sink, command, res) \ - res = read(READ_SOCKET(sink), &command, 1); \ + res = read(READ_SOCKET(sink).fd, &command, 1); \ /* elementfactory information */ @@ -79,6 +86,7 @@ /* this is really arbitrary choosen */ #define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_TYPE_NONE +#define DEFAULT_MODE GST_FDSET_MODE_POLL #define DEFAULT_BUFFERS_MAX -1 #define DEFAULT_BUFFERS_SOFT_MAX -1 #define DEFAULT_UNIT_TYPE GST_UNIT_TYPE_BUFFERS @@ -91,6 +99,7 @@ { ARG_0, ARG_PROTOCOL, + ARG_MODE, ARG_BUFFERS_QUEUED, ARG_BYTES_QUEUED, ARG_TIME_QUEUED, @@ -132,6 +141,7 @@ return recover_policy_type; } +#if NOT_IMPLEMENTED #define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type()) static GType gst_unit_type_get_type (void) @@ -149,6 +159,7 @@ } return unit_type_type; +#endif #define GST_TYPE_CLIENT_STATUS (gst_client_status_get_type()) @@ -241,6 +252,10 @@ g_object_class_install_property (gobject_class, ARG_PROTOCOL, g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in", GST_TYPE_TCP_PROTOCOL_TYPE, DEFAULT_PROTOCOL, G_PARAM_READWRITE)); + g_object_class_install_property (gobject_class, ARG_MODE, + g_param_spec_enum ("mode", "Mode", + "The mode for selecting activity on the fds", GST_TYPE_FDSET_MODE, + DEFAULT_MODE, G_PARAM_READWRITE)); g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_MAX, g_param_spec_int ("buffers-max", "Buffers max", @@ -251,6 +266,7 @@ "Recover client when going over this limit (-1 = no limit)", -1, G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, G_PARAM_READWRITE)); g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_UNIT_TYPE, g_param_spec_enum ("unit-type", "Units type", "The unit to measure the max/soft-max/queued properties", @@ -263,11 +279,13 @@ g_param_spec_int ("units-soft-max", "Units soft max", G_MAXINT, DEFAULT_UNITS_SOFT_MAX, G_PARAM_READWRITE)); g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BUFFERS_QUEUED, g_param_spec_uint ("buffers-queued", "Buffers queued", "Number of buffers currently queued", 0, G_MAXUINT, 0, G_PARAM_READABLE)); g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BYTES_QUEUED, g_param_spec_uint ("bytes-queued", "Bytes queued", "Number of bytes currently queued", 0, G_MAXUINT, 0, @@ -276,6 +294,7 @@ g_param_spec_uint64 ("time-queued", "Time queued", "Number of time currently queued", 0, G_MAXUINT64, 0, g_object_class_install_property (gobject_class, ARG_RECOVER_POLICY, g_param_spec_enum ("recover-policy", "Recover Policy", @@ -346,6 +365,7 @@ GST_FLAG_UNSET (this, GST_MULTIFDSINK_OPEN); this->protocol = DEFAULT_PROTOCOL; + this->mode = DEFAULT_MODE; this->clientslock = g_mutex_new (); this->clients = NULL; @@ -359,18 +379,6 @@ this->timeout = DEFAULT_TIMEOUT; -static void -gst_multifdsink_debug_fdset (GstMultiFdSink * sink, fd_set * testfds) -{ - int fd; - - for (fd = 0; fd < FD_SETSIZE; fd++) { - if (FD_ISSET (fd, testfds)) { - GST_LOG_OBJECT (sink, "fd %d", fd); - } - } -} void gst_multifdsink_add (GstMultiFdSink * sink, int fd) @@ -381,7 +389,7 @@ /* create client datastructure */ client = g_new0 (GstTCPClient, 1); - client->fd = fd; + client->fd.fd = fd; client->status = GST_CLIENT_STATUS_OK; client->bufpos = -1; client->bufoffset = 0; @@ -404,7 +412,9 @@ /* set the socket to non blocking */ fcntl (fd, F_SETFL, O_NONBLOCK); /* we always read from a client */ - FD_SET (fd, &sink->readfds); + gst_fdset_add_fd (sink->fdset, &client->fd); + gst_fdset_fd_ctl_read (sink->fdset, &client->fd, TRUE); SEND_COMMAND (sink, CONTROL_RESTART); g_mutex_unlock (sink->clientslock); @@ -428,9 +438,10 @@ client = (GstTCPClient *) clients->data; next = g_list_next (clients); - if (client->fd == fd) { + if (client->fd.fd == fd) { client->status = GST_CLIENT_STATUS_REMOVED; gst_multifdsink_remove_client_link (sink, clients); + SEND_COMMAND (sink, CONTROL_RESTART); break; } @@ -454,6 +465,7 @@ client->status = GST_CLIENT_STATUS_REMOVED; gst_multifdsink_remove_client_link (sink, clients); + SEND_COMMAND (sink, CONTROL_RESTART); @@ -470,7 +482,7 @@ GValue value = { 0 }; guint64 interval; @@ -525,48 +537,42 @@ GTimeVal now; GstTCPClient *client = (GstTCPClient *) link->data; - fd = client->fd; + fd = client->fd.fd; /* FIXME: if we keep track of ip we can log it here and signal */ - GST_DEBUG_OBJECT (sink, "removing client on fd %d", fd); switch (client->status) { case GST_CLIENT_STATUS_OK: GST_WARNING_OBJECT (sink, "removing client %p with fd %d for no reason", - client, client->fd); + client, fd); case GST_CLIENT_STATUS_CLOSED: GST_DEBUG_OBJECT (sink, "removing client %p with fd %d because of close", case GST_CLIENT_STATUS_REMOVED: GST_DEBUG_OBJECT (sink, "removing client %p with fd %d because the app removed it", client, - client->fd); + fd); case GST_CLIENT_STATUS_SLOW: GST_INFO_OBJECT (sink, - "removing client %p with fd %d because it was too slow", client, + "removing client %p with fd %d because it was too slow", client, fd); case GST_CLIENT_STATUS_ERROR: GST_WARNING_OBJECT (sink, - "removing client %p with fd %d because of error", client, client->fd); + "removing client %p with fd %d because of error", client, fd); default: - "removing client %p with fd %d with invalid reason", client, + "removing client %p with fd %d with invalid reason", client, fd); - FD_CLR (fd, &sink->readfds); - FD_CLR (fd, &sink->writefds); + gst_fdset_remove_fd (sink->fdset, &client->fd); if (close (fd) != 0) { /* this is not really an error */ GST_DEBUG_OBJECT (sink, "error closing fd %d: %s", fd, g_strerror (errno)); - SEND_COMMAND (sink, CONTROL_RESTART); g_get_current_time (&now); client->disconnect_time = GST_TIMEVAL_TO_TIME (now); @@ -596,7 +602,7 @@ int avail, fd; gboolean ret; if (ioctl (fd, FIONREAD, &avail) < 0) { GST_WARNING_OBJECT (sink, "ioctl failed for fd %d: %s", @@ -665,7 +671,7 @@ GST_BUFFER_SIZE (buf) = len; GST_LOG_OBJECT (sink, "Queueing data of length %d for fd %d", - len, client->fd); + len, client->fd.fd); client->sending = g_slist_append (client->sending, buf); @@ -683,7 +689,7 @@ string = gst_caps_to_string (caps); GST_DEBUG_OBJECT (sink, "Queueing caps %s for fd %d through GDP", string, - client->fd); + client->fd.fd); g_free (string); if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) { @@ -708,7 +714,7 @@ if (!gst_dp_header_from_buffer (buffer, 0, &len, &header)) { - "could not create header, removing client on fd %d", client->fd); + "could not create header, removing client on fd %d", client->fd.fd); return FALSE; gst_multifdsink_client_queue_data (sink, client, header, len); @@ -750,7 +756,7 @@ gst_multifdsink_handle_client_write (GstMultiFdSink * sink, GstTCPClient * client) - int fd = client->fd; + int fd = client->fd.fd; gboolean more; gboolean res; GstClockTime now; @@ -803,7 +809,7 @@ if (client->bufpos == -1) { /* client is too fast, remove from write queue until new buffer is * available */ - FD_CLR (fd, &sink->writefds); + gst_fdset_fd_ctl_write (sink->fdset, &client->fd, FALSE); return TRUE; } else { /* client can pick a buffer from the global queue */ @@ -885,7 +891,7 @@ GST_WARNING_OBJECT (sink, "client %p with fd %d is lagging at %d, recover using policy %d", client, - client->fd, client->bufpos, sink->recover_policy); + client->fd.fd, client->bufpos, sink->recover_policy); switch (sink->recover_policy) { case GST_RECOVER_POLICY_NONE: @@ -961,7 +967,7 @@ client->bufpos++; GST_LOG_OBJECT (sink, "client %p with fd %d at position %d", - client, client->fd, client->bufpos); + client, client->fd.fd, client->bufpos); /* check soft max if needed, recover client */ if (sink->units_soft_max > 0 && client->bufpos >= sink->units_soft_max) { gint newpos; @@ -971,10 +977,11 @@ client->bufpos = newpos; client->discont = TRUE; GST_WARNING_OBJECT (sink, "client %p with fd %d position reset to %d", - client, client->fd, client->bufpos); + client, client->fd.fd, client->bufpos); GST_WARNING_OBJECT (sink, - "client %p with fd %d not recovering position", client, client->fd); + "client %p with fd %d not recovering position", client, + client->fd.fd); } /* check hard max and timeout, remove client */ @@ -983,20 +990,18 @@ && now - client->last_activity_time > sink->timeout)) { /* remove client */ GST_WARNING_OBJECT (sink, "client %p with fd %d is too slow, removing", - FD_CLR (client->fd, &sink->readfds); - FD_CLR (client->fd, &sink->writefds); + client, client->fd.fd); + /* remove the client, the fd set will be cleared and the select thread will + * be signaled */ client->status = GST_CLIENT_STATUS_SLOW; - /* cannot send data to this client anymore. need to signal the select thread that - * the fd_set changed */ - need_signal = TRUE; /* set client to invalid position while being removed */ client->bufpos = -1; + need_signal = TRUE; } else if (client->bufpos == 0) { /* can send data to this client now. need to signal the select thread that * the fd_set changed */ - FD_SET (client->fd, &sink->writefds); + gst_fdset_fd_ctl_write (sink->fdset, &client->fd, TRUE); need_signal = TRUE; /* keep track of maximum buffer usage */ @@ -1038,7 +1043,6 @@ gst_multifdsink_handle_clients (GstMultiFdSink * sink) int result; - fd_set testreadfds, testwritefds; GList *clients, *next; gboolean try_again; GstMultiFdSinkClass *fclass; @@ -1054,20 +1058,11 @@ * - server socket input (ie, new client connections) * - client socket input (ie, clients saying goodbye) * - client socket output (ie, client reads) */ - testwritefds = sink->writefds; - testreadfds = sink->readfds; - GST_LOG_OBJECT (sink, "doing select on server + client fds for reads"); - gst_multifdsink_debug_fdset (sink, &testreadfds); - GST_LOG_OBJECT (sink, "doing select on client fds for writes"); - gst_multifdsink_debug_fdset (sink, &testwritefds); - result = - select (FD_SETSIZE, &testreadfds, &testwritefds, (fd_set *) 0, NULL); + result = gst_fdset_wait (sink->fdset, -1); /* < 0 is an error, 0 just means a timeout happened */ if (result < 0) { - GST_WARNING_OBJECT (sink, "select failed: %s", g_strerror (errno)); + GST_WARNING_OBJECT (sink, "wait failed: %s", g_strerror (errno)); if (errno == EBADF) { /* ok, so one of the fds is invalid. We loop over them to find one * that gives an error to the F_GETFL fcntl. */ @@ -1081,7 +1076,7 @@ client = (GstTCPClient *) clients->data; next = g_list_next (clients); - fd = client->fd; + fd = client->fd.fd; res = fcntl (fd, F_GETFL, &flags); if (res == -1) { @@ -1106,34 +1101,35 @@ return; } else { - GST_LOG_OBJECT (sink, "%d sockets had action", result); - GST_LOG_OBJECT (sink, "done select on server/client fds for reads"); - gst_multifdsink_debug_fdset (sink, &testreadfds); - GST_LOG_OBJECT (sink, "done select on client fds for writes"); - gst_multifdsink_debug_fdset (sink, &testwritefds); + GST_INFO_OBJECT (sink, "wait done: %d", result); /* read all commands */ - if (FD_ISSET (READ_SOCKET (sink), &testreadfds)) { + if (gst_fdset_fd_can_read (sink->fdset, &READ_SOCKET (sink))) { + GST_INFO_OBJECT (sink, "have a command"); while (TRUE) { gchar command; int res; READ_COMMAND (sink, command, res); if (res < 0) { + GST_INFO_OBJECT (sink, "no more commands"); /* no more commands */ break; } switch (command) { case CONTROL_RESTART: + GST_INFO_OBJECT (sink, "restart"); /* need to restart the select call as the fd_set changed */ try_again = TRUE; break; + /* need to restart the select call as the fd_set changed */ case CONTROL_STOP: + GST_INFO_OBJECT (sink, "stop"); /* stop this function */ stop = TRUE; default: + GST_WARNING_OBJECT (sink, "unkown"); g_warning ("multifdsink: unknown control message received"); @@ -1145,14 +1141,13 @@ } while (try_again); - if (fclass->select) - fclass->select (sink, &testreadfds, &testwritefds); + if (fclass->wait) + fclass->wait (sink, sink->fdset); /* Check the reads */ g_mutex_lock (sink->clientslock); for (clients = sink->clients; clients; clients = next) { GstTCPClient *client; - int fd; @@ -1162,16 +1157,24 @@ continue; - fd = client->fd; - if (FD_ISSET (fd, &testreadfds)) { + if (gst_fdset_fd_has_closed (sink->fdset, &client->fd)) { + client->status = GST_CLIENT_STATUS_CLOSED; + gst_multifdsink_remove_client_link (sink, clients); + continue; + } + if (gst_fdset_fd_has_error (sink->fdset, &client->fd)) { + client->status = GST_CLIENT_STATUS_ERROR; + if (gst_fdset_fd_can_read (sink->fdset, &client->fd)) { /* handle client read */ if (!gst_multifdsink_handle_client_read (sink, client)) { gst_multifdsink_remove_client_link (sink, clients); continue; - if (FD_ISSET (fd, &testwritefds)) { + if (gst_fdset_fd_can_write (sink->fdset, &client->fd)) { /* handle client write */ if (!gst_multifdsink_handle_client_write (sink, client)) { @@ -1243,6 +1246,9 @@ case ARG_PROTOCOL: multifdsink->protocol = g_value_get_enum (value); + case ARG_MODE: + multifdsink->mode = g_value_get_enum (value); + break; case ARG_BUFFERS_MAX: multifdsink->units_max = g_value_get_int (value); @@ -1284,6 +1290,9 @@ g_value_set_enum (value, multifdsink->protocol); + g_value_set_enum (value, multifdsink->mode); g_value_set_int (value, multifdsink->units_max); @@ -1333,20 +1342,26 @@ gst_multifdsink_init_send (GstMultiFdSink * this) + int control_socket[2]; fclass = GST_MULTIFDSINK_GET_CLASS (this); - FD_ZERO (&this->readfds); - FD_ZERO (&this->writefds); + GST_INFO_OBJECT (this, "starting in mode %d", this->mode); + this->fdset = gst_fdset_new (this->mode); - if (socketpair (PF_UNIX, SOCK_STREAM, 0, CONTROL_SOCKETS (this)) < 0) { + if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_socket) < 0) { GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ_WRITE, (NULL), GST_ERROR_SYSTEM); return FALSE; - FD_SET (READ_SOCKET (this), &this->readfds); - fcntl (READ_SOCKET (this), F_SETFL, O_NONBLOCK); - fcntl (WRITE_SOCKET (this), F_SETFL, O_NONBLOCK); + READ_SOCKET (this).fd = control_socket[0]; + WRITE_SOCKET (this).fd = control_socket[1]; + gst_fdset_add_fd (this->fdset, &READ_SOCKET (this)); + gst_fdset_fd_ctl_read (this->fdset, &READ_SOCKET (this), TRUE); + fcntl (READ_SOCKET (this).fd, F_SETFL, O_NONBLOCK); + fcntl (WRITE_SOCKET (this).fd, F_SETFL, O_NONBLOCK); this->streamheader = NULL; this->bytes_to_serve = 0; @@ -1375,8 +1390,9 @@ SEND_COMMAND (this, CONTROL_STOP); g_thread_join (this->thread); - close (READ_SOCKET (this)); - close (WRITE_SOCKET (this)); + close (READ_SOCKET (this).fd); + close (WRITE_SOCKET (this).fd); + gst_fdset_remove_fd (this->fdset, &READ_SOCKET (this)); if (this->streamheader) { GSList *l; @@ -1389,6 +1405,8 @@ if (fclass->close) fclass->close (this); + gst_fdset_free (this->fdset); static GstElementStateReturn Index: gstmultifdsink.h RCS file: /cvs/gstreamer/gst-plugins/gst/tcp/gstmultifdsink.h,v --- gstmultifdsink.h 10 Aug 2004 15:23:19 -0000 1.8 +++ gstmultifdsink.h 11 Aug 2004 15:58:48 -0000 1.9 @@ -29,20 +29,8 @@ extern "C" { #endif /* __cplusplus */ -#include <stdio.h> -#include <stdlib.h> -#include <unistd.h> -#include <errno.h> -#include <string.h> -#include <sys/time.h> -#include <sys/types.h> -#include <netinet/in.h> -#include <netdb.h> -#include <sys/socket.h> -#include <sys/wait.h> -#include <fcntl.h> -#include <arpa/inet.h> #include "gsttcp.h" +#include "gstfdset.h" #define GST_TYPE_MULTIFDSINK \ (gst_multifdsink_get_type()) @@ -94,7 +82,8 @@ /* structure for a client * */ typedef struct { + GstFD fd; gint bufpos; /* position of this client in the global queue */ GstClientStatus status; @@ -131,10 +120,13 @@ GMutex *clientslock; /* lock to protect the clients list */ GList *clients; /* list of clients we are serving */ - fd_set readfds; /* all the client file descriptors that we can read from */ - fd_set writefds; /* all the client file descriptors that we can write to */ + GstFDSetMode mode; + GstFDSet *fdset; - int control_sock[2]; /* sockets for controlling the select call */ + //fd_set readfds; /* all the client file descriptors that we can read from */ + //fd_set writefds; /* all the client file descriptors that we can write to */ + GstFD control_sock[2];/* sockets for controlling the select call */ GSList *streamheader; /* GSList of GstBuffers to use as streamheader */ GstTCPProtocolType protocol; @@ -168,7 +160,7 @@ /* vtable */ gboolean (*init) (GstMultiFdSink *sink); - gboolean (*select) (GstMultiFdSink *sink, fd_set *readfds, fd_set *writefds); + gboolean (*wait) (GstMultiFdSink *sink, GstFDSet *set); gboolean (*close) (GstMultiFdSink *sink); /* signals */ Index: gsttcpserversink.c RCS file: /cvs/gstreamer/gst-plugins/gst/tcp/gsttcpserversink.c,v retrieving revision 1.13 retrieving revision 1.14 diff -u -d -r1.13 -r1.14 --- gsttcpserversink.c 27 Jun 2004 11:15:23 -0000 1.13 +++ gsttcpserversink.c 11 Aug 2004 15:58:48 -0000 1.14 @@ -57,8 +57,8 @@ static void gst_tcpserversink_class_init (GstTCPServerSink * klass); static void gst_tcpserversink_init (GstTCPServerSink * tcpserversink); -static gboolean gst_tcpserversink_handle_select (GstMultiFdSink * sink, - fd_set * readfds, fd_set * writefds); +static gboolean gst_tcpserversink_handle_wait (GstMultiFdSink * sink, + GstFDSet * set); static gboolean gst_tcpserversink_init_send (GstMultiFdSink * this); static gboolean gst_tcpserversink_close (GstMultiFdSink * this); @@ -129,7 +129,7 @@ gobject_class->get_property = gst_tcpserversink_get_property; gstmultifdsink_class->init = gst_tcpserversink_init_send; - gstmultifdsink_class->select = gst_tcpserversink_handle_select; + gstmultifdsink_class->wait = gst_tcpserversink_handle_wait; gstmultifdsink_class->close = gst_tcpserversink_close; GST_DEBUG_CATEGORY_INIT (tcpserversink_debug, "tcpserversink", 0, "TCP sink"); @@ -142,7 +142,7 @@ /* should support as minimum 576 for IPV4 and 1500 for IPV6 */ /* this->mtu = 1500; */ - this->server_sock_fd = -1; + this->server_sock.fd = -1; /* handle a read request on the server, @@ -156,7 +156,7 @@ int client_address_len; client_sock_fd = - accept (sink->server_sock_fd, (struct sockaddr *) &client_address, + accept (sink->server_sock.fd, (struct sockaddr *) &client_address, &client_address_len); if (client_sock_fd == -1) { GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE, (NULL), @@ -173,12 +173,11 @@ static gboolean -gst_tcpserversink_handle_select (GstMultiFdSink * sink, - fd_set * readfds, fd_set * writefds) +gst_tcpserversink_handle_wait (GstMultiFdSink * sink, GstFDSet * set) GstTCPServerSink *this = GST_TCPSERVERSINK (sink); - if (FD_ISSET (this->server_sock_fd, readfds)) { + if (gst_fdset_fd_can_read (set, &this->server_sock)) { /* handle new client connection on server socket */ if (!gst_tcpserversink_handle_server_read (this)) { GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), @@ -245,22 +244,22 @@ GstTCPServerSink *this = GST_TCPSERVERSINK (parent); /* create sending server socket */ - if ((this->server_sock_fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) { + if ((this->server_sock.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) { GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE, (NULL), GST_ERROR_SYSTEM); GST_DEBUG_OBJECT (this, "opened sending server socket with fd %d", - this->server_sock_fd); + this->server_sock.fd); /* make address reusable */ - if (setsockopt (this->server_sock_fd, SOL_SOCKET, SO_REUSEADDR, &ret, + if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_REUSEADDR, &ret, sizeof (int)) < 0) { GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL), ("Could not setsockopt: %s", g_strerror (errno))); /* keep connection alive; avoids SIGPIPE during write */ - if (setsockopt (this->server_sock_fd, SOL_SOCKET, SO_KEEPALIVE, &ret, + if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_KEEPALIVE, &ret, @@ -275,7 +274,7 @@ /* bind it */ GST_DEBUG_OBJECT (this, "binding server socket to address"); - ret = bind (this->server_sock_fd, (struct sockaddr *) &this->server_sin, + ret = bind (this->server_sock.fd, (struct sockaddr *) &this->server_sin, sizeof (this->server_sin)); if (ret) { @@ -289,20 +288,23 @@ /* set the server socket to nonblocking */ - fcntl (this->server_sock_fd, F_SETFL, O_NONBLOCK); + fcntl (this->server_sock.fd, F_SETFL, O_NONBLOCK); GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d", - this->server_sock_fd, TCP_BACKLOG); - if (listen (this->server_sock_fd, TCP_BACKLOG) == -1) { + this->server_sock.fd, TCP_BACKLOG); + if (listen (this->server_sock.fd, TCP_BACKLOG) == -1) { GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), ("Could not listen on server socket: %s", g_strerror (errno))); GST_DEBUG_OBJECT (this, "listened on server socket %d, returning from connection setup", - FD_SET (this->server_sock_fd, &parent->readfds); + gst_fdset_add_fd (parent->fdset, &this->server_sock); + gst_fdset_fd_ctl_read (parent->fdset, &this->server_sock, TRUE); + //FD_SET (this->server_sock_fd, &parent->readfds); return TRUE; @@ -312,9 +314,11 @@ - if (this->server_sock_fd != -1) { - close (this->server_sock_fd); - this->server_sock_fd = -1; + if (this->server_sock.fd != -1) { + close (this->server_sock.fd); + this->server_sock.fd = -1; + gst_fdset_remove_fd (parent->fdset, &this->server_sock); Index: gsttcpserversink.h RCS file: /cvs/gstreamer/gst-plugins/gst/tcp/gsttcpserversink.h,v retrieving revision 1.7 diff -u -d -r1.7 -r1.8 --- gsttcpserversink.h 26 Jun 2004 16:49:42 -0000 1.7 +++ gsttcpserversink.h 11 Aug 2004 15:58:48 -0000 1.8 @@ -73,7 +73,7 @@ struct sockaddr_in server_sin; /* socket */ - int server_sock_fd; + GstFD server_sock; }; struct _GstTCPServerSinkClass { |