[srvx-commits] CVS: services/src ioset.c,1.8,1.9 ioset.h,1.4,1.5 proto-common.c,1.12,1.13
Brought to you by:
entrope
|
From: Entrope <en...@us...> - 2002-09-05 14:55:51
|
Update of /cvsroot/srvx/services/src
In directory usw-pr-cvs1:/tmp/cvs-serv552/src
Modified Files:
ioset.c ioset.h proto-common.c
Log Message:
add line-at-a-time read support to ioset fds; use it
Index: ioset.c
===================================================================
RCS file: /cvsroot/srvx/services/src/ioset.c,v
retrieving revision 1.8
retrieving revision 1.9
diff -C2 -r1.8 -r1.9
*** ioset.c 14 Aug 2002 03:03:58 -0000 1.8
--- ioset.c 5 Sep 2002 14:55:48 -0000 1.9
***************
*** 37,40 ****
--- 37,42 ----
#endif
+ #define IS_EOL(CH) ((CH) == '\n')
+
extern int uplink_connect(void);
int clock_skew;
***************
*** 44,54 ****
static fd_set read_fds, write_fds;
! void ioset_cleanup(void)
! {
free(fds);
}
! struct io_fd *ioset_add(int fd)
! {
struct io_fd *res;
int flags;
--- 46,101 ----
static fd_set read_fds, write_fds;
! static void
! ioq_init(struct ioq *ioq, int size) {
! ioq->buf = malloc(size);
! ioq->get = ioq->put = 0;
! ioq->size = size;
! }
!
! static unsigned int
! ioq_put_avail(const struct ioq *ioq) {
! /* Subtract 1 from ioq->get to be sure we don't fill the buffer
! * and make it look empty even when there's data in it. */
! if (ioq->put < ioq->get) {
! return ioq->get - ioq->put - 1;
! } else if (ioq->get == 0) {
! return ioq->size - ioq->put - 1;
! } else {
! return ioq->size - ioq->put;
! }
! }
!
! static unsigned int
! ioq_get_avail(const struct ioq *ioq) {
! return ((ioq->put < ioq->get) ? ioq->size : ioq->put) - ioq->get;
! }
!
! static unsigned int
! ioq_used(const struct ioq *ioq) {
! return ((ioq->put < ioq->get) ? ioq->size : 0) + ioq->put - ioq->get;
! }
!
! static unsigned int
! ioq_grow(struct ioq *ioq) {
! int new_size = ioq->size << 1;
! char *new_buf = malloc(new_size);
! int get_avail = ioq_get_avail(ioq);
! memcpy(new_buf, ioq->buf + ioq->get, get_avail);
! if (ioq->put < ioq->get) memcpy(new_buf + get_avail, ioq->buf, ioq->put);
! free(ioq->buf);
! ioq->put = ioq_used(ioq);
! ioq->get = 0;
! ioq->buf = new_buf;
! ioq->size = new_size;
! return new_size - ioq->put;
! }
!
! void
! ioset_cleanup(void) {
free(fds);
}
! struct io_fd *
! ioset_add(int fd) {
struct io_fd *res;
int flags;
***************
*** 60,70 ****
res = calloc(1, sizeof(*res));
if (!res) return 0;
- res->send_size = 1024;
- res->send = malloc(res->send_size);
- if (!res->send) {
- free(res->send);
- return 0;
- }
res->fd = fd;
if ((unsigned)fd >= fds_size) {
unsigned int old_size = fds_size;
--- 107,113 ----
res = calloc(1, sizeof(*res));
if (!res) return 0;
res->fd = fd;
+ ioq_init(&res->send, 1024);
+ ioq_init(&res->recv, 1024);
if ((unsigned)fd >= fds_size) {
unsigned int old_size = fds_size;
***************
*** 80,85 ****
struct io_fd *
! ioset_connect(struct sockaddr *local, unsigned int sa_size, const char *peer, unsigned int port, int blocking, void (*connect_cb)(struct io_fd *fd))
! {
int fd, res;
struct io_fd *io_fd;
--- 123,127 ----
struct io_fd *
! ioset_connect(struct sockaddr *local, unsigned int sa_size, const char *peer, unsigned int port, int blocking, void (*connect_cb)(struct io_fd *fd)) {
int fd, res;
struct io_fd *io_fd;
***************
*** 119,122 ****
--- 161,165 ----
return NULL;
}
+ io_fd->connect_cb = connect_cb;
if (res < 0) {
switch (errno) {
***************
*** 137,153 ****
static void
! ioset_try_write(struct io_fd *fd)
! {
int res;
! unsigned int req = fd->send_used;
! if (req > (fd->send_size-fd->send_pos)) req = fd->send_size - fd->send_pos;
! if (IOSET_DEBUG) {
! log(DEBUG_LOG, LOG_INFO, "write()'ing to %d: send_pos=%d send_used=%d send_size=%d req=%d\n",
! fd->send_pos, fd->send_used, fd->send_size, req);
! }
! res = write(fd->fd, fd->send+fd->send_pos, req);
! if (IOSET_DEBUG) {
! log(DEBUG_LOG, LOG_INFO, "ioset write(%d, \"%.*s\") -> %d\n", fd->fd, req, fd->send+fd->send_pos, res);
! }
if (res < 0) {
switch (errno) {
--- 180,187 ----
static void
! ioset_try_write(struct io_fd *fd) {
int res;
! unsigned int req = ioq_get_avail(&fd->send);
! res = write(fd->fd, fd->send.buf+fd->send.get, req);
if (res < 0) {
switch (errno) {
***************
*** 157,180 ****
}
} else {
! fd->send_pos += res;
! if (fd->send_pos == fd->send_size) fd->send_pos = 0;
! fd->send_used -= res;
}
}
! void ioset_close(int fd, int os_close)
! {
struct io_fd *fdp;
if (!(fdp = fds[fd])) return;
fds[fd] = NULL;
if (fdp->destroy_cb) fdp->destroy_cb(fdp);
! if (fdp->send_used > 0) {
int flags = fcntl(fd, F_GETFL);
fcntl(fd, F_SETFL, flags&~O_NONBLOCK);
ioset_try_write(fdp);
/* it may need to send the beginning of the buffer now.. */
! if (fdp->send_used > 0) ioset_try_write(fdp);
}
! if (fdp->send) free(fdp->send);
if (os_close) close(fd);
free(fdp);
--- 191,214 ----
}
} else {
! fd->send.get += res;
! if (fd->send.get == fd->send.size) fd->send.get = 0;
}
}
! void
! ioset_close(int fd, int os_close) {
struct io_fd *fdp;
if (!(fdp = fds[fd])) return;
fds[fd] = NULL;
if (fdp->destroy_cb) fdp->destroy_cb(fdp);
! if (fdp->send.get != fdp->send.put) {
int flags = fcntl(fd, F_GETFL);
fcntl(fd, F_SETFL, flags&~O_NONBLOCK);
ioset_try_write(fdp);
/* it may need to send the beginning of the buffer now.. */
! if (fdp->send.get != fdp->send.put) ioset_try_write(fdp);
}
! if (fdp->send.buf) free(fdp->send.buf);
! if (fdp->recv.buf) free(fdp->recv.buf);
if (os_close) close(fd);
free(fdp);
***************
*** 183,188 ****
}
! void ioset_run(void)
! {
extern struct io_fd *socket_io_fd;
struct timeval select_timeout;
--- 217,324 ----
}
! static int
! ioset_find_line_length(struct io_fd *fd) {
! unsigned int pos, max, len;
! len = 0;
! max = (fd->recv.put < fd->recv.get) ? fd->recv.size : fd->recv.put;
! for (pos = fd->recv.get; pos < max; ++pos, ++len) {
! if (IS_EOL(fd->recv.buf[pos])) return fd->line_len = len + 1;
! }
! if (fd->recv.put < fd->recv.get) {
! for (pos = 0; pos < fd->recv.put; ++pos, ++len) {
! if (IS_EOL(fd->recv.buf[pos])) return fd->line_len = len + 1;
! }
! }
! return fd->line_len = 0;
! }
!
! static void
! ioset_buffered_read(struct io_fd *fd) {
! int put_avail, nbr, fdnum;
!
! if (!(put_avail = ioq_put_avail(&fd->recv))) put_avail = ioq_grow(&fd->recv);
! nbr = read(fd->fd, fd->recv.buf + fd->recv.put, put_avail);
! if (nbr < 0) {
! log(MAIN_LOG, LOG_ERROR, "Unexpected read() error %d on fd %d: %s\n", errno, fd->fd, strerror(errno));
! } else if (nbr == 0) {
! fd->eof = 1;
! fd->wants_reads = 0;
! } else {
! if (fd->line_len == 0) {
! unsigned int pos;
! for (pos = fd->recv.put; pos < fd->recv.put + nbr; ++pos) {
! if (IS_EOL(fd->recv.buf[pos])) {
! if (fd->recv.put < fd->recv.get) {
! fd->line_len = fd->recv.size + pos + 1 - fd->recv.get;
! } else {
! fd->line_len = pos + 1 - fd->recv.get;
! }
! break;
! }
! }
! }
! fd->recv.put += nbr;
! if (fd->recv.put == fd->recv.size) fd->recv.put = 0;
! fdnum = fd->fd;
! while (fd->wants_reads && (fd->line_len > 0)) {
! fd->readable_cb(fd);
! if (!fds[fdnum]) break; /* make sure they didn't close on us */
! ioset_find_line_length(fd);
! }
! }
! }
!
! int
! ioset_line_read(struct io_fd *fd, char *dest, int max) {
! int avail, done;
! if (fd->eof && (!ioq_get_avail(&fd->recv) || (fd->line_len < 0))) return 0;
! if (fd->line_len < 0) return -1;
! if (fd->line_len < max) max = fd->line_len;
! avail = ioq_get_avail(&fd->recv);
! if (max > avail) {
! memcpy(dest, fd->recv.buf + fd->recv.get, avail);
! fd->recv.get += avail;
! assert(fd->recv.get == fd->recv.size);
! fd->recv.get = 0;
! done = avail;
! } else {
! done = 0;
! }
! memcpy(dest + done, fd->recv.buf + fd->recv.get, max - done);
! fd->recv.get += max - done;
! if (fd->recv.get == fd->recv.size) fd->recv.get = 0;
! dest[max] = 0;
! ioset_find_line_length(fd);
! return max;
! }
!
! #if 1
! #define debug_fdsets(MSG, NFDS, READ_FDS, WRITE_FDS, EXCEPT_FDS, SELECT_TIMEOUT) (void)0
! #else
! static void
! debug_fdsets(const char *msg, int nfds, fd_set *read_fds, fd_set *write_fds, fd_set *except_fds, struct timeval *select_timeout) {
! static const char *flag_text[8] = { "---", "r", "w", "rw", "e", "er", "ew", "erw" };
! char buf[MAXLEN];
! int pos, ii, flags;
! struct timeval now;
!
! for (pos=ii=0; ii<nfds; ++ii) {
! flags = (read_fds && FD_ISSET(ii, read_fds)) ? 1 : 0;
! flags |= (write_fds && FD_ISSET(ii, write_fds)) ? 2 : 0;
! flags |= (except_fds && FD_ISSET(ii, except_fds)) ? 4 : 0;
! if (!flags) continue;
! pos += sprintf(buf+pos, " %d%s", ii, flag_text[flags]);
! }
! gettimeofday(&now, NULL);
! if (select_timeout) {
! log(DEBUG_LOG, LOG_OTHER, "%s, at %u.%06u:%s (timeout %u.%06u)\n", msg, now.tv_sec, now.tv_usec, buf, select_timeout->tv_sec, select_timeout->tv_usec);
! } else {
! log(DEBUG_LOG, LOG_OTHER, "%s, at %u.%06u:%s (no timeout)\n", msg, now.tv_sec, now.tv_usec, buf);
! }
! }
! #endif
!
! void
! ioset_run(void) {
extern struct io_fd *socket_io_fd;
struct timeval select_timeout;
***************
*** 212,220 ****
max_fd = nn;
if (fd->wants_reads) FD_SET(nn, &read_fds);
! if ((fd->send_used > 0) || !fd->connected) FD_SET(nn, &write_fds);
}
/* Check for activity, update time. */
select_result = select(max_fd + 1, &read_fds, &write_fds, NULL, &select_timeout);
now = time(NULL) + clock_skew;
if (select_result < 0) {
--- 348,358 ----
max_fd = nn;
if (fd->wants_reads) FD_SET(nn, &read_fds);
! if ((fd->send.get != fd->send.put) || !fd->connected) FD_SET(nn, &write_fds);
}
/* Check for activity, update time. */
+ debug_fdsets("Entering select", max_fd+1, &read_fds, &write_fds, NULL, &select_timeout);
select_result = select(max_fd + 1, &read_fds, &write_fds, NULL, &select_timeout);
+ debug_fdsets("After select", max_fd+1, &read_fds, &write_fds, NULL, &select_timeout);
now = time(NULL) + clock_skew;
if (select_result < 0) {
***************
*** 230,234 ****
if (!(fd = fds[nn])) continue;
if (FD_ISSET(nn, &read_fds)) {
! fd->readable_cb(fd);
}
if (FD_ISSET(nn, &write_fds) && !fd->connected) {
--- 368,376 ----
if (!(fd = fds[nn])) continue;
if (FD_ISSET(nn, &read_fds)) {
! if (fd->line_reads) {
! ioset_buffered_read(fd);
! } else {
! fd->readable_cb(fd);
! }
}
if (FD_ISSET(nn, &write_fds) && !fd->connected) {
***************
*** 240,244 ****
* a free()'d pointer for the fd.
*/
! if (FD_ISSET(nn, &write_fds) && (fd->send_used > 0)) ioset_try_write(fd);
}
--- 382,388 ----
* a free()'d pointer for the fd.
*/
! if (FD_ISSET(nn, &write_fds) && (fd->send.get != fd->send.put)) {
! ioset_try_write(fd);
! }
}
***************
*** 249,293 ****
void
! ioset_write(struct io_fd *fd, const char *buf, unsigned int nbw)
! {
! /* Four cases to deal with here.. */
! if (fd->send_used + nbw > fd->send_size) {
! /* Case 1: Must allocate more space. */
! unsigned int new_size = fd->send_size << 1;
! char *new_buf;
! if (nbw + fd->send_used > new_size) new_size = nbw + fd->send_used;
! new_buf = malloc(new_size);
! if (fd->send_pos + fd->send_used < fd->send_size) {
! if (IOSET_DEBUG) {
! log(DEBUG_LOG, LOG_INFO, "One-part copy: (%d, %d)->(0,%d)\n",
! fd->send_pos, fd->send_pos+fd->send_used, fd->send_used);
! }
! memcpy(new_buf, fd->send+fd->send_pos, fd->send_used);
! } else {
! if (IOSET_DEBUG) {
! log(DEBUG_LOG, LOG_INFO, "Two-part copy: (%d, %d)->(0,%d) and (0,%d)->(%d,%d)\n",
! fd->send_pos, fd->send_size, fd->send_size-fd->send_pos,
! fd->send_used-(fd->send_size-fd->send_pos), fd->send_size-fd->send_pos, fd->send_used);
! }
! memcpy(new_buf, fd->send+fd->send_pos, fd->send_size-fd->send_pos);
! memcpy(new_buf+fd->send_size-fd->send_pos, fd->send, fd->send_used-(fd->send_size-fd->send_pos));
! }
! memcpy(new_buf+fd->send_used, buf, nbw);
! free(fd->send);
! fd->send = new_buf;
! fd->send_size = new_size;
! fd->send_pos = 0;
! } else if (fd->send_pos + fd->send_used >= fd->send_size) {
! /* Case 2: Can fit in middle of buffer. */
! memcpy(fd->send + fd->send_pos + fd->send_used - fd->send_size, buf, nbw);
! } else if (fd->send_pos + fd->send_used + nbw < fd->send_size) {
! /* Case 3: Can fit before end of buffer. Assumes fd->send_pos + fd->send_used < fd->send_size. */
! memcpy(fd->send + fd->send_pos + fd->send_used, buf, nbw);
! } else {
! /* Case 4: Must wrap around end of buffer. */
! int first = fd->send_size - fd->send_pos - fd->send_used;
! memcpy(fd->send + fd->send_pos + fd->send_used, buf, first);
! memcpy(fd->send, buf+first, nbw-first);
! }
! fd->send_used += nbw;
}
--- 393,410 ----
void
! ioset_write(struct io_fd *fd, const char *buf, unsigned int nbw) {
! unsigned int avail;
! while (ioq_used(&fd->send) + nbw >= fd->send.size) {
! ioq_grow(&fd->send);
! }
! avail = ioq_put_avail(&fd->send);
! if (nbw > avail) {
! memcpy(fd->send.buf + fd->send.put, buf, avail);
! buf += avail;
! nbw -= avail;
! fd->send.put = 0;
! }
! memcpy(fd->send.buf + fd->send.put, buf, nbw);
! fd->send.put += nbw;
! if (fd->send.put == fd->send.size) fd->send.put = 0;
}
Index: ioset.h
===================================================================
RCS file: /cvsroot/srvx/services/src/ioset.h,v
retrieving revision 1.4
retrieving revision 1.5
diff -C2 -r1.4 -r1.5
*** ioset.h 14 Aug 2002 03:03:58 -0000 1.4
--- ioset.h 5 Sep 2002 14:55:48 -0000 1.5
***************
*** 25,35 ****
struct sockaddr;
struct io_fd {
int fd;
void *data;
- unsigned int send_size, send_used, send_pos;
- unsigned char *send;
unsigned int connected : 1;
unsigned int wants_reads : 1;
void (*connect_cb)(struct io_fd *fd);
void (*readable_cb)(struct io_fd *fd);
--- 25,43 ----
struct sockaddr;
+ struct ioq {
+ unsigned char *buf;
+ unsigned int size, get, put;
+ };
+
struct io_fd {
int fd;
void *data;
unsigned int connected : 1;
unsigned int wants_reads : 1;
+ unsigned int line_reads : 1;
+ unsigned int eof : 1;
+ int line_len;
+ struct ioq send;
+ struct ioq recv;
void (*connect_cb)(struct io_fd *fd);
void (*readable_cb)(struct io_fd *fd);
***************
*** 43,46 ****
--- 51,55 ----
void ioset_run(void);
void ioset_write(struct io_fd *fd, const char *buf, unsigned int nbw);
+ int ioset_line_read(struct io_fd *fd, char *buf, int maxlen);
void ioset_close(int fd, int os_close);
void ioset_cleanup(void);
Index: proto-common.c
===================================================================
RCS file: /cvsroot/srvx/services/src/proto-common.c,v
retrieving revision 1.12
retrieving revision 1.13
diff -C2 -r1.12 -r1.13
*** proto-common.c 23 Aug 2002 04:49:39 -0000 1.12
--- proto-common.c 5 Sep 2002 14:55:48 -0000 1.13
***************
*** 53,57 ****
static int replay_read(void);
- static int read_from_socket(void);
static dict_t irc_func_dict;
--- 53,56 ----
***************
*** 62,72 ****
static void parse_foreach(char *target_list, foreach_chanfunc cf, foreach_nonchan nc, foreach_userfunc uf, foreach_nonuser nu, void *data);
! static void uplink_readable(struct io_fd *fd)
! {
! (void)fd;
! if (!read_from_socket()) {
log(MAIN_LOG, LOG_ERROR, "Connection to server lost.\n");
close_socket();
}
}
--- 61,80 ----
static void parse_foreach(char *target_list, foreach_chanfunc cf, foreach_nonchan nc, foreach_userfunc uf, foreach_nonuser nu, void *data);
! static void
! uplink_readable(struct io_fd *fd) {
! static char buffer[MAXLEN];
! char *eol;
! int pos;
!
! pos = ioset_line_read(fd, buffer, sizeof(buffer));
! if (pos < 0) {
log(MAIN_LOG, LOG_ERROR, "Connection to server lost.\n");
close_socket();
+ return;
}
+ if ((eol = strpbrk(buffer, "\r\n"))) *eol = 0;
+ log(DEBUG_LOG, LOG_INFO, " %s\n", buffer);
+ if (cManager.uplink->state != DISCONNECTED) parse_line(buffer, 0);
+ lines_processed++;
}
***************
*** 111,114 ****
--- 119,123 ----
}
socket_io_fd->readable_cb = uplink_readable;
+ socket_io_fd->line_reads = 1;
socket_io_fd->wants_reads = 1;
log(MAIN_LOG, LOG_INFO, "Connection to server established.\n");
***************
*** 186,224 ****
parse_line(read_line, 0);
lines_processed++;
- return 1;
- }
-
- int
- read_from_socket(void)
- {
- static char in[MAXLEN];
- static int used = 0;
- static char *buffer;
- int i, recv_len;
-
- recv_len = read(socket_io_fd->fd, in+used, sizeof(in)-used);
- if (recv_len < 0) {
- /* if it would have blocked, pretend we succeeded */
- if ((errno == EWOULDBLOCK) || (errno == EAGAIN)) return 1;
- return 0;
- }
- if (!recv_len) {
- /* end of file encountered */
- cManager.uplink->state = DISCONNECTED;
- return 0;
- }
- buffer = in;
- for (i=0; i<recv_len+used; i++) {
- if (in[i] == '\r') in[i] = 0;
- if (in[i] == '\n') {
- in[i] = 0;
- log(DEBUG_LOG, LOG_INFO, " %s\n", buffer);
- if (cManager.uplink->state != DISCONNECTED) parse_line(buffer, 0);
- buffer = in + i + 1;
- lines_processed++;
- }
- }
- used = (used + recv_len) - (buffer - in);
- memmove(in, buffer, used);
return 1;
}
--- 195,198 ----
|