From: Vlad S. <ser...@us...> - 2006-01-16 04:30:06
|
Update of /cvsroot/naviserver/naviserver/nsd In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv25346/nsd Modified Files: connio.c driver.c nsd.h Log Message: See ChangeLog, this is experimental feature Index: driver.c =================================================================== RCS file: /cvsroot/naviserver/naviserver/nsd/driver.c,v retrieving revision 1.31 retrieving revision 1.32 diff -C2 -d -r1.31 -r1.32 *** driver.c 14 Jan 2006 18:45:21 -0000 1.31 --- driver.c 16 Jan 2006 04:29:57 -0000 1.32 *************** *** 54,59 **** --- 54,61 ---- typedef enum { + Reason_Close, Reason_CloseTimeout, Reason_ReadTimeout, + Reason_WriteTimeout, Reason_ServerReject, Reason_SockError, *************** *** 83,86 **** --- 85,100 ---- /* + * The following maintains files to be written to the clients + */ + + typedef struct WriterSock { + struct WriterSock *nextPtr; + Sock *sockPtr; + int fd; + int nsend; + int flags; + } WriterSock; + + /* * Static functions defined in this file. */ *************** *** 88,96 **** static Ns_ThreadProc DriverThread; static Ns_ThreadProc SpoolThread; static int SetServer(Sock *sockPtr); static Sock *SockAccept(Driver *drvPtr); static void SockRelease(Sock *sockPtr, ReleaseReasons reason); static void SockTrigger(SOCKET sock); ! static void SockPoll(Sock *sockPtr, struct pollfd **pfds, unsigned int *nfds, unsigned int *maxfds, Ns_Time *timeoutPtr); static void SockTimeout(Sock *sockPtr, Ns_Time *nowPtr, int timeout); --- 102,111 ---- static Ns_ThreadProc DriverThread; static Ns_ThreadProc SpoolThread; + static Ns_ThreadProc WriterThread; static int SetServer(Sock *sockPtr); static Sock *SockAccept(Driver *drvPtr); static void SockRelease(Sock *sockPtr, ReleaseReasons reason); static void SockTrigger(SOCKET sock); ! 
static void SockPoll(Sock *sockPtr, int type, struct pollfd **pfds, unsigned int *nfds, unsigned int *maxfds, Ns_Time *timeoutPtr); static void SockTimeout(Sock *sockPtr, Ns_Time *nowPtr, int timeout); *************** *** 99,104 **** static int SockParse(Sock *sockPtr, int spooler); ! static int SockSpoolPush(Sock *sockPtr); ! static Sock *SockSpoolPop(void); /* --- 114,121 ---- static int SockParse(Sock *sockPtr, int spooler); ! static int SockSpoolerPush(Sock *sockPtr); ! static Sock *SockSpoolerPop(void); ! ! static void SockWriterRelease(WriterSock *sockPtr, ReleaseReasons reason); /* *************** *** 133,136 **** --- 150,162 ---- static Sock *spoolerSockPtr = NULL; /* List of spooled Sock structures. */ + static SOCKET writerPipe[2]; /* Trigger to wakeup WriterThread. */ + static Ns_Mutex writerLock; /* Lock around writer list. */ + static Ns_Cond writerCond; /* Cond for stopped flag. */ + static Ns_Thread writerThread; /* Running WriterThread. */ + static int writerStopped = 0; /* Flag to indicate writer thread stopped. */ + static int writerShutdown = 0; /* Flag to indicate shutdown. */ + static int writerDisabled = 1; /* Flag to enable/disable the upload writer. */ + static WriterSock *writerSockPtr = NULL; /* List of queued WriterSock structures. 
*/ + #define Push(x, xs) ((x)->nextPtr = (xs), (xs) = (x)) *************** *** 302,305 **** --- 328,332 ---- drvPtr->bufsize, drvPtr->maxinput); drvPtr->uploadsize = Ns_ConfigIntRange(path, "uploadsize", 2048, 1024, INT_MAX); + drvPtr->writersize = Ns_ConfigIntRange(path, "writer", 1024*1024, 1024*1024*10, INT_MAX); drvPtr->sndbuf = Ns_ConfigIntRange(path, "sndbuf", 0, 0, INT_MAX); drvPtr->rcvbuf = Ns_ConfigIntRange(path, "rcvbuf", 0, 0, INT_MAX); *************** *** 413,416 **** --- 440,447 ---- } + if (Ns_ConfigBool(path, "writer", NS_FALSE)) { + writerDisabled = 0; + } + return NS_OK; } *************** *** 489,492 **** --- 520,535 ---- Ns_ThreadCreate(SpoolThread, NULL, 0, &spoolerThread); } + + /* + * Create the writer thread. + */ + + if (writerDisabled == 0) { + if (ns_sockpair(writerPipe) != 0) { + Ns_Fatal("driver: ns_sockpair() failed: %s", + ns_sockstrerror(ns_sockerrno)); + } + Ns_ThreadCreate(WriterThread, NULL, 0, &writerThread); + } } *************** *** 834,838 **** if (readPtr == NULL && closePtr == NULL) { ! pollto = -1; } else { timeout.sec = INT_MAX; --- 877,881 ---- if (readPtr == NULL && closePtr == NULL) { ! pollto = 60 * 1000; } else { timeout.sec = INT_MAX; *************** *** 840,849 **** sockPtr = readPtr; while (sockPtr != NULL) { ! SockPoll(sockPtr, &pfds, &nfds, &maxfds, &timeout); sockPtr = sockPtr->nextPtr; } sockPtr = closePtr; while (sockPtr != NULL) { ! SockPoll(sockPtr, &pfds, &nfds, &maxfds, &timeout); sockPtr = sockPtr->nextPtr; } --- 883,892 ---- sockPtr = readPtr; while (sockPtr != NULL) { ! SockPoll(sockPtr, POLLIN, &pfds, &nfds, &maxfds, &timeout); sockPtr = sockPtr->nextPtr; } sockPtr = closePtr; while (sockPtr != NULL) { ! SockPoll(sockPtr, POLLIN, &pfds, &nfds, &maxfds, &timeout); sockPtr = sockPtr->nextPtr; } *************** *** 931,937 **** switch (n) { case SOCK_SPOOL: ! if (!SockSpoolPush(sockPtr)) { ! sockPtr->nextPtr = readPtr; ! readPtr = sockPtr; } break; --- 974,979 ---- switch (n) { case SOCK_SPOOL: ! 
if (!SockSpoolerPush(sockPtr)) { ! Push(sockPtr, readPtr); } break; *************** *** 965,970 **** while ((nextPtr = waitPtr) != NULL) { waitPtr = nextPtr->nextPtr; ! nextPtr->nextPtr = sockPtr; ! sockPtr = nextPtr; } --- 1007,1011 ---- while ((nextPtr = waitPtr) != NULL) { waitPtr = nextPtr->nextPtr; ! Push(nextPtr, sockPtr); } *************** *** 1190,1194 **** static void ! SockPoll(Sock *sockPtr, struct pollfd **pfds, unsigned int *nfds, unsigned int *maxfds, Ns_Time *timeoutPtr) { /* --- 1231,1235 ---- static void ! SockPoll(Sock *sockPtr, int type, struct pollfd **pfds, unsigned int *nfds, unsigned int *maxfds, Ns_Time *timeoutPtr) { /* *************** *** 1206,1210 **** (*pfds)[*nfds].fd = sockPtr->sock; ! (*pfds)[*nfds].events = POLLIN; (*pfds)[*nfds].revents = 0; sockPtr->pidx = (*nfds)++; --- 1247,1251 ---- (*pfds)[*nfds].fd = sockPtr->sock; ! (*pfds)[*nfds].events = type; (*pfds)[*nfds].revents = 0; sockPtr->pidx = (*nfds)++; *************** *** 1341,1348 **** --- 1382,1391 ---- switch (reason) { + case Reason_Close: case Reason_CloseTimeout: /* This is normal, never log. */ break; case Reason_ReadTimeout: + case Reason_WriteTimeout: /* * For this case, whether this is acceptable or not *************** *** 1901,1905 **** *---------------------------------------------------------------------- * ! * SpoolThread -- * * Spooling socket driver thread. --- 1944,1948 ---- *---------------------------------------------------------------------- * ! * SpoolerThread -- * * Spooling socket driver thread. *************** *** 1920,1926 **** SpoolThread(void *ignored) { ! char c; ! int n, stopping, pollto; ! Sock *sockPtr, *nextPtr, *waitPtr, *readPtr; Ns_Time timeout, now, diff; unsigned int nfds, maxfds; --- 1963,1969 ---- SpoolThread(void *ignored) { ! char c; ! int n, stopping, pollto; ! Sock *sockPtr, *nextPtr, *waitPtr, *readPtr; Ns_Time timeout, now, diff; unsigned int nfds, maxfds; *************** *** 1958,1962 **** if (readPtr == NULL) { ! 
pollto = -1; } else { timeout.sec = INT_MAX; --- 2001,2005 ---- if (readPtr == NULL) { ! pollto = 60 * 1000; } else { timeout.sec = INT_MAX; *************** *** 1964,1968 **** sockPtr = readPtr; while (sockPtr != NULL) { ! SockPoll(sockPtr, &pfds, &nfds, &maxfds, &timeout); sockPtr = sockPtr->nextPtr; } --- 2007,2011 ---- sockPtr = readPtr; while (sockPtr != NULL) { ! SockPoll(sockPtr, POLLIN, &pfds, &nfds, &maxfds, &timeout); sockPtr = sockPtr->nextPtr; } *************** *** 2067,2071 **** */ ! if (waitPtr == NULL && ((sockPtr = SockSpoolPop()))) { SockTimeout(sockPtr, &now, sockPtr->drvPtr->recvwait); Push(sockPtr, readPtr); --- 2110,2114 ---- */ ! if (waitPtr == NULL && ((sockPtr = SockSpoolerPop()))) { SockTimeout(sockPtr, &now, sockPtr->drvPtr->recvwait); Push(sockPtr, readPtr); *************** *** 2088,2092 **** static int ! SockSpoolPush(Sock *sockPtr) { int trigger = 0; --- 2131,2135 ---- static int ! SockSpoolerPush(Sock *sockPtr) { int trigger = 0; *************** *** 2109,2113 **** } ! Sock *SockSpoolPop(void) { Sock *sockPtr = 0; --- 2152,2156 ---- } ! Sock *SockSpoolerPop(void) { Sock *sockPtr = 0; *************** *** 2121,2122 **** --- 2164,2363 ---- return sockPtr; } + + /* + *---------------------------------------------------------------------- + * + * WriterThread -- + * + * Thread that writes files to clients + * + * Results: + * None. + * + * Side effects: + * Connections are accepted and their SockPtr is set to NULL + * so closing actual connection does not close the socket + * + *---------------------------------------------------------------------- + */ + + static void + WriterThread(void *ignored) + { + char c; + Ns_Time now; + int n, stopping, pollto, toread, nread, status; + WriterSock *sockPtr, *nextPtr, *writePtr; + struct pollfd pfds[1]; + char buf[2048]; + + Ns_ThreadSetName("-writer-"); + + /* + * Loop forever until signalled to shutdown and all + * connections are complete and gracefully closed. 
+ */ + + Ns_Log(Notice, "writer: accepting connections"); + writePtr = NULL; + Ns_GetTime(&now); + stopping = 0; + pfds[0].fd = writerPipe[0]; + pfds[0].events = POLLIN; + + while (!stopping || nactive) { + + /* + * Select and drain the trigger pipe if necessary. + */ + + if (writePtr == NULL) { + pfds[0].revents = 0; + pollto = 30 * 1000; // Wake up every 30 seconds just in case + do { + n = poll(pfds, 1, pollto); + } while (n < 0 && errno == EINTR); + if (n < 0) { + Ns_Fatal("driver: poll() failed: %s", ns_sockstrerror(ns_sockerrno)); + } + if ((pfds[0].revents & POLLIN) && recv(writerPipe[0], &c, 1, 0) != 1) { + Ns_Fatal("driver: trigger recv() failed: %s", ns_sockstrerror(ns_sockerrno)); + } + } + + /* + * Attempt write to all available sockets + */ + + sockPtr = writePtr; + writePtr = NULL; + while (sockPtr != NULL) { + nextPtr = sockPtr->nextPtr; + + /* + * Read block from the file and send it to the socket + */ + + status = NS_OK; + if (sockPtr->nsend > 0) { + toread = sockPtr->nsend; + if (toread > sizeof(buf)) { + toread = sizeof(buf); + } + nread = read(sockPtr->fd, buf, (size_t)toread); + if (nread == -1) { + status = NS_ERROR; + } else if (nread == 0) { + sockPtr->nsend = 0; /* NB: Silently ignore a truncated file. 
*/ + } else { + n = Ns_SockSend(sockPtr->sockPtr->sock, buf, nread, 0); + if (n == nread) { + sockPtr->nsend -= n; + } else { + status = NS_ERROR; + } + } + } + if (status != NS_OK) { + SockWriterRelease(sockPtr, Reason_SockError); + } else { + if (sockPtr->nsend > 0) { + Push(sockPtr, writePtr); + } else { + SockWriterRelease(sockPtr, 0); + } + } + sockPtr = nextPtr; + } + + /* + * Add more sockets to the writer queue + */ + + Ns_MutexLock(&writerLock); + sockPtr = writerSockPtr; + writerSockPtr = NULL; + while (sockPtr != NULL) { + nextPtr = sockPtr->nextPtr; + SockTimeout(sockPtr->sockPtr, &now, sockPtr->sockPtr->drvPtr->sendwait); + Push(sockPtr, writePtr); + sockPtr = nextPtr; + } + + /* + * Check for shutdown + */ + + stopping = writerShutdown; + Ns_MutexUnlock(&writerLock); + } + Ns_Log(Notice, "exiting"); + Ns_MutexLock(&writerLock); + writerStopped = 1; + Ns_CondBroadcast(&writerCond); + Ns_MutexUnlock(&writerLock); + } + + static void + SockWriterRelease(WriterSock *sockPtr, ReleaseReasons reason) + { + Ns_Log(Notice, "Writer: stop fd=%d", sockPtr->fd); + SockRelease(sockPtr->sockPtr, reason); + close(sockPtr->fd); + ns_free(sockPtr); + } + + int + NsQueueWriter(Ns_Conn *conn, int nsend, Tcl_Channel chan, FILE *fp, int fd) + { + Conn *connPtr = (Conn*)conn; + WriterSock *sockPtr; + int trigger = 0; + + if (writerDisabled || + nsend < connPtr->drvPtr->writersize || + (conn->flags & NS_CONN_WRITE_CHUNKED)) { + return NS_ERROR; + } + + /* + * Flush the headers + */ + + Ns_WriteConn(conn, NULL, 0); + + sockPtr = (WriterSock*)ns_calloc(1, sizeof(WriterSock)); + sockPtr->sockPtr = connPtr->sockPtr; + sockPtr->flags = connPtr->flags; + if (chan != NULL) { + if (Tcl_GetChannelHandle(chan, TCL_READABLE, (ClientData)&sockPtr->fd) != TCL_OK) { + ns_free(sockPtr); + return NS_ERROR; + } + } else if (fp != NULL) { + sockPtr->fd = fileno(fp); + } + sockPtr->fd = ns_sockdup(sockPtr->fd); + sockPtr->nsend = nsend; + connPtr->sockPtr = NULL; + // To keep nslog happy about 
content size sent + connPtr->nContentSent = nsend; + Ns_SockSetBlocking(sockPtr->sockPtr->sock); + Ns_Log(Notice, "Writer: start fd=%d: %d bytes: %s", sockPtr->fd, nsend, connPtr->reqPtr->request->url); + + Ns_MutexLock(&writerLock); + if (writerSockPtr == NULL) { + trigger = 1; + } + Push(sockPtr, writerSockPtr); + Ns_MutexUnlock(&writerLock); + + /* + * Wake up writer thread + */ + + if (trigger) { + SockTrigger(writerPipe[1]); + } + return NS_OK; + } Index: nsd.h =================================================================== RCS file: /cvsroot/naviserver/naviserver/nsd/nsd.h,v retrieving revision 1.44 retrieving revision 1.45 diff -C2 -d -r1.44 -r1.45 *** nsd.h 12 Jan 2006 01:17:57 -0000 1.44 --- nsd.h 16 Jan 2006 04:29:57 -0000 1.45 *************** *** 362,365 **** --- 362,366 ---- int readahead; /* Maximum request size in memory. */ int uploadsize; /* Minimum upload size for statistics tracking. */ + int writersize; /* Minimum content size when to use writer thread. */ unsigned int loggingFlags; /* Logging control flags */ *************** *** 863,866 **** --- 864,869 ---- extern void NsFreeRequest(Request *reqPtr); + extern int NsQueueWriter(Ns_Conn *conn, int nsend, Tcl_Channel chan, FILE *fp, int fd); + extern NsServer *NsGetServer(CONST char *server); extern NsServer *NsGetInitServer(void); *************** *** 949,953 **** extern void NsStartTaskQueueShutdown(void); extern void NsWaitTaskQueueShutdown(Ns_Time *toPtr); - extern void NsStartJobsShutdown(void); extern void NsWaitJobsShutdown(Ns_Time *toPtr); --- 952,955 ---- Index: connio.c =================================================================== RCS file: /cvsroot/naviserver/naviserver/nsd/connio.c,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** connio.c 2 Oct 2005 22:23:09 -0000 1.9 --- connio.c 16 Jan 2006 04:29:57 -0000 1.10 *************** *** 846,850 **** Ns_WriteConn(conn, NULL, 0); } ! 
status = NS_OK; while (status == NS_OK && nsend > 0) { --- 846,858 ---- Ns_WriteConn(conn, NULL, 0); } ! ! /* ! * Check for submission into writer queue ! */ ! ! if (NsQueueWriter(conn, nsend, chan, fp, fd) == NS_OK) { ! return NS_OK; ! } ! status = NS_OK; while (status == NS_OK && nsend > 0) { |