From: <sim...@on...> - 2007-07-13 20:20:01
|
# HG changeset patch # User Simon Farnsworth <sim...@on...> # Date 1184235867 -3600 # Node ID 071ad7a7574be82f9f8591579775f9d3e6f7d9c2 # Parent 583c9584d6db023c54aba8a5f1792be6581370df Simplify input_rtp locking We have seen input_rtp lock up in use, and traced the problem to the separate tail/head locks on the input buffer. Reduce to a single lock, increasing lock contention between the reader and the writer, but removing the previous deadlock risk. Also use select() before recv(), to ensure that we never wait forever for packets (e.g. if we're trying to receive a multicast stream, but an administrator has blocked all multicast packets to the device - iptables -A INPUT --dst 224.0.0.0/4 -j DROP induces this failure for testing). diff -r 071ad7a7574be82f9f8591579775f9d3e6f7d9c2 -r 583c9584d6db023c54aba8a5f1792be6581370df src/input/input_rtp.c --- a/src/input/input_rtp.c Thu Jul 12 11:24:27 2007 +0100 +++ b/src/input/input_rtp.c Thu Jul 12 11:28:43 2007 +0100 @@ -79,6 +79,7 @@ #include <sys/time.h> #include <stdlib.h> #include <net/if.h> +#include <sys/select.h> #if defined (__SVR4) && defined (__sun) # include <sys/sockio.h> @@ -125,11 +126,9 @@ typedef struct { int fh; unsigned char *buffer; /* circular buffer */ - unsigned char *buffer_tail; /* tail pointer used by reader */ - unsigned char *buffer_head; /* head pointer used by writer */ + unsigned char *buffer_get_ptr; /* get pointer used by reader */ + unsigned char *buffer_put_ptr; /* put pointer used by writer */ long buffer_count; /* number of bytes in the buffer */ - pthread_mutex_t buffer_mutex; /* only used for locking the - * the buffer count variable */ unsigned char packet_buffer[65536]; @@ -143,13 +142,12 @@ typedef struct { char preview[MAX_PREVIEW_SIZE]; int preview_size; + int preview_read_done; /* boolean true after attempt to read input stream for preview */ nbc_t *nbc; - pthread_mutex_t writer_mut; + pthread_mutex_t buffer_ring_mut; pthread_cond_t writer_cond; - - pthread_mutex_t reader_mut; pthread_cond_t reader_cond; } rtp_input_plugin_t; @@ -198,7 +196,7 @@ static int host_connect_attempt(struct i /* Try to increase receive buffer to 1MB to avoid dropping packets */ - optval = 1024 * 1024; + optval = BUFFER_SIZE; if ((setsockopt(s, SOL_SOCKET, SO_RCVBUF, &optval, sizeof(optval))) < 0) { LOG_MSG(xine, _("setsockopt(SO_RCVBUF): %s.\n"), strerror(errno)); @@ -298,6 +296,7 @@ static void * input_plugin_read_loop(voi rtp_input_plugin_t *this = (rtp_input_plugin_t *) arg; unsigned char *data; long length; + fd_set read_fds; while (1) { @@ -308,8 +307,28 @@ static void * input_plugin_read_loop(voi */ pthread_testcancel(); - length = recv(this->fh, this->packet_buffer, - sizeof(this->packet_buffer), 0); + { + struct timeval recv_timeout; + int rc; + + recv_timeout.tv_sec = 2; + recv_timeout.tv_usec = 0; + + FD_ZERO( &read_fds ); + FD_SET( this->fh, &read_fds ); + + /* wait for a packet to arrive - but do not hang! */ + rc = select( this->fh+1, &read_fds, NULL, NULL, &recv_timeout ); + if( rc > 0 ) + { + length = recv(this->fh, this->packet_buffer, + sizeof(this->packet_buffer), 0); + } + else if( rc == 0 ) + length = 0; + else + length = -1; + } pthread_testcancel(); if (length < 0) { @@ -362,28 +381,31 @@ static void * input_plugin_read_loop(voi } /* insert data into cyclic buffer */ - while (length > 0) { - - /* work with a copy of buffer count, while the variable can - * be updated by the reader - */ - - long buffer_count = this->buffer_count; - long n; + if (length > 0) { /* * if the buffer is full, wait for the reader * to signal */ - if(buffer_count >= BUFFER_SIZE) { - pthread_mutex_lock(&this->writer_mut); - pthread_cond_wait(&this->writer_cond, &this->writer_mut); - pthread_mutex_unlock(&this->writer_mut); - /* update the buffer count again */ - buffer_count = this->buffer_count; - } - + pthread_mutex_lock(&this->buffer_ring_mut); + /* wait for enough space to write the whole of the recv'ed data */ + while( (BUFFER_SIZE - this->buffer_count) < length ) + { + struct timeval tv; + struct timespec timeout; + + gettimeofday(&tv, NULL); + + timeout.tv_nsec = tv.tv_usec * 1000; + timeout.tv_sec = tv.tv_sec + 2; + [... 93 lines omitted ...] + if(pthread_cond_timedwait(&this->reader_cond, &this->buffer_ring_mut, &timeout) != 0) { /* we timed out, no data available */ - pthread_mutex_unlock(&this->reader_mut); + pthread_mutex_unlock(&this->buffer_ring_mut); return copied; } - pthread_mutex_unlock(&this->reader_mut); - /* update the local buffer count variable again */ - buffer_count = this->buffer_count; } /* Now determine how many bytes can be read. If the buffer @@ -478,43 +484,60 @@ static off_t rtp_plugin_read (input_plug * update the buffer count. Finally read the second piece * from the base to the remaining count */ - if(length > buffer_count) { - n = buffer_count; + if(length > this->buffer_count) { + n = this->buffer_count; } else { n = length; } - if(((this->buffer_tail - this->buffer) + n) > BUFFER_SIZE) { - n = BUFFER_SIZE - (this->buffer_tail - this->buffer); + if(((this->buffer_get_ptr - this->buffer) + n) > BUFFER_SIZE) { + n = BUFFER_SIZE - (this->buffer_get_ptr - this->buffer); } /* the actual read */ - memcpy(buf, this->buffer_tail, n); + memcpy(buf, this->buffer_get_ptr, n); buf += n; copied += n; length -= n; /* update the tail pointer, watch for wrap arounds */ - this->buffer_tail += n; - if(this->buffer_tail - this->buffer >= BUFFER_SIZE) - this->buffer_tail = this->buffer; + this->buffer_get_ptr += n; + if(this->buffer_get_ptr - this->buffer >= BUFFER_SIZE) + this->buffer_get_ptr = this->buffer; - /* lock the buffer, for updating the count */ - pthread_mutex_lock(&this->buffer_mutex); this->buffer_count -= n; - pthread_mutex_unlock(&this->buffer_mutex); /* signal the writer that there's space in the buffer again */ - pthread_mutex_lock(&this->writer_mut); pthread_cond_signal(&this->writer_cond); - pthread_mutex_unlock(&this->writer_mut); + pthread_mutex_unlock(&this->buffer_ring_mut); } this->curpos += copied; return copied; +} + +static buf_element_t *rtp_plugin_read_block (input_plugin_t *this_gen, + fifo_buffer_t *fifo, off_t todo) { + buf_element_t *buf = fifo->buffer_pool_alloc (fifo); + int total_bytes; + + + buf->content = buf->mem; + buf->type = BUF_DEMUX_BLOCK; + + total_bytes = rtp_plugin_read (this_gen, buf->content, todo); + + if (total_bytes != todo) { + buf->free_buffer (buf); + return NULL; + } + + buf->size = total_bytes; + + return buf; } /* @@ -584,9 +607,11 @@ static int rtp_plugin_get_optional_data */ if (data_type == INPUT_OPTIONAL_DATA_PREVIEW) { - if (this->preview_size == 0) { + if (!this->preview_read_done) { this->preview_size = rtp_plugin_read(this_gen, this->preview, MAX_PREVIEW_SIZE); lprintf("Preview data length = %d\n", this->preview_size); + + this->preview_read_done = 1; } memcpy(data, this->preview, this->preview_size); return this->preview_size; @@ -705,23 +730,21 @@ static input_plugin_t *rtp_class_get_ins if (iptr) this->interface = iptr; - pthread_mutex_init(&this->buffer_mutex, NULL); - pthread_mutex_init(&this->reader_mut, NULL); - pthread_mutex_init(&this->writer_mut, NULL); + pthread_mutex_init(&this->buffer_ring_mut, NULL); pthread_cond_init(&this->reader_cond, NULL); pthread_cond_init(&this->writer_cond, NULL); this->buffer = malloc(BUFFER_SIZE); - this->buffer_head = this->buffer; - this->buffer_tail = this->buffer; + this->buffer_put_ptr = this->buffer; + this->buffer_get_ptr = this->buffer; this->buffer_count = 0; this->curpos = 0; this->input_plugin.open = rtp_plugin_open; this->input_plugin.get_capabilities = rtp_plugin_get_capabilities; this->input_plugin.read = rtp_plugin_read; - this->input_plugin.read_block = NULL; + this->input_plugin.read_block = rtp_plugin_read_block; this->input_plugin.seek = rtp_plugin_seek; this->input_plugin.get_current_pos = rtp_plugin_get_current_pos; this->input_plugin.get_length = rtp_plugin_get_length; |