[Assorted-commits] SF.net SVN: assorted: [322] udp-pinger
Brought to you by:
yangzhang
From: <yan...@us...> - 2008-02-04 19:22:16
|
Revision: 322 http://assorted.svn.sourceforge.net/assorted/?rev=322&view=rev Author: yangzhang Date: 2008-02-04 11:22:18 -0800 (Mon, 04 Feb 2008) Log Message: ----------- initial import of udp-pinger! Added Paths: ----------- udp-pinger/ udp-pinger/trunk/ udp-pinger/trunk/Makefile udp-pinger/trunk/TODO udp-pinger/trunk/src/ udp-pinger/trunk/src/pinger.cc Added: udp-pinger/trunk/Makefile =================================================================== --- udp-pinger/trunk/Makefile (rev 0) +++ udp-pinger/trunk/Makefile 2008-02-04 19:22:18 UTC (rev 322) @@ -0,0 +1,15 @@ +all: pinger + +pinger: tpinger.cc + g++ -pthread -static -static-libgcc -O3 -Wall -pipe -o pinger \ + -Iboost_asio_0_3_8rc3 \ + -I/opt/boost-head-2007.03.25/include \ + -L/opt/boost-head-2007.03.25/lib \ + -lboost_thread-gcc41-mt \ + -lboost_system-gcc41-mt \ + -lboost_serialization-gcc41-mt \ + tpinger.cc \ + -lboost_thread-gcc41-mt \ + -lboost_system-gcc41-mt \ + -lboost_serialization-gcc41-mt + strip pinger Added: udp-pinger/trunk/TODO =================================================================== --- udp-pinger/trunk/TODO (rev 0) +++ udp-pinger/trunk/TODO 2008-02-04 19:22:18 UTC (rev 322) @@ -0,0 +1 @@ +- replace Makefile with autotools Added: udp-pinger/trunk/src/pinger.cc =================================================================== --- udp-pinger/trunk/src/pinger.cc (rev 0) +++ udp-pinger/trunk/src/pinger.cc 2008-02-04 19:22:18 UTC (rev 322) @@ -0,0 +1,249 @@ +// vim:et:sw=2:ts=2 + +#define ntohll(x) (((_int64)(ntohl((int)((x << 32) >> 32))) << 32) | \ + (unsigned int)ntohl(((int)(x >> 32)))) +#define htonll(x) ntohll(x) + +#include <fstream> +#include <iostream> +#include <string> +#include <sstream> +#include <boost/array.hpp> +#include <boost/asio.hpp> +#include <boost/bind.hpp> +#include <boost/thread/thread.hpp> +#include <boost/thread/xtime.hpp> + +#include <sys/time.h> + +#include <boost/archive/binary_iarchive.hpp> +#include <boost/archive/binary_oarchive.hpp> + +int my_nid = -1; +std::string default_port = "55555"; +boost::asio::io_service io_service; + +long long currentTimeMillis() { + using namespace std; + long long t; + struct timeval tv; + + gettimeofday(&tv, 0); + + t = tv.tv_sec; + t = (t *1000) + (tv.tv_usec/1000); + + return t; +} + +using boost::asio::ip::udp; +using namespace boost; + +int sender(std::vector<udp::endpoint> endpoints) +{ + using namespace std; + + try + { + udp::socket socket(io_service); + socket.open(udp::v4()); + + boost::xtime xt; + boost::xtime_get(&xt, boost::TIME_UTC); + for (int seqno = 0; true; seqno++) { + xt.sec += 10; + boost::thread::sleep(xt); + + using namespace std; + using namespace boost::archive; + + for (size_t i = 0; i < endpoints.size(); i++) { + int dst = i + 1; + ostringstream ss; + binary_oarchive oa(ss); + long long time = currentTimeMillis(); + oa & my_nid & dst & seqno & time; + string s = ss.str(); + vector<char> v(s.begin(), s.end()); + // cout << "serialized buffer of size " << s.size() << endl; + socket.send_to(boost::asio::buffer(v), endpoints[i]); + } + } + +// boost::array<char, 128> recv_buf; +// udp::endpoint sender_endpoint; +// size_t len = socket.receive_from( +// boost::asio::buffer(recv_buf), sender_endpoint); +// +// std::cout.write(recv_buf.data(), len); + } + catch (std::exception& e) + { + std::cerr << e.what() << std::endl; + } + + return 0; +} + +#include <ctime> +#include <iostream> +#include <string> +#include <boost/array.hpp> +#include <boost/bind.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/asio.hpp> + +using boost::asio::ip::udp; + +std::string make_daytime_string() +{ + using namespace std; // For time_t, time and ctime; + time_t now = time(0); + return ctime(&now); +} + +typedef std::map<int, udp::endpoint> nid2endpoint_t; + +class udp_server +{ +public: + udp_server(boost::asio::io_service& io_service, int port, const nid2endpoint_t & nid2endpoint) + : socket_(io_service, udp::endpoint(udp::v4(), port)), + nid2endpoint(nid2endpoint) + { + start_receive(); + } + +private: + void start_receive() + { + socket_.async_receive_from( + boost::asio::buffer(recv_buffer_), remote_endpoint_, + boost::bind(&udp_server::handle_receive, this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + } + + void handle_receive(const boost::system::error_code& error, + std::size_t /*bytes_transferred*/) + { + if (!error || error == boost::asio::error::message_size) + { + using namespace std; + using namespace boost::archive; + string s(recv_buffer_.begin(), recv_buffer_.end()); + istringstream ss(s); + binary_iarchive ia(ss); + long long time; + int src, echoer, seqno; + ia & src & echoer & seqno & time; + // cout << "got ping from " << src << " with time " << time << endl; + + const udp::endpoint & back = nid2endpoint.find(src)->second; + + if (src == my_nid) { + long rtt = currentTimeMillis() - time; + cout << "to " << echoer << " seqno " << seqno << " rtt " << rtt << endl; + } else { + boost::shared_ptr<std::string> message( + new std::string(s.begin(), s.end())); + + socket_.async_send_to(boost::asio::buffer(*message), back, + boost::bind(&udp_server::handle_send, this, message, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + } + + start_receive(); + } + else + { + std::cout << "error: " << error << std::endl; + } + } + + void handle_send(boost::shared_ptr<std::string> /*message*/, + const boost::system::error_code& error, + std::size_t /*bytes_transferred*/) + { + if (error) { + std::cout << "error sending: " << error << std::endl; + } + } + + udp::socket socket_; + udp::endpoint remote_endpoint_; + // std::vector<char> recv_buffer_; + boost::array<char, 60> recv_buffer_; + const nid2endpoint_t & nid2endpoint; +}; + +using namespace std; + +int receiver(string host, int port, const nid2endpoint_t & nid2endpoint) +{ + try + { + udp_server server(io_service, port, nid2endpoint); + io_service.run(); + } + catch (std::exception& e) + { + std::cerr << e.what() << std::endl; + return 1; + } + // std::cout << "receiver started!!!" << std::endl; + + return 0; +} + +int main(int argc, char* argv[]) +{ + if (argc != 2) + { + std::cerr << "Usage: pinger <my_nid>" << std::endl; + return 1; + } + + my_nid = atoi(argv[1]); + + nid2endpoint_t nid2endpoint; + vector<udp::endpoint> endpoints; + string my_host; + int my_port; + { + string line; + int nid = 1; + for (ifstream ifs("hosts"); getline(ifs,line); nid++) { + string host; + istringstream iss(line); + iss >> host; + string port; + if (!(iss >> port)) { + port = default_port; + } + + if (nid == my_nid) { + // found ourselves + my_host = host; + istringstream port_reader(port); + port_reader >> my_port; + } else { + // ping all others + udp::resolver resolver(io_service); + udp::resolver::query query(udp::v4(), host, port); + udp::endpoint endpoint = *resolver.resolve(query); + endpoints.push_back(endpoint); + nid2endpoint[nid] = endpoint; + } + + // cout << host << " " << port << endl; + } + } + + boost::thread_group threads; + threads.create_thread(bind(&sender, endpoints)); + threads.create_thread(bind(&receiver, my_host, my_port, nid2endpoint)); + threads.join_all(); + return 0; +} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |