|
From: <cha...@us...> - 2007-12-04 15:39:56
|
Revision: 367
http://sipp.svn.sourceforge.net/sipp/?rev=367&view=rev
Author: charlespwright
Date: 2007-12-04 07:39:54 -0800 (Tue, 04 Dec 2007)
Log Message:
-----------
Enh: Provide fine-grained TCP reconnection support, so that we do not need to tear down all of the connections if just a single TCP connection is reset. Also include statistics for TCP call failures in the CSV file.
Modified Paths:
--------------
sipp/trunk/call.cpp
sipp/trunk/call.hpp
sipp/trunk/sipp.cpp
sipp/trunk/sipp.hpp
sipp/trunk/stat.cpp
sipp/trunk/stat.hpp
Modified: sipp/trunk/call.cpp
===================================================================
--- sipp/trunk/call.cpp 2007-12-04 15:38:53 UTC (rev 366)
+++ sipp/trunk/call.cpp 2007-12-04 15:39:54 UTC (rev 367)
@@ -255,10 +255,6 @@
delete call_ptr;
open_calls--;
- } else {
- if (start_calls == 0) {
- ERROR("Call not found");
- }
}
}
@@ -774,18 +770,18 @@
call_established= false ;
}
-void call::connect_socket_if_needed()
+bool call::connect_socket_if_needed()
{
bool existing;
- if(call_socket) return;
- if(!multisocket) return;
+ if(call_socket) return true;
+ if(!multisocket) return true;
if(transport == T_UDP) {
struct sockaddr_storage saddr;
if(toolMode != MODE_CLIENT)
- return;
+ return true;
char peripaddr[256];
if (!peripsocket) {
@@ -814,7 +810,7 @@
}
}
if (existing) {
- return;
+ return true;
}
memset(&saddr, 0, sizeof(struct sockaddr_storage));
@@ -863,7 +859,7 @@
}
if (existing) {
- return;
+ return true;
}
sipp_customize_socket(call_socket);
@@ -873,14 +869,26 @@
}
if (sipp_connect_socket(call_socket, L_dest)) {
- if (reset_number > 0) {
+ if (reconnect_allowed()) {
if(errno == EINVAL){
/* This occurs sometime on HPUX but is not a true INVAL */
WARNING("Unable to connect a TCP socket, remote peer error");
} else {
WARNING("Unable to connect a TCP socket");
}
- start_calls = 1;
+ /* This connection failed. We must be in multisocket mode, because
+ * otherwise we would already have a call_socket. This call can not
+ * succeed, but does not affect any of our other calls. We do decrement
+ * the reconnection counter however. */
+ if (reset_number != -1) {
+ reset_number--;
+ }
+
+ CStat::instance()->computeStat(CStat::E_CALL_FAILED);
+ CStat::instance()->computeStat(CStat::E_FAILED_TCP_CONNECT);
+ delete_call(id);
+
+ return false;
} else {
if(errno == EINVAL){
/* This occurs sometime on HPUX but is not a true INVAL */
@@ -891,6 +899,7 @@
}
}
}
+ return true;
}
bool lost(int index)
@@ -1271,7 +1280,10 @@
char *L_ptr2 ;
/* Socket port must be known before string substitution */
- connect_socket_if_needed();
+ if (!connect_socket_if_needed()) {
+ *send_status = -2;
+ return NULL;
+ }
assert(call_socket);
@@ -1602,7 +1614,8 @@
} else if(send_status < 0) { /* Send error */
/* The timeout will not be sent, so the timeout is no longer needed. */
send_timeout = 0;
- return false; /* call deleted */
+ /* The call was already deleted by connect_socket_if_needed or send_raw. */
+ return false;
}
/* We have sent the message, so the timeout is no longer needed. */
send_timeout = 0;
Modified: sipp/trunk/call.hpp
===================================================================
--- sipp/trunk/call.hpp 2007-12-04 15:38:53 UTC (rev 366)
+++ sipp/trunk/call.hpp 2007-12-04 15:39:54 UTC (rev 367)
@@ -293,7 +293,7 @@
int send_raw(char * msg, int index);
char * send_scene(int index, int *send_status);
- void connect_socket_if_needed();
+ bool connect_socket_if_needed();
char * compute_cseq(char * src);
char * get_header_field_code(char * msg, char * code);
Modified: sipp/trunk/sipp.cpp
===================================================================
--- sipp/trunk/sipp.cpp 2007-12-04 15:38:53 UTC (rev 366)
+++ sipp/trunk/sipp.cpp 2007-12-04 15:39:54 UTC (rev 367)
@@ -1849,6 +1849,7 @@
/* Refuse to write to invalid sockets. */
if (socket->ss_invalid) {
+ WARNING_P2("Returning EPIPE on invalid socket: %p (%d)\n", socket, socket->ss_fd);
errno = EPIPE;
return -1;
}
@@ -1937,8 +1938,10 @@
if (socket->ss_transport == T_TCP && errno == EPIPE) {
nb_net_send_errors++;
- start_calls = 1;
- if (reset_number > 0) {
+ close(socket->ss_fd);
+ socket->ss_fd = -1;
+ sockets_pending_reset.insert(socket);
+ if (reconnect_allowed()) {
WARNING("Broken pipe on TCP connection, remote peer "
"probably closed the socket");
} else {
@@ -2000,17 +2003,23 @@
}
}
}else {
+ /* The socket was closed "cleanly", but we may have calls that need to
+ * be destroyed. Also, if these calls are not complete, and attempt to
+ * send again we may "ressurect" the socket by reconnecting it.*/
sipp_socket_invalidate(socket);
if (reset_close) {
- close_calls(socket);
- }
+ close_calls(socket);
+ }
}
return 0;
}
+ close(socket->ss_fd);
+ socket->ss_fd = -1;
+ sockets_pending_reset.insert(socket);
+
nb_net_recv_errors++;
- start_calls = 1;
- if (reset_number > 0) {
+ if (reconnect_allowed()) {
WARNING_P1("Error on TCP connection, remote peer probably closed the socket: %s", errstring);
} else {
ERROR_P1("Error on TCP connection, remote peer probably closed the socket: %s", errstring);
@@ -2381,6 +2390,7 @@
shutdown(socket->ss_fd, SHUT_RDWR);
close(socket->ss_fd);
+ socket->ss_fd = -1;
if((pollidx = socket->ss_pollidx) >= pollnfds) {
ERROR_P2("Pollset error: index %d is greater than number of fds %d!", pollidx, pollnfds);
@@ -2678,7 +2688,7 @@
}
struct sipp_socket *localSocket = sipp_accept_socket(sock);
- localSocket->ss_control = 1;
+ localSocket->ss_control = 1;
local_sockets[local_nb] = localSocket;
local_nb++;
if(!peers_connected){
@@ -2757,10 +2767,6 @@
clock_tick = new_time;
last_time = new_time;
- if (start_calls == 1) {
- reset_connections();
- }
-
if (signalDump) {
/* Screen dumping in a file */
if (screenf) {
@@ -2784,7 +2790,12 @@
signalDump = false ;
}
- if ((!quitting) && (!paused) && (!start_calls)) {
+ while (sockets_pending_reset.begin() != sockets_pending_reset.end()) {
+ reset_connection(*(sockets_pending_reset.begin()));
+ sockets_pending_reset.erase(sockets_pending_reset.begin());
+ }
+
+ if ((!quitting) && (!paused)) {
long l=0;
if (users) {
@@ -2838,6 +2849,11 @@
}
call_ptr -> run();
+
+ while (sockets_pending_reset.begin() != sockets_pending_reset.end()) {
+ reset_connection(*(sockets_pending_reset.begin()));
+ sockets_pending_reset.erase(sockets_pending_reset.begin());
+ }
}
new_time = getmilliseconds();
@@ -2922,13 +2938,25 @@
call_list::iterator iter;
for(iter = running_calls->begin(); iter != running_calls->end(); iter++) {
- if(last) { last -> run(); }
+ if(last) {
+ last -> run();
+ while (sockets_pending_reset.begin() != sockets_pending_reset.end()) {
+ reset_connection(*(sockets_pending_reset.begin()));
+ sockets_pending_reset.erase(sockets_pending_reset.begin());
+ }
+ }
last = *iter;
if (--loops <= 0) {
break;
}
}
- if(last) { last -> run(); }
+ if(last) {
+ last -> run();
+ while (sockets_pending_reset.begin() != sockets_pending_reset.end()) {
+ reset_connection(*(sockets_pending_reset.begin()));
+ sockets_pending_reset.erase(sockets_pending_reset.begin());
+ }
+ }
/* Update the clock. */
new_time = getmilliseconds();
@@ -3412,9 +3440,8 @@
return sipp_allocate_socket(use_ipv6, transport, fd, 0);
}
-struct sipp_socket *new_sipp_socket(bool use_ipv6, int transport) {
+int socket_fd(bool use_ipv6, int transport) {
int socket_type;
- struct sipp_socket *ret;
int fd;
switch(transport) {
@@ -3434,6 +3461,13 @@
ERROR_P1("Unable to get a %s socket", TRANSPORT_TO_STRING(transport));
}
+ return fd;
+}
+
+struct sipp_socket *new_sipp_socket(bool use_ipv6, int transport) {
+ struct sipp_socket *ret;
+ int fd = socket_fd(use_ipv6, transport);
+
ret = sipp_allocate_socket(use_ipv6, transport, fd);
if (!ret) {
close(fd);
@@ -3491,6 +3525,9 @@
}
memcpy(&ret->ss_remote_sockaddr, &remote_sockaddr, sizeof(ret->ss_remote_sockaddr));
+ /* We should connect back to the address which connected to us if we
+ * experience a TCP failure. */
+ memcpy(&ret->ss_dest, &remote_sockaddr, sizeof(ret->ss_remote_sockaddr));
if (ret->ss_transport == T_TLS) {
#ifdef _USE_OPENSSL
@@ -3537,14 +3574,15 @@
return 0;
}
-int sipp_connect_socket(struct sipp_socket *socket, struct sockaddr_storage *dest) {
- int fd;
+int sipp_do_connect_socket(struct sipp_socket *socket) {
+ int ret;
assert(socket->ss_transport == T_TCP || socket->ss_transport == T_TLS);
- fd = connect(socket->ss_fd, (struct sockaddr *)dest, SOCK_ADDR_SIZE(dest));
- if (fd < 0) {
- return fd;
+ errno = 0;
+ ret = connect(socket->ss_fd, (struct sockaddr *)&socket->ss_dest, SOCK_ADDR_SIZE(&socket->ss_dest));
+ if (ret < 0) {
+ return ret;
}
if (socket->ss_transport == T_TLS) {
@@ -3561,6 +3599,50 @@
return 0;
}
+int sipp_connect_socket(struct sipp_socket *socket, struct sockaddr_storage *dest) {
+ memcpy(&socket->ss_dest, dest, SOCK_ADDR_SIZE(dest));
+ return sipp_do_connect_socket(socket);
+}
+
+int sipp_reconnect_socket(struct sipp_socket *socket) {
+ assert(socket->ss_fd == -1);
+
+ socket->ss_fd = socket_fd(socket->ss_ipv6, socket->ss_transport);
+ if (socket->ss_fd == -1) {
+ ERROR_NO("Could not obtain new socket: ");
+ }
+
+ if (socket->ss_invalid) {
+#ifdef _USE_OPENSSL
+ socket->ss_ssl = NULL;
+
+ if ( transport == T_TLS ) {
+ if ((socket->ss_bio = BIO_new_socket(socket->ss_fd,BIO_NOCLOSE)) == NULL) {
+ ERROR("Unable to create BIO object:Problem with BIO_new_socket()\n");
+ }
+
+ if (!(socket->ss_ssl = SSL_new(sip_trp_ssl_ctx_client))) {
+ ERROR("Unable to create SSL object : Problem with SSL_new() \n");
+ }
+
+ SSL_set_bio(socket->ss_ssl,socket->ss_bio,socket->ss_bio);
+ }
+#endif
+
+ /* Store this socket in the tables. */
+ socket->ss_pollidx = pollnfds++;
+ sockets[socket->ss_pollidx] = socket;
+ pollfiles[socket->ss_pollidx].fd = socket->ss_fd;
+ pollfiles[socket->ss_pollidx].events = POLLIN | POLLERR;
+ pollfiles[socket->ss_pollidx].revents = 0;
+
+ socket->ss_invalid = false;
+ }
+
+ return sipp_do_connect_socket(socket);
+}
+
+
/* Main */
int main(int argc, char *argv[])
{
@@ -4394,53 +4476,41 @@
usleep(usec);
}
-int reset_connections() {
- int status=0;
+bool reconnect_allowed() {
+ if (reset_number == -1) {
+ return true;
+ }
+ return (reset_number > 0);
+}
- start_calls = 1;
- reset_number--;
+void reset_connection(struct sipp_socket *socket) {
+ if (!reconnect_allowed()) {
+ ERROR_NO("Max number of reconnections reached");
+ }
- if (reset_number < 0) {
- ERROR_NO("Max number of reconnections reached");
+ if (reset_number != -1) {
+ reset_number--;
}
+
if (reset_close) {
- status = close_calls();
+ WARNING("Closing calls, because of TCP reset or close!");
+ close_calls(socket);
}
- if (status==0) {
- status = close_connections();
- if (status==0) {
- do{
- sipp_usleep(reset_sleep * 1000);
- status = open_connections();
- }while(status == 1);
- start_calls = 0;
- WARNING("Re-connection for connections");
- }
- }
- return status;
-}
+ /* Sleep for some period of time before the reconnection. */
+ usleep(1000 * reset_sleep);
-/* Close *all* of the calls. */
-int close_calls() {
- int status=0;
- call_map * calls = get_calls();
- call_map::iterator call_it;
- call * call_ptr = NULL;
-
- while (calls->begin() != calls->end()) {
- call_ptr = (calls->begin() != calls->end()) ? (calls->begin())->second : NULL ;
- if(call_ptr) {
- call_ptr->terminate(CStat::E_NO_ACTION);
- }
+ if (sipp_reconnect_socket(socket) < 0) {
+ WARNING_NO("Could not reconnect TCP socket");
+ close_calls(socket);
+ } else {
+ WARNING("Socket required a reconnection.");
}
- return status;
}
/* Close just those calls for a given socket (e.g., if the remote end closes
* the connection. */
-int close_calls(struct sipp_socket *socket) {
- int status=0;
+void close_calls(struct sipp_socket *socket) {
call_list * calls = get_calls_for_socket(socket);
call_list::iterator call_it;
call * call_ptr = NULL;
@@ -4448,25 +4518,13 @@
for (call_it = calls->begin(); call_it != calls->end(); call_it++) {
call_ptr = *call_it;
if(call_ptr) {
- call_ptr->terminate(CStat::E_NO_ACTION);
+ call_ptr->terminate(CStat::E_FAILED_TCP_CLOSED);
}
}
delete calls;
-
- return status;
}
-int close_connections() {
- int status=0;
-
- if (toolMode != MODE_SERVER) {
- sipp_close_socket(main_socket);
- main_socket = NULL;
- }
- return status;
-}
-
int open_connections() {
int status=0;
local_port = 0;
Modified: sipp/trunk/sipp.hpp
===================================================================
--- sipp/trunk/sipp.hpp 2007-12-04 15:38:53 UTC (rev 366)
+++ sipp/trunk/sipp.hpp 2007-12-04 15:39:54 UTC (rev 367)
@@ -45,6 +45,7 @@
#include <vector>
#include <string>
#include <map>
+#include <set>
#include <math.h>
#if defined(__HPUX) || defined(__SUNOS)
@@ -284,6 +285,7 @@
struct sipp_socket *sipp_accept_socket(struct sipp_socket *accept_socket);
extern int sipp_bind_socket(struct sipp_socket *socket, struct sockaddr_storage *saddr, int *port);
extern int sipp_connect_socket(struct sipp_socket *socket, struct sockaddr_storage *dest);
+extern int sipp_reconnect_socket(struct sipp_socket *socket);
extern void sipp_customize_socket(struct sipp_socket *socket);
extern int delete_socket(int P_socket);
extern int min_socket _DEFVAL(65535);
@@ -355,10 +357,12 @@
extern int user_port _DEFVAL(0);
extern char hostname[80];
extern bool is_ipv6 _DEFVAL(false);
-extern int start_calls _DEFVAL(0);
+
extern int reset_number _DEFVAL(0);
extern int reset_close _DEFVAL(1);
extern int reset_sleep _DEFVAL(1000);
+/* A list of sockets pending reset. */
+extern set<struct sipp_socket *> sockets_pending_reset;
extern struct addrinfo * local_addr_storage;
@@ -497,6 +501,7 @@
BIO *ss_bio; /* The underlying BIO descriptor for this socket. */
#endif
struct sockaddr_storage ss_remote_sockaddr; /* Who we are talking to. */
+ struct sockaddr_storage ss_dest; /* Who we are talking to. */
int ss_pollidx; /* The index of this socket in our poll structures. */
@@ -541,9 +546,9 @@
char *get_peer_addr(char *);
int get_decimal_from_hex(char hex);
-int reset_connections() ;
-int close_calls();
-int close_calls(struct sipp_socket *);
+bool reconnect_allowed();
+void reset_connection(struct sipp_socket *);
+void close_calls(struct sipp_socket *);
int close_connections();
int open_connections();
void timeout_alarm(int);
Modified: sipp/trunk/stat.cpp
===================================================================
--- sipp/trunk/stat.cpp 2007-12-04 15:38:53 UTC (rev 366)
+++ sipp/trunk/stat.cpp 2007-12-04 15:39:54 UTC (rev 367)
@@ -574,6 +574,18 @@
M_counters [CPT_PL_FailedCallMaxUdpRetrans]++;
break;
+ case E_FAILED_TCP_CONNECT :
+ M_counters [CPT_C_FailedCallTcpConnect]++;
+ M_counters [CPT_PD_FailedCallTcpConnect]++;
+ M_counters [CPT_PL_FailedCallTcpConnect]++;
+ break;
+
+ case E_FAILED_TCP_CLOSED :
+ M_counters [CPT_C_FailedCallTcpClosed]++;
+ M_counters [CPT_PD_FailedCallTcpClosed]++;
+ M_counters [CPT_PL_FailedCallTcpClosed]++;
+ break;
+
case E_FAILED_UNEXPECTED_MSG :
M_counters [CPT_C_FailedCallUnexpectedMessage]++;
M_counters [CPT_PD_FailedCallUnexpectedMessage]++;
@@ -1256,6 +1268,10 @@
<< "FailedCannotSendMessage(C)" << stat_delimiter
<< "FailedMaxUDPRetrans(P)" << stat_delimiter
<< "FailedMaxUDPRetrans(C)" << stat_delimiter
+ << "FailedTcpConnect(P)" << stat_delimiter
+ << "FailedTcpConnect(C)" << stat_delimiter
+ << "FailedTcpClosed(P)" << stat_delimiter
+ << "FailedTcpClosed(C)" << stat_delimiter
<< "FailedUnexpectedMessage(P)" << stat_delimiter
<< "FailedUnexpectedMessage(C)" << stat_delimiter
<< "FailedCallRejected(P)" << stat_delimiter
@@ -1349,6 +1365,10 @@
<< M_counters[CPT_C_FailedCallCannotSendMessage] << stat_delimiter
<< M_counters[CPT_PL_FailedCallMaxUdpRetrans] << stat_delimiter
<< M_counters[CPT_C_FailedCallMaxUdpRetrans ] << stat_delimiter
+ << M_counters[CPT_PL_FailedCallTcpConnect] << stat_delimiter
+ << M_counters[CPT_C_FailedCallTcpConnect] << stat_delimiter
+ << M_counters[CPT_PL_FailedCallTcpClosed] << stat_delimiter
+ << M_counters[CPT_C_FailedCallTcpClosed] << stat_delimiter
<< M_counters[CPT_PL_FailedCallUnexpectedMessage] << stat_delimiter
<< M_counters[CPT_C_FailedCallUnexpectedMessage] << stat_delimiter
<< M_counters[CPT_PL_FailedCallCallRejected] << stat_delimiter
Modified: sipp/trunk/stat.hpp
===================================================================
--- sipp/trunk/stat.hpp 2007-12-04 15:38:53 UTC (rev 366)
+++ sipp/trunk/stat.hpp 2007-12-04 15:39:54 UTC (rev 367)
@@ -101,6 +101,8 @@
E_ADD_RESPONSE_TIME_DURATION,
E_FAILED_CANNOT_SEND_MSG,
E_FAILED_MAX_UDP_RETRANS,
+ E_FAILED_TCP_CONNECT,
+ E_FAILED_TCP_CLOSED,
E_FAILED_UNEXPECTED_MSG,
E_FAILED_CALL_REJECTED,
E_FAILED_CMD_NOT_SENT,
@@ -145,6 +147,8 @@
CPT_C_AverageResponseTime_Squares_5,
CPT_C_FailedCallCannotSendMessage,
CPT_C_FailedCallMaxUdpRetrans,
+ CPT_C_FailedCallTcpConnect,
+ CPT_C_FailedCallTcpClosed,
CPT_C_FailedCallUnexpectedMessage,
CPT_C_FailedCallCallRejected,
CPT_C_FailedCallCmdNotSent,
@@ -187,6 +191,8 @@
CPT_PD_AverageResponseTime_Squares_5,
CPT_PD_FailedCallCannotSendMessage,
CPT_PD_FailedCallMaxUdpRetrans,
+ CPT_PD_FailedCallTcpConnect,
+ CPT_PD_FailedCallTcpClosed,
CPT_PD_FailedCallUnexpectedMessage,
CPT_PD_FailedCallCallRejected,
CPT_PD_FailedCallCmdNotSent,
@@ -230,6 +236,8 @@
CPT_PL_AverageResponseTime_Squares_5,
CPT_PL_FailedCallCannotSendMessage,
CPT_PL_FailedCallMaxUdpRetrans,
+ CPT_PL_FailedCallTcpConnect,
+ CPT_PL_FailedCallTcpClosed,
CPT_PL_FailedCallUnexpectedMessage,
CPT_PL_FailedCallCallRejected,
CPT_PL_FailedCallCmdNotSent,
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|