Re: [asio-users] async_read in session fetches wrong data. (Following up to "Proper close of ASIO T
From: Roger A. (Australia) <Rog...@in...> - 2016-02-07 23:01:30
Both client::handshakeCallback and session::handshakeCallback (the server side) issue an async read, but neither issues an async_write. So after the handshake both client and server are sitting there waiting for the other to say something.

Take a look at the asio chat and echo server examples (http://www.boost.org/doc/libs/1_60_0/doc/html/boost_asio/examples/cpp11_examples.html) for an illustration of how this sort of thing is supposed to work.

-----Original Message-----
From: Sandra Schreiner [mailto:sas...@st...]
Sent: Saturday, 6 February 2016 7:08 PM
To: asi...@li...
Subject: Re: [asio-users] async_read in session fetches wrong data. (Following up to "Proper close of ASIO TLS connection and reuse afterwards")

>> Does your "wire protocol" include binary lengths? E.g. does the header
>> include a "body length" that is a binary integer? If so, have you
>> handled the endianness of the binary data?

Yes it does. I use a method "encode header" which fetches the length of the data, handles the endianness and adds a binary length 'string' in front of the data.

>> If I have an entity that is "always writing" and rarely (or never)
>> reading, I still _always_ have a read hanging. If there is never any
>> data coming in, the read handler is basically a "no-op".

The problem is I can't write with a 'hanging' read. See the test application I added below. It only uses async operations and shows the mentioned behaviour. After the handshake I start an async_read, but the write is not executed. It doesn't matter if I call io_service.reset() and run() for the write or not. The last message shown is "Client Handshake success ". Why?

>> You would probably need some sort of threaded queue to pass data back
>> and forth, but there are many simple "wait queues" that will do that.

It was planned that this handling would be done by a 'Communicator' class. The thread's only purpose was to enable a write while a read is blocking, as said above.
I don't know why it seems impossible to execute an async_write while an async_read is hanging. I'm really desperate because of this problem. Would there be any difference if I use boost::asio instead of asio standalone? After days of trying it feels like I should give up on this matter. But it seems impossible that I'm the only or first one trying to implement such a 'simple' task.

Basically all I want is the following:
1) Client opens connection
2) As long as the application does not demand any send operation, the client should wait for a shutdown request from the server side (e.g. if the server application is closed)
3) Sometimes the client will send data and the session should respond. The connection stays open.
4) At some point in time the client or server will send a shutdown and the connection should be closed properly.

I don't know what I'm doing wrong all the time.

Best regards,
Sandra

Source Code:
--------------------------------------------------------------
#include <cstdlib>
#include <thread>
#include <iostream>
#include <chrono>
#include <memory>
#include <functional>
#include <string>
#include <vector>
#include "application/framework/impl/asio/asio.hpp"
#include "application/framework/impl/asio/asio/ssl.hpp"

typedef asio::ssl::stream<asio::ip::tcp::socket> ssl_socket;

class session{
public:
    int connectionStatus; //0 = open, 1 = closing, 2 = closed

    session(asio::io_service& io_service, asio::ssl::context& context)
        : socket_(io_service, context) { }

    ssl_socket::lowest_layer_type& socket(){
        return socket_.lowest_layer();
    }

    void start(){
        std::cout << " Session start "<< "\n";
        connectionStatus = 1;
        socket_.async_handshake(asio::ssl::stream_base::server,
            std::bind(&session::handshakeCallback, this, std::placeholders::_1));
    }

    void closeConnection(){
        if(connectionStatus == 0){
            std::cout << " Session shutdown "<< "\n";
            try{
                connectionStatus = 1;
                socket_.shutdown();
            }catch(...){
                socket_.lowest_layer().close();
            }
            connectionStatus = 2;
            delete this;
        }
    }

    void handshakeCallback(const asio::error_code& error){
        if (!error){
            connectionStatus = 0;
            std::cout << " Session handshake success "<< "\n";
            //wait for the first request from the client
            socket_.async_read_some(asio::buffer(data_, max_length),
                std::bind(&session::readCallback, this,
                    std::placeholders::_1, std::placeholders::_2));
        } else {
            std::cout << " Session handshake failed "<< error.message() << "\n";
            closeConnection();
        }
    }

    void readCallback(const asio::error_code& error, size_t bytes_transferred) {
        if (!error) {
            std::cout << " Session Read success "<< "\n";
            //echo the received bytes back to the client
            asio::async_write(socket_, asio::buffer(data_, bytes_transferred),
                std::bind(&session::writeCallback, this, std::placeholders::_1));
        } else {
            std::cout << " Session Read failed " << error.message() << "\n";
            closeConnection();
        }
    }

    void writeCallback(const asio::error_code& error){
        if (!error){
            std::cout << "Session Write success "<< "\n";
            socket_.async_read_some(asio::buffer(data_, max_length),
                std::bind(&session::readCallback, this,
                    std::placeholders::_1, std::placeholders::_2));
        } else{
            std::cout << "Session Write failed" << error.message() << std::endl;
            closeConnection();
        }
    }

    ~session(){
        std::cout << " Session Desctructor called\n";
        closeConnection();
    }

private:
    ssl_socket socket_;
    enum { max_length = 1024 };
    char data_[max_length];
};

class server {
private:
    std::vector<std::shared_ptr<session>> sessions;
    std::shared_ptr<std::thread> mAcceptThread;

public:
    server(asio::io_service& io_service, unsigned short port)
        : io_service_(io_service),
          acceptor_(io_service, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)),
          context_(asio::ssl::context::sslv23) {
        context_.set_options(
            asio::ssl::context::default_workarounds
            | asio::ssl::context::no_sslv2
            | asio::ssl::context::single_dh_use);
        context_.set_password_callback(std::bind(&server::get_password, this));
        context_.use_certificate_chain_file("server.pem");
        context_.use_private_key_file("server.pem", asio::ssl::context::pem);
        context_.use_tmp_dh_file("dh1024.pem");
    }

    void start(){
        //the server's io_service runs on its own thread
        mAcceptThread.reset(new std::thread {
            [this](){
                accept();
                io_service_.run();
            }
        });
    }

    std::string get_password() const{
        return "test";
    }

    void accept(){
        std::cout << " Server accept start "<< "\n";
        std::shared_ptr<session> newSession(new session(io_service_, context_));
        sessions.push_back(newSession);
        acceptor_.async_accept(newSession->socket(),
            std::bind(&server::acceptCallback, this, newSession, std::placeholders::_1));
    }

    void acceptCallback(std::shared_ptr<session> newSession, const asio::error_code& error){
        if (!error){
            std::cout << " Server Accept success "<< "\n";
            newSession->start();
        } else {
            std::cout << " Server Accept failed "<< error.message() <<"\n";
            newSession.reset();
        }
        accept();
    }

    void close(){
        std::cout << "Server close enter\n";
        for(auto &curSession: sessions){
            curSession.reset();
        }
        sessions.clear();
        std::cout << "Server close leave\n";
    }

    ~server(){
        if(mAcceptThread->joinable()){
            mAcceptThread->join();
        }
    }

private:
    asio::io_service& io_service_;
    asio::ip::tcp::acceptor acceptor_;
    asio::ssl::context context_;
};

enum { max_length = 1024 };

class client {
public:
    asio::io_service &mIoService;

    client(asio::io_service& io_service, asio::ssl::context& context,
           asio::ip::tcp::resolver::iterator endpoint_iterator)
        : socket_(io_service, context), mIoService(io_service), requestSize(4){
        socket_.set_verify_mode(asio::ssl::verify_peer);
        socket_.set_verify_callback(
            std::bind(&client::verifyCertificate, this,
                std::placeholders::_1, std::placeholders::_2));
        asio::async_connect(socket_.lowest_layer(), endpoint_iterator,
            std::bind(&client::connectCallback, this, std::placeholders::_1));
        //run() blocks in the constructor until the io_service has no more work
        mIoService.run();
    }

    bool verifyCertificate(bool preverified, asio::ssl::verify_context& ctx){
        return true;
    }

    void connectCallback(const asio::error_code& error){
        if (!error){
            std::cout << "Client Connect success "<< "\n";
            socket_.async_handshake(asio::ssl::stream_base::client,
                std::bind(&client::handshakeCallback, this, std::placeholders::_1));
        } else {
            std::cout << "Client Connect failed: " << error.message() << "\n";
            closeConnection();
        }
    }

    void handshakeCallback(const asio::error_code& error){
        if (!error) {
            std::cout << "Client Handshake success "<< "\n";
            read();
        } else{
            std::cout << "Client Handshake failed: " << error.message() << "\n";
            closeConnection();
        }
    }

    void send(std::string request_){
        //mIoService.reset();
        std::cout << " Client send data "<< "\n";
        requestSize = request_.size();
        asio::async_write(socket_, asio::buffer(request_, requestSize),
            std::bind(&client::writeCallback, this,
                std::placeholders::_1, std::placeholders::_2));
        //mIoService.run();
    }

    void read(){
        //length only hardcoded for test
        asio::async_read(socket_, asio::buffer(reply_, 4),
            std::bind(&client::readCallback, this,
                std::placeholders::_1, std::placeholders::_2));
    }

    void writeCallback(const asio::error_code& error, size_t bytes_transferred){
        if (error){
            std::cout << "Client Write failed: " << error.message() << "\n";
            closeConnection();
        } else {
            std::cout << " Client Write success "<< "\n";
            //read should be open
        }
    }

    void readCallback(const asio::error_code& error, size_t bytes_transferred) {
        if (!error) {
            std::cout << "Client Reply: ";
            std::cout.write(reply_, bytes_transferred);
            std::cout << "\n";
            //start async_read again
            read();
        } else {
            std::cout << "Client Read failed: " << error.message() << "\n";
            closeConnection();
        }
    }

    void closeConnection(){
        std::cout << "Client initiateDisconnect enter" << "\n";
        try{
            socket_.shutdown();
        }catch(std::system_error &er){
            std::cout << "Client close error" << er.what() << std::endl;
        } catch(...){
            std::cout << "Client close undefined error" << std::endl;
        }
        std::cout << "Client close layer" << std::endl;
        socket_.lowest_layer().close();
    }

    ~client(){
        closeConnection();
    }

private:
    asio::ssl::stream<asio::ip::tcp::socket> socket_;
    size_t requestSize;
    char request_[max_length];
    char reply_[max_length];
};

using namespace std;

int main(){
    asio::io_service server_service;
    server s(server_service, 8877);
    s.start();
    std::this_thread::sleep_for(std::chrono::milliseconds(3000));

    asio::io_service io_service;
    asio::ip::tcp::resolver resolver(io_service);
    asio::ip::tcp::resolver::query query("127.0.0.1", std::to_string(8877));
    asio::ip::tcp::resolver::iterator iterator = resolver.resolve(query);
    asio::ssl::context ctx(asio::ssl::context::sslv23);
    ctx.load_verify_file("ca.pem");

    client c(io_service, ctx, iterator);
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    c.send("test");
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    c.closeConnection();
    std::this_thread::sleep_for(std::chrono::milliseconds(4000));
    std::cout << "Main leave\n";
    return 0;
}
--------------------------------------------------------------