Thread: [Assorted-commits] SF.net SVN: assorted:[1009] netio-bench/trunk/src
Brought to you by:
yangzhang
|
From: <yan...@us...> - 2008-10-08 22:07:26
|
Revision: 1009
http://assorted.svn.sourceforge.net/assorted/?rev=1009&view=rev
Author: yangzhang
Date: 2008-10-08 22:07:21 +0000 (Wed, 08 Oct 2008)
Log Message:
-----------
made the benchmark into an echo client/server
Modified Paths:
--------------
netio-bench/trunk/src/Makefile
netio-bench/trunk/src/epoll.cc
Modified: netio-bench/trunk/src/Makefile
===================================================================
--- netio-bench/trunk/src/Makefile 2008-10-08 22:05:15 UTC (rev 1008)
+++ netio-bench/trunk/src/Makefile 2008-10-08 22:07:21 UTC (rev 1009)
@@ -1,9 +1,14 @@
-all: epoll
+all: run
+run:
+ rm -f epoll
+ g++ -I../../../cpp-commons/trunk/src/ -Wall -O3 -o epoll epoll.cc
+ ./epoll
+
epoll: epoll.cc
g++ -I../../../cpp-commons/trunk/src/ -Wall -O3 -o $@ $<
clean:
rm -f epoll
-.PHONY: clean
+.PHONY: clean run
Modified: netio-bench/trunk/src/epoll.cc
===================================================================
--- netio-bench/trunk/src/epoll.cc 2008-10-08 22:05:15 UTC (rev 1008)
+++ netio-bench/trunk/src/epoll.cc 2008-10-08 22:07:21 UTC (rev 1009)
@@ -7,97 +7,165 @@
#include <commons/check.h>
#include <commons/closing.h>
+#include <commons/pool.h>
#include <commons/sockets.h>
+#include <iostream>
using namespace commons;
using namespace std;
-/**
- * Read data from the given file descriptor until we would block (EAGAIN) or we
- * hit EOF/an error.
- * \return true if we hit EAGAIN, false on EOF or unexpected error.
- */
-static bool
-consume(int fd) {
- while (true) {
- char buf[1024];
- int bytes = read(fd, buf, sizeof buf);
- if (bytes == -1) {
- if (errno == EAGAIN) {
- return true;
- } else {
- perror("read");
- return false;
+class echoer {
+ public:
+ /**
+ * \return true iff we are not done with the reading/would've blocked
+ * (EAGAIN), false iff we've gotten the full 40-byte packet or have hit
+ * EOF/an error.
+ */
+ bool consume() {
+ while (true) {
+ char buf[1024];
+ int bytes = ::read(fd_, buf, sizeof buf);
+ if (bytes == -1) {
+ // We're going to block.
+ if (errno == EAGAIN) {
+ return true;
+ } else {
+ perror("read");
+ return false;
+ }
+ }
+ if (bytes == 0) {
+ return false;
+ }
+ ss_ << string(buf, bytes);
+ if (ss_.tellp() >= 10)
+ return false;
}
}
- if (bytes == 0) {
- return false;
- }
- // Write the data to stdout
- checknneg(write(1, buf, bytes) == -1);
- }
-}
+ /**
+ * Read the contents of the buffer as a string.
+ */
+ string read() { return ss_.str(); }
+ /**
+ * The socket file descriptor we're currently associated with.
+ */
+ int & fd() { return fd_; }
+ int fd() const { return fd_; }
+
+ private:
+ stringstream ss_;
+ int fd_;
+};
+
int
main(int argc, char* argv[]) {
+ // Create a non-blocking server socket.
int server = tcp_listen(8080, true);
// Make sure the fd is finally closed.
closingfd closer(server);
- // Create our epoll file descriptor
- const int max_events = 16;
+ // Create our epoll file descriptor. max_events is the maximum number of
+ // events to process at a time (max number of events that we want a call to
+ // epoll_wait() to "return"), while max_echoers is the max number of
+ // connections to make.
+ const int max_events = 16, max_echoers = 100;
+
+ // This file descriptor isn't actually bound to any socket; it's a special fd
+ // that is really just used for manipulating the epoll (e.g., registering
+ // more sockets/connections with it). TODO: Figure out the rationale behind
+ // why this thing is an fd.
int epoll_fd = checknneg(epoll_create(max_events));
- // Add our server fd to the epoll event loop
+ // Add our server fd to the epoll event loop. The event specifies:
+ //
+ // - what fd is
+ // - what operations we're interested in (connections, hangups, errors)
+ // (TODO: what are hangups?)
+ // - arbitrary data to be associated with this fd, in the form of a pointer
+ // (ptr) or number (u32/u64); this is more useful for connection fd's, of
+ // which there are multiple, and so it helps to have a direct pointer to
+ // (say) that connection's handler.
+ //
+ // The add operation actually makes a copy of the given epoll_event, so
+ // that's why we can reuse this `event` later.
struct epoll_event event;
event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET;
event.data.fd = server;
checknneg(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server, &event));
- // Execute the epoll event loop
+ // Set up a bunch of echo server instances.
+ pool<echoer> echoers(max_echoers);
+
+ // Execute the epoll event loop.
while (true) {
struct epoll_event events[max_events];
int num_fds = epoll_wait(epoll_fd, events, max_events, -1);
for (int i = 0; i < num_fds; i++) {
- // Case 1: Error condition
+ // Case 1: Error condition.
if (events[i].events & (EPOLLHUP | EPOLLERR)) {
fputs("epoll: EPOLLERR", stderr);
+ // epoll will remove the fd from its set automatically when the fd is
+ // closed.
close(events[i].data.fd);
- continue;
- }
- check(events[i].events & EPOLLIN);
+ } else {
+ check(events[i].events & EPOLLIN);
- // Case 2: Our server is receiving a connection
- if (events[i].data.fd == server) {
- struct sockaddr remote_addr;
- socklen_t addr_size = sizeof(remote_addr);
- int connection = accept(server, &remote_addr, &addr_size);
- if (connection == -1) {
- if (errno != EAGAIN && errno != EWOULDBLOCK) {
- perror("accept");
+ // Case 2: Our server is receiving a connection.
+ if (events[i].data.fd == server) {
+ struct sockaddr remote_addr;
+ socklen_t addr_size = sizeof remote_addr;
+ int connection = accept(server, &remote_addr, &addr_size);
+ if (connection == -1) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ perror("accept");
+ }
+ continue;
}
- continue;
+
+ // Make the connection non-blocking.
+ checknneg(fcntl(connection, F_SETFL,
+ O_NONBLOCK | fcntl(connection, F_GETFL, 0)));
+
+ // Add the connection to our epoll loop. Note we are reusing our
+ // epoll_event. Now we're actually using the ptr field to point to a
+ // free handler. event.data is a union of {ptr, fd, ...}, so we can
+ // only use one of these. event.data is entirely for the user; epoll
+ // doesn't actually look at this. Note that we're passing the fd
+ // (connection) separately into epoll_ctl().
+ echoer *e = echoers.take();
+ cout << "got a connection! " <<
+ echoers.size() << " echoers remaining" << endl;
+ event.data.ptr = e;
+ e->fd() = connection;
+ checknneg(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, connection,
+ &event));
}
- // Make the connection non-blocking
- checknneg(fcntl(connection, F_SETFL,
- O_NONBLOCK | fcntl(connection, F_GETFL, 0)));
+ // Case 3: One of our connections has read data.
+ else {
+ echoer &e = *((echoer*) events[i].data.ptr);
+ // If we have read the minimum amount (or encountered a dead-end
+ // situation), then echo the data back.
+ if (!e.consume()) {
+ // Write back!
+ string s = e.read();
+ check((size_t) checknneg(write(e.fd(), s.c_str(), s.size())) == s.size());
- // Add the connection to our epoll loop
- event.data.fd = connection;
- checknneg(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, connection,
- &event));
- continue;
- }
+ // epoll will remove the fd from its set automatically when the fd is
+ // closed.
+ close(e.fd());
- // Case 3: One of our connections has read data
- if (!consume(events[i].data.fd)) {
- // epoll will remove the fd from its set
- // automatically when the fd is closed
- close(events[i].data.fd);
+ // Release the echoer.
+ echoers.drop(&e);
+
+ cout << "responded with '" << e.read() << "'; " <<
+ echoers.size() << " echoers remaining" << endl;
+ }
+ }
}
}
}
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|