From: Ramon G. F. <ra...@pd...> - 2004-02-25 23:34:27
|
CVS Root: /cvs/gstreamer Module: gst-plugins Changes by: ramon Date: Wed Feb 25 2004 15:32:53 PST Branch: new-udpsrc Log message: New implementation of udpsrc with many new features. Modified files: gst/udp : gstudpsrc.c gstudpsrc.h Added files: gst/udp : address.c address.h Links: http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins/gst/udp/address.c?rev=1.1.2.1&content-type=text/vnd.viewcvs-markup http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins/gst/udp/address.h?rev=1.1.2.1&content-type=text/vnd.viewcvs-markup http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins/gst/udp/gstudpsrc.c.diff?r1=1.22&r2=1.22.4.1 http://freedesktop.org/cgi-bin/viewcvs.cgi/gstreamer/gst-plugins/gst/udp/gstudpsrc.h.diff?r1=1.7&r2=1.7.8.1 ====Begin Diffs==== --- NEW FILE: address.c --- #include "address.h" #include <netdb.h> static gpointer gst_address_copy(gpointer); static void gst_address_free(gpointer); static gpointer gst_address_list_copy(gpointer); static void gst_address_list_free(gpointer); static gpointer gst_addr_port_copy(gpointer); static void gst_addr_port_free(gpointer); static gpointer gst_addr_port_list_copy(gpointer); static void gst_addr_port_list_free(gpointer); static void resolve_name(const gchar* name, GstNumAddress* result); GType gst_address_get_type(void) { static GType address_type = 0; if (address_type == 0) { address_type = g_boxed_type_register_static("GstAddress", gst_address_copy, gst_address_free); } return address_type; } static gpointer gst_address_copy(gpointer src) { GstAddress* addr_src = (GstAddress*) src; GstAddress* addr_result; addr_result = g_new0(GstAddress, 1); *addr_result = *addr_src; if (addr_src->type == NAME) { addr_result->addr.name = g_strdup(addr_src->addr.name); return (gpointer) addr_result; static void gst_address_free(gpointer ptr) GstAddress* addr_ptr = (GstAddress*) ptr; if (addr_ptr->type == NAME) { g_free(addr_ptr->addr.name); g_free(ptr); gst_address_list_get_type(void) { static GType type = 0; if (type == 0) { type = g_boxed_type_register_static("GstAddressList", gst_address_list_copy, gst_address_list_free); return type; gst_address_list_copy(gpointer src) GList* src_list = (GList*) src; GList* dest_list = NULL; GList* list_node = src_list; while (list_node != NULL) { dest_list = g_list_append(dest_list, gst_address_copy(list_node->data)); list_node = g_list_next(list_node); return (gpointer) dest_list; gst_address_list_free(gpointer ptr) GList* ptr_list = (GList*) ptr; GList* list_node = ptr_list; gst_address_free(list_node->data); list_node = g_list_remove_link(list_node, list_node); static gpointer gst_addr_port_copy(gpointer src) GstAddrPort* addr_src = (GstAddrPort*) src; GstAddrPort* addr_result; addr_result = g_new0(GstAddrPort, 1); if (addr_src->addr.type == NAME) { addr_result->addr.addr.name = g_strdup(addr_src->addr.addr.name); static void gst_addr_port_free(gpointer ptr) GstAddrPort* addr_ptr = (GstAddrPort*) ptr; if (addr_ptr->addr.type == NAME) { g_free(addr_ptr->addr.addr.name); gst_addr_port_get_type(void) type = g_boxed_type_register_static("GstAddrPort", gst_addr_port_copy, gst_addr_port_free); GType gst_addr_port_list_get_type(void) type = g_boxed_type_register_static("GstAddrPortList", gst_addr_port_list_copy, gst_addr_port_list_free); gst_addr_port_list_copy(gpointer src) dest_list = g_list_append(dest_list, gst_addr_port_copy(list_node->data)); gst_addr_port_list_free(gpointer ptr) gst_addr_port_free(list_node->data); void resolve_address(const GstAddress* address, GstNumAddress* result) if (result->alternatives != NULL) { g_free(result->alternatives); result->num_alternatives = 0 result->alternatives = NULL; switch (address->type) { case IPV4: result->num_alternatives = 1; result->alternatives = g_new(struct in6_addr, 1); map_ipv4_to_ipv6(address->addr.ipv4_address, result->alternatives[0]); break; case IPV6: result->alternatives[0] = address->addr.ipv6_address; case NAME: resolve_name(address->addr.name, result); resolve_name(const gchar* name, GstNumAddress* result) int error; struct addrinfo* result; struct addrinfo* link; error = getaddrinfo(name, NULL, NULL, &result); if (error != 0) return; result->num_alternatives = 0; link = result; while(link != NULL) { if (link->ai_family == AF_INET || link->ai_family == AF_INET6) { result->num_alternatives++; } link = link->ai_next; result->alternatives = g_new0(struct in6_addr, result->num_alternatives); alternative = result->alternatives; if (link->ai_protocol == AF_INET || link->ai_family == AF_INET6) { *alternative = ((struct sockaddr_in6 *) link->ai_addr)->sin6_addr; alternative++; freeaddrinfo(result); gboolean try_bind(int socket, const GstNumAddress* address, int port) int i; struct in6_addr addr; g_assert(address->num_alternatives > 0); for (i = 0; i < address->num_alternatives; i++) { addr = address->alternatives[i]; addr.sin_port = port; error = bind(socket, &addr, sizeof(struct in6_addr)); if (error == 0) { break; return (error != 0); void try_send(int socket, GstNumAddressPort* remote, const char* data, guint size) struct sockaddr_in6 peer; guint i; gboolean sucess; peer.sin6_port = remote->port; peer.sin6_flowinfo = 0; peer.sin6_scope_id = 0; for (i = 0; i < remote->address.num_alternatives; i++) { peer.sin6_addr = remote->address.alternatives[i]; error = sendto(socket, data, size, 0, &peer, sizeof(sockaddr_in6)); sucess = error >= 0; if (sucess) --- NEW FILE: address.h --- #ifndef ADDRESS_H #define ADDRESS_H #include <glib-object.h> #include <netinet/in.h> struct _GstAddress { enum {IPV4, IPV6, NAME} type; union addr_fmt { struct in_addr ipv4_address; struct in6_addr ipv6_address; char* name; } addr; }; typedef struct _GstAddress GstAddress; struct _GstAddrPort { GstAddress addr; unsigned short port; typedef struct _GstAddrPort GstAddrPort; GType gst_address_get_type(); GType gst_peer_get_type(); GType gst_address_list_get_type(); GType gst_addr_port_list_get_type(); #define GST_TYPE_ADDRESS (gst_address_get_type()) #define GST_TYPE_ADDR_PORT (gst_addr_port_get_type()) #define GST_TYPE_ADDRESS_LIST (gst_address_list_get_type()) #define GST_TYPE_ADDR_PORT_LIST (gst_addr_port_list_get_type()) /* Represents the address of a single logical machine, that can be accesses throught any of these addresses. It is the result of a name resolution that returns a list of addresses */ struct _GstNumAddress { guint num_alternatives; struct in6_addr* alternatives; typedef struct _GstNumAddress GstNumAddress; /* Represents a list of addresses. For instance, a list of authorized machines */ struct _GstNumAddresses { GList* list; /* Actually, a GList of <GstNumAddress> */ typedef struct _GstNumAddresses GstNumAddresses; struct _GstNumAddressPort { GstNumAddress address; guint16 port; typedef struct _GstNumAddressPort GstNumAddressPort; struct _GstNumAddressesPorts { GList* list; /* GList of <GstNumAddressPort> */ typedef struct _GstNumAddressesPorts GstNumAddressesPorts; void resolve_address(const GstAddress* address, GstNumAddress* result); void resolve_addresses(const GList* addresses, /* GList of GstAddress */ GstNumAddresses* result); void resolve_addresses_ports(const GList* addresses, /* GList of GstAddrPort */ GstNumAddressesPorts* result); gboolean try_bind(int socket, const GstNumAddress* address, int port); void try_send(int socket, GstNumAddressPort* remote, const char* data, guint size); #endif Index: gstudpsrc.c =================================================================== RCS file: /cvs/gstreamer/gst-plugins/gst/udp/gstudpsrc.c,v retrieving revision 1.22 retrieving revision 1.22.4.1 diff -u -d -r1.22 -r1.22.4.1 --- a/gstudpsrc.c 2 Feb 2004 20:09:09 -0000 1.22 +++ b/gstudpsrc.c 25 Feb 2004 23:32:40 -0000 1.22.4.1 @@ -1,5 +1,7 @@ /* GStreamer - * Copyright (C) <1999> Erik Walthinsen <om...@cs...> + * UDPSRC element. Retreives data from the network by UDP protocol + * Copyright (C) 2004 Ramón García Fernández + * Copyright (C) 1999 Eric Walthinsen * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -24,17 +26,20 @@ #include "gstudpsrc.h" #include <unistd.h> +#include <sys/poll.h> +#include <sys/ioctl.h> -#define UDP_DEFAULT_PORT 4951 -#define UDP_DEFAULT_MULTICAST_GROUP "0.0.0.0" /* elementfactory information */ -static GstElementDetails gst_udpsrc_details = GST_ELEMENT_DETAILS ( +static GstElementDetails gst_udpsrc_details = { "UDP packet receiver", "Source/Network", + "LGPL", "Receive data over the network via UDP", - "Wim Taymans <wim...@ch...>" -); + 0, + "Ramon Garcia <ram...@ya...>, Wim Taymans <wim...@ch...>" + "(c) 2000-2004", +}; /* UDPSrc signals and args */ enum { @@ -42,37 +47,16 @@ LAST_SIGNAL }; -enum { - ARG_0, - ARG_PORT, - ARG_CONTROL, - ARG_MULTICAST_GROUP - /* FILL ME */ -}; - -#define GST_TYPE_UDPSRC_CONTROL (gst_udpsrc_control_get_type()) -static GType -gst_udpsrc_control_get_type(void) { - static GType udpsrc_control_type = 0; - static GEnumValue udpsrc_control[] = { - {CONTROL_NONE, "1", "none"}, - {CONTROL_UDP, "2", "udp"}, - {CONTROL_TCP, "3", "tcp"}, - {CONTROL_ZERO, NULL, NULL}, - }; - if (!udpsrc_control_type) { - udpsrc_control_type = g_enum_register_static("GstUDPSrcControl", udpsrc_control); - } - return udpsrc_control_type; -} static void gst_udpsrc_base_init (gpointer g_class); static void gst_udpsrc_class_init (GstUDPSrc *klass); static void gst_udpsrc_init (GstUDPSrc *udpsrc); -static GstData* gst_udpsrc_get (GstPad *pad); -static GstElementStateReturn - gst_udpsrc_change_state (GstElement *element); +static GstData* gst_udpsrc_get (GstPad *pad); + +static gboolean gst_udpsrc_event_handler (GstPad* element, GstEvent* event); +static GstElementStateReturn gst_udpsrc_change_state(GstElement *element); static void gst_udpsrc_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); @@ -80,6 +64,29 @@ GValue *value, GParamSpec *pspec); static void gst_udpsrc_set_clock (GstElement *element, GstClock *clock); +static gboolean gst_udpsrc_release_locks (GstElement* element); +typedef enum {control, data, interrupt} source_type; +static void wait_for_data (const GstUDPSrc* udpsrc, guint* data_size, char** data, source_type* source_type, struct sockaddr_in6* peer); +static gboolean check_authorized_peer (const GstUDPSrc* udpsrc, const struct sockaddr_in6* peer); +static GstBuffer* +data_buffer(const GstUDPSrc* udpsrc, guint data_size, const char* data_received, struct sockaddr_in6* addr); +static GstEvent* +control_event(const GstUDPSrc* udpsrc, guint data_size, const char* data_received, const struct sockaddr_in6* addr); +static void update_raw_addresses(GstUDPSrc* element); +static void put_data_addr(const GstUDPSrc* udpsrc, char** result_data, guint* result_size, guint data_size, char* data); +static void +send_control_data(guint size, gpointer data); static GstElementClass *parent_class = NULL; /*static guint gst_udpsrc_signals[LAST_SIGNAL] = { 0 }; */ @@ -125,22 +132,30 @@ parent_class = g_type_class_ref (GST_TYPE_ELEMENT); - g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_PORT, - g_param_spec_int ("port", "port", "The port to receive the packets from", - 0, 32768, UDP_DEFAULT_PORT, G_PARAM_READWRITE)); - g_object_class_install_property (gobject_class, ARG_CONTROL, - g_param_spec_enum ("control", "control", "The type of control", - GST_TYPE_UDPSRC_CONTROL, CONTROL_UDP, G_PARAM_READWRITE)); - g_object_class_install_property (gobject_class, ARG_MULTICAST_GROUP, - g_param_spec_string ("multicast_group", "multicast_group", - "The Address of multicast group to join", - UDP_DEFAULT_MULTICAST_GROUP, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LOCAL_ADDR, + g_param_spec_boxed ("local-addr", "local-addr", + "Address that this element will use for listenting", + GST_TYPE_ADDRESS, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_DATA_PORT, + g_param_spec_int ("data-port", "data-port", "The port to receive media content", + 0, 65535, -1, G_PARAM_READWRITE) ); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_CONTROL_PORT, + g_param_spec_int ("control-port", "control-port", + "The port to send and receive control messages", + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_AUTHORIZED_SENDERS, + g_param_spec_boxed ("authorized-senders", "authorized-senders", + "List of addresses of hosts, such that packets from these hosts are not rejected", + GST_TYPE_ADDRESS_LIST, G_PARAM_READWRITE) ); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_REMOTE_ADDRESSES, + g_param_spec_boxed ("remote-receivers", "remote-receivers", + "List of addresses of hosts that we will send control messages to", + GST_TYPE_ADDR_PORT_LIST, G_PARAM_READWRITE) ); gobject_class->set_property = gst_udpsrc_set_property; gobject_class->get_property = gst_udpsrc_get_property; gstelement_class->change_state = gst_udpsrc_change_state; gstelement_class->set_clock = gst_udpsrc_set_clock; + gstelement_class->release_locks = gst_udpsrc_release_locks; } static void @@ -158,137 +173,76 @@ { /* create the src and src pads */ udpsrc->srcpad = gst_pad_new ("src", GST_PAD_SRC); + GstRealPad* realsrcpad = (GstRealPad*) udpsrc->srcpad; + realsrcpad->eventfunc = gst_udpsrc_event_handler; gst_element_add_pad (GST_ELEMENT (udpsrc), udpsrc->srcpad); gst_pad_set_get_function (udpsrc->srcpad, gst_udpsrc_get); - udpsrc->port = UDP_DEFAULT_PORT; - udpsrc->control = CONTROL_UDP; + udpsrc->local_address = (GstAddress) + {.type = IPV4, .addr.ipv4_address.s_addr = 0}; + udpsrc->data_port = -1; + udpsrc->control_port = -1; + udpsrc->remote_addresses = NULL; + udpsrc->authorized_senders = NULL; udpsrc->clock = NULL; - udpsrc->sock = -1; - udpsrc->control_sock = -1; - udpsrc->multi_group = g_strdup (UDP_DEFAULT_MULTICAST_GROUP); + udpsrc->data_socket = -1; + udpsrc->control_socket = -1; udpsrc->first_buf = TRUE; static GstData* -gst_udpsrc_get (GstPad *pad) +gst_udpsrc_get(GstPad *pad) - GstUDPSrc *udpsrc; - GstBuffer *outbuf; - struct sockaddr_in tmpaddr; - socklen_t len; - gint numbytes; - fd_set read_fds; - guint max_sock; + GstUDPSrc* udpsrc; + GstBuffer* output; + source_type source_type; + char* data_received; + guint data_size; + struct sockaddr_in6 peer; g_return_val_if_fail (pad != NULL, NULL); g_return_val_if_fail (GST_IS_PAD (pad), NULL); udpsrc = GST_UDPSRC (GST_OBJECT_PARENT (pad)); - FD_ZERO (&read_fds); - FD_SET (udpsrc->sock, &read_fds); - if (udpsrc->control != CONTROL_NONE) { - FD_SET (udpsrc->control_sock, &read_fds); - max_sock = MAX(udpsrc->sock, udpsrc->control_sock); - if (select (max_sock+1, &read_fds, NULL, NULL, NULL) > 0) { - if ((udpsrc->control_sock != -1) && - FD_ISSET (udpsrc->control_sock, &read_fds)) { -#ifndef GST_DISABLE_LOADSAVE - guchar *buf; - int ret; - int fdread; - struct sockaddr addr; - xmlDocPtr doc; - GstCaps *caps; - buf = g_malloc (1024*10); - switch (udpsrc->control) { - case CONTROL_TCP: - len = sizeof (struct sockaddr); - fdread = accept (udpsrc->control_sock, &addr, &len); - if (fdread < 0) { - perror ("accept"); - } - - ret = read (fdread, buf, 1024*10); - break; - case CONTROL_UDP: - ret = recvfrom (udpsrc->control_sock, buf, 1024*10, 0, (struct sockaddr *)&tmpaddr, &len); - if (ret < 0) { - perror ("recvfrom"); - case CONTROL_NONE: - default: - g_free (buf); - return NULL; - } - buf[ret] = '\0'; - doc = xmlParseMemory(buf, ret); - caps = gst_caps_load_thyself(doc->xmlRootNode); - /* foward the connect, we don't signal back the result here... */ - gst_pad_try_set_caps (udpsrc->srcpad, caps); + g_return_val_if_fail (udpsrc->data_socket != 0 && + udpsrc->control_socket != 0, NULL); -#endif - g_free (buf); - outbuf = NULL; + while (1) { + wait_for_data(udpsrc, &data_size, &data_received, &source_type, &peer); + if (source_type == interrupt) { + return NULL; } - else { - outbuf = gst_buffer_new (); - GST_BUFFER_DATA (outbuf) = g_malloc (24000); - GST_BUFFER_SIZE (outbuf) = 24000; - if (udpsrc->first_buf) { - if (udpsrc->clock) { - GstClockTime current_time; - GstEvent *discont; - current_time = gst_clock_get_time (udpsrc->clock); - - GST_BUFFER_TIMESTAMP (outbuf) = current_time; - discont = gst_event_new_discontinuous (FALSE, GST_FORMAT_TIME, - current_time, NULL); - gst_pad_push (udpsrc->srcpad, GST_DATA (discont)); - } - udpsrc->first_buf = FALSE; - else { - GST_BUFFER_TIMESTAMP (outbuf) = GST_CLOCK_TIME_NONE; + if (!check_authorized_peer(udpsrc, &peer)) { + g_free(data_received); + continue; + } + } - len = sizeof (struct sockaddr); - numbytes = recvfrom (udpsrc->sock, GST_BUFFER_DATA (outbuf), - GST_BUFFER_SIZE (outbuf), 0, (struct sockaddr *)&tmpaddr, &len); + if (udpsrc->first_buf) { + deliver_timer_discont_event(udpsrc); - if (numbytes != -1) { - GST_BUFFER_SIZE (outbuf) = numbytes; - perror ("recvfrom"); - gst_buffer_unref (outbuf); - outbuf = NULL; - - } + if (source_type == control) { + return (GstData*) control_event(udpsrc, data_size, data_received, &peer); + } else { + return (GstData*) data_buffer(udpsrc, data_size, data_received, &peer); } - else { - perror ("select"); - outbuf = NULL; + +} +static gboolean +gst_udpsrc_event_handler(GstPad* pad, GstEvent* event) +{ + GstUDPSrc* udpsrc = GST_UDPSRC(gst_pad_get_parent(pad)); + struct control_event* data; + g_assert(udpsrc != NULL); + if (GST_EVENT_TYPE(event) != GST_EVENT_CONTROL) { + return gst_pad_event_default(pad, event); - return GST_DATA (outbuf); + data = (struct control_event*) &event->event_data; + send_control_data(data->size, data->data); @@ -300,25 +254,32 @@ /* it's not null if we got it, but it might not be ours */ g_return_if_fail(GST_IS_UDPSRC(object)); udpsrc = GST_UDPSRC(object); + if (GST_STATE(udpsrc) != GST_STATE_NULL && + (prop_id == ARG_LOCAL_ADDR) || prop_id == ARG_DATA_PORT ) { + return; switch (prop_id) { - case ARG_PORT: - udpsrc->port = g_value_get_int (value); - break; - case ARG_MULTICAST_GROUP: - g_free(udpsrc->multi_group); - if (g_value_get_string (value) == NULL) - udpsrc->multi_group = g_strdup (UDP_DEFAULT_MULTICAST_GROUP); - else - udpsrc->multi_group = g_strdup (g_value_get_string (value)); - case ARG_CONTROL: - udpsrc->control = g_value_get_enum (value); - default: + case ARG_LOCAL_ADDR: + udpsrc->local_address = *(GstAddress*) g_value_get_boxed(value); + udpsrc->local_address_uptodate = FALSE; + break; + case ARG_DATA_PORT: + udpsrc->data_port = g_value_get_int(value); + case ARG_CONTROL_PORT: + udpsrc->control_port = g_value_get_int(value); + case ARG_AUTHORIZED_SENDERS: + udpsrc->authorized_senders = (GList*) g_value_get_boxed(value); + udpsrc->authorized_senders_uptodate = FALSE; + case ARG_REMOTE_ADDRESSES: + udpsrc->remote_addresses = (GList*) g_value_get_boxed(value); + udpsrc->remote_address_uptodate = FALSE; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -330,142 +291,236 @@ - g_value_set_int (value, udpsrc->port); - g_value_set_string (value, udpsrc->multi_group); - g_value_set_enum (value, udpsrc->control); - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + g_value_set_boxed(value, &udpsrc->local_address); + g_value_set_int(value, udpsrc->data_port); + g_value_set_int(value, udpsrc->control_port); + g_value_set_boxed(value, udpsrc->authorized_senders); + g_value_set_boxed(value, udpsrc->remote_addresses); -/* create a socket for sending to remote machine */ +/* Initialize network connections */ static gboolean -gst_udpsrc_init_receive (GstUDPSrc *src) +gst_udpsrc_init_connections(GstUDPSrc* element) - guint bc_val; - gint reuse=1; - memset (&src->myaddr, 0, sizeof (src->myaddr)); - src->myaddr.sin_family = AF_INET; /* host byte order */ - src->myaddr.sin_port = htons (src->port); /* short, network byte order */ - src->myaddr.sin_addr.s_addr = INADDR_ANY; - if ((src->sock = socket (AF_INET, SOCK_DGRAM, 0)) == -1) { - perror("socket"); - return FALSE; + struct sockaddr_in6 data_local_address, control_local_address; + gboolean error; + update_raw_addresses(element); + element->data_socket = socket(AF_INET6, SOCK_DGRAM, 0); + if (element->data_socket < 0) + goto err; + error = try_bind(element->data_socket, &element->raw_local_address, element->data_port); + if (error != 0) + element->control_socket = socket(AF_INET6, SOCK_DGRAM, 0); + if (element->control_socket < 0) + error = try_bind(element->control_socket, &element->raw_local_address, element->control_port); + if (error != 0) + error = pipe(element->pipe); + if (error != 0) { - if (setsockopt(src->sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1) { - perror("setsockopt"); - return FALSE; + element->open = TRUE; + return TRUE; + err: + if (element->data_socket > 0) { + close(element->data_socket); + element->data_socket = -1; - if (bind (src->sock, (struct sockaddr *) &src->myaddr, sizeof (src->myaddr)) == -1) { - perror("bind"); + if (element->control_socket > 0) { + close(element->control_socket); + element->control_socket = -1; + return FALSE; - if (inet_aton (src->multi_group, &(src->multi_addr.imr_multiaddr))) { - if (src->multi_addr.imr_multiaddr.s_addr) { - src->multi_addr.imr_interface.s_addr = INADDR_ANY; - setsockopt (src->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &src->multi_addr, sizeof(src->multi_addr)); +update_raw_addresses(GstUDPSrc* element) + if (!element->local_address_uptodate) { + resolve_address(&element->local_address, &element->raw_local_address); + if (!element->remote_address_uptodate) { + resolve_addresses_ports(element->remote_addresses, &element->raw_remote_addresses); + if (!element->authorized_senders_uptodate) { + resolve_addresses(element->authorized_senders, &element->raw_authorized_senders); - bc_val = 1; - setsockopt (src->sock, SOL_SOCKET, SO_BROADCAST, &bc_val, sizeof (bc_val)); - src->myaddr.sin_port = htons (src->port+1); - switch (src->control) { - case CONTROL_TCP: - if ((src->control_sock = socket (AF_INET, SOCK_STREAM, 0)) == -1) { - perror("control_socket"); - return FALSE; - } - if (bind (src->control_sock, (struct sockaddr *) &src->myaddr, sizeof (src->myaddr)) == -1) { - perror("control_bind"); - } - if (listen (src->control_sock, 5) == -1) { - perror("listen"); - - fcntl (src->control_sock, F_SETFL, O_NONBLOCK); - break; - case CONTROL_UDP: - if ((src->control_sock = socket (AF_INET, SOCK_DGRAM, 0)) == -1) { - perror("socket"); - if (bind (src->control_sock, (struct sockaddr *) &src->myaddr, sizeof (src->myaddr)) == -1) - { - perror("control_bind"); - return FALSE; - /* We can only do broadcast in udp */ - bc_val = 1; - setsockopt (src->control_sock, SOL_SOCKET, SO_BROADCAST, &bc_val, sizeof (bc_val)); - case CONTROL_NONE: - GST_FLAG_SET (src, GST_UDPSRC_OPEN); - return TRUE; - return FALSE; +gst_udpsrc_close(GstUDPSrc *element) + if (element->pipe[0] != 0) { + close(element->pipe[0]); + close(element->pipe[1]); + element->open = FALSE; - GST_FLAG_SET (src, GST_UDPSRC_OPEN); +static GstElementStateReturn +gst_udpsrc_change_state(GstElement* element) + GstUDPSrc* udpsrc = GST_UDPSRC(element); + if (GST_STATE_PENDING(element) == GST_STATE_NULL) { + if (udpsrc->open) { + gst_udpsrc_close(udpsrc); + if (!udpsrc->open) { + gst_udpsrc_init_connections(udpsrc); +gst_udpsrc_release_locks(GstElement* element) + char foo = '\0'; + udpsrc = GST_UDPSRC(element); + g_assert(udpsrc != 0); + write(udpsrc->pipe[0], &foo, 1); return TRUE; -static void -gst_udpsrc_close (GstUDPSrc *src) +/* socket functions */ +static void wait_for_data(const GstUDPSrc* udpsrc, guint* data_size, char** data, source_type* source_type, struct sockaddr_in6* peer) - if (src->sock != -1) { - close (src->sock); - src->sock = -1; + int error; + struct pollfd pollfds[3] = + { + {.fd = udpsrc->pipe[1], + .events = POLLIN, + .revents = 0 + }, + {.fd = udpsrc->control_socket, + .events = POLLIN, + .revents = 0 + {.fd = udpsrc->data_socket, + } + }; + int active_fd; + unsigned int num_bytes; + poll(pollfds, 3, 0); + g_assert(error == 0); + if (pollfds[0].revents & POLLIN) { + char foo; + *data = NULL; + *source_type = interrupt; + read(udpsrc->pipe[1], &foo, 1); + struct sockaddr_in6 from; + int foo; + if (pollfds[1].revents & POLLIN) { + *source_type = control; + active_fd = udpsrc->control_socket; + } else if (pollfds[1].revents & POLLIN) { + *source_type = data; + active_fd = udpsrc->data_socket; + error = ioctl(active_fd, FIONREAD, &num_bytes); + g_assert(error == 0); + *data = g_malloc(num_bytes); + *data_size = num_bytes; + recvfrom(active_fd, *data, num_bytes, 0, + (struct sockaddr*) peer, &foo); - if (src->control_sock != -1) { - close (src->control_sock); - src->control_sock = -1; + +gboolean check_authorized_peer(const GstUDPSrc* udpsrc, const struct sockaddr_in6* peer) + guint i; + GList* slab; + update_raw_addresses((GstUDPSrc*)udpsrc); + for (slab = udpsrc->raw_authorized_senders.list; slab != NULL; slab = slab->next) { + GstNumAddress* site = (GstNumAddress*) slab->data; + for (i = 0; i < site->num_alternatives; i++) { + if (memcmp(&peer->sin6_addr, &site->alternatives[i], sizeof(struct in6_addr)) == 0){ + return TRUE; - GST_FLAG_UNSET (src, GST_UDPSRC_OPEN); +data_buffer(const GstUDPSrc* udpsrc, guint data_size, const char* data_received, struct sockaddr_in6* addr) + GstBuffer* buffer; + buffer = gst_buffer_new(); + put_data_addr(udpsrc, &GST_BUFFER_DATA(buffer), &GST_BUFFER_SIZE(buffer), data_size, data_received); -gst_udpsrc_change_state (GstElement *element) +control_event(const GstUDPSrc* udpsrc, guint data_size, const char* data_received, const struct sockaddr_in6* addr) - g_return_val_if_fail (GST_IS_UDPSRC (element), GST_STATE_FAILURE); + GstEvent* event; + struct control_event* ctl; + guint event_data_size; + event = gst_event_new(GST_EVENT_UNKNOWN); + ctl = (struct control_event*) &event->event_data; + put_data_addr(udpsrc, &ctl->data, &ctl->size, data_size, data_received); - if (GST_STATE_PENDING (element) == GST_STATE_NULL) { - if (GST_FLAG_IS_SET (element, GST_UDPSRC_OPEN)) - gst_udpsrc_close (GST_UDPSRC (element)); +static void put_data_addr(const GstUDPSrc* udpsrc, char** result_data, guint* result_size, guint data_size, char* data, const struct sockaddr_in6* addr) + if (udpsrc->include_addr_in_data) { + struct buffer_data_extended* data_ex = + g_malloc(data_size + sizeof(guint) + sizeof(struct sockaddr_in6)); + data_ex->addr_size = sizeof(struct sockaddr_in6); + data_ex->addr = *addr; + memcpy((char*) data_ex->data, data_received, data_size); + g_free(data_received); + *result_data = (char*) data_ex; + *result_size = data_size; } else { - if (!GST_FLAG_IS_SET (element, GST_UDPSRC_OPEN)) { - if (!gst_udpsrc_init_receive (GST_UDPSRC (element))) - return GST_STATE_FAILURE; + *result_data = data_received; + *result_size = data_size - if (GST_ELEMENT_CLASS (parent_class)->change_state) - return GST_ELEMENT_CLASS (parent_class)->change_state (element); - return GST_STATE_SUCCESS; +send_control_data(GstUDPSrc* udpsrc, guint size, gpointer data) + update_raw_addresses(); + for (slab = udpsrc->raw_remote_senders.list; slab != NULL; slab = slab->next) { + GstNumAddressPort* remote = (GstNumAddressPort*) slab->data; + try_send(udpsrc->control_socket, remote, data, size); Index: gstudpsrc.h RCS file: /cvs/gstreamer/gst-plugins/gst/udp/gstudpsrc.h,v retrieving revision 1.7 retrieving revision 1.7.8.1 diff -u -d -r1.7 -r1.7.8.1 --- a/gstudpsrc.h 7 Nov 2003 12:46:35 -0000 1.7 +++ b/gstudpsrc.h 25 Feb 2004 23:32:40 -0000 1.7.8.1 @@ -23,20 +23,15 @@ #include <gst/gst.h> -#ifdef __cplusplus -extern "C" { -#endif /* __cplusplus */ -#include <errno.h> -#include <string.h> -#include <sys/types.h> -#include <netdb.h> #include <sys/socket.h> #include <netinet/in.h> -#include <arpa/inet.h> #include <fcntl.h> -#include "gstudp.h" +#include "address.h" +G_BEGIN_DECLS #define GST_TYPE_UDPSRC \ (gst_udpsrc_get_type()) @@ -52,29 +47,50 @@ typedef struct _GstUDPSrc GstUDPSrc; typedef struct _GstUDPSrcClass GstUDPSrcClass; +enum { + ARG_0, + ARG_LOCAL_ADDR, + ARG_DATA_PORT, + ARG_CONTROL_PORT, + ARG_AUTHORIZED_SENDERS, + ARG_REMOTE_ADDRESSES, typedef enum { GST_UDPSRC_OPEN = GST_ELEMENT_FLAG_LAST, GST_UDPSRC_FLAG_LAST = GST_ELEMENT_FLAG_LAST + 2, } GstUDPSrcFlags; +// FIXME: Release memory of stored data after setting them as properties. +// Release memory of stored data in the destructor struct _GstUDPSrc { GstElement element; /* pads */ - GstPad *sinkpad,*srcpad; - int port; - int sock; - int control_sock; - Gst_UDP_Control control; - gchar *multi_group; + GstPad* srcpad; + GstAddress local_address; + gint data_port, control_port; + /* raw members have the names resolved */ + GstNumAddress raw_local_address; + gboolean local_address_uptodate; + int data_socket, control_socket; + GList* remote_addresses; /* Addresses that receive our control messages. GList of GstAddrPort */ + GstNumAddressesPorts raw_remote_addresses; + gboolean remote_address_uptodate; + GList* authorized_senders; /* Packets from other addresses are dropped. Glist of GstAddress */ + gboolean authorized_senders_uptodate; + GstNumAddresses raw_authorized_senders; - struct sockaddr_in myaddr; - struct ip_mreq multi_addr; GstClock *clock; + int pipe[2]; gboolean first_buf; + gboolean open; + gboolean include_addr_in_data; // FIXME Initialize struct _GstUDPSrcClass { @@ -84,9 +100,20 @@ GType gst_udpsrc_get_type(void); +#define GST_EVENT_CONTROL 100 +struct control_event { + guint size; + gpointer data; /* Actually a (struct buffer_data_extended *) buffer */ +struct buffer_data_extended { + guint addr_size; + struct sockaddr_in6 addr; + char data[1]; +G_END_DECLS #endif /* __GST_UDPSRC_H__ */ |