oiocp: overlapped I/O completion port
net_t uses both overlapped I/O and an I/O completion port.
With one instance of net_t you can create many listening peers to accept new connections,
and create many client peers that connect to any server, including the listening peers in the same
instance of net_t.
Send control messages (newServerMsg_t, newClientMsg_t, ...) to SYS_ID to control the net_t.
net_t sends notification messages (newServerRetMsg_t, newClientRetMsg_t, newConnectMsg_t, ...) to the user with id = SYS_ID.
The put functions
void put(const void* buf,unsigned long lenInByte,int id);
void put(netMsg_t *msg);
void put(buf_t *first,buf_t *last);
make a copy of the data that you pass in.
net_t internally uses a mempool to keep the data that will be sent and the data that has been received.
net_t merges data (queued while a send call has not yet returned and before the next send call) and sends it in one call.
The data size sent in one call is limited (in sendTask_t).
The async parameters of newServerMsg_t and newClientMsg_t
unsigned long asyncRecv;
unsigned long asyncSend;
unsigned long asyncAccept;
unsigned long asyncConnect;
mean the overlapped operation count.
One instance of net_t can only work in one mode: stream mode or msg mode.
stream mode:
send with: void put(const void* buf,unsigned long lenInByte,int id); void put(netMsg_t *msg); receive with: buf_t *getBuf(bool wait); getBuf returns a linked list; each buf_t may come from a different peer, and the data and data size of a buf_t have no relation to the individual put calls.
msg mode:
send with: void put(const void* buf,unsigned long lenInByte,int id); void put(netMsg_t *msg); receive with: netMsg_t *get(bool wait); this function returns a linked list; each netMsg_t may come from a different peer, and the data and data size of each netMsg_t are the same as in the corresponding call of put.
//warning: see the source before using these two functions
netMsg_t &netMsg_t::operator>>(char &v);
netMsg_t &netMsg_t::operator>>(unsigned char &v);
warning: the byte order is raw host order when netMsg_t is used to serialize and deserialize data
See netTutorial for a usage example.
//forward declarations of the library types named by the declarations below
struct msg_t;
class sendTask_t;
class recvTask_t;
class packTask_t;
class unpackTask_t;
class connectTask_t;
class acceptTask_t;
class iocpTask_t;
class netMsgSendTask_t;
class netMsgRecvTask_t;
class peerTask_t;
class netMsg_t;
class buf_t;
class event_t;
class netExport net_t{
peerTask_t peerTask;
sendTask_t sendTask;
recvTask_t recvTask;
packTask_t packTask;
unpackTask_t unpackTask;
connectTask_t connectTask;
acceptTask_t acceptTask;
iocpTask_t iocpTask;
netMsgSendTask_t netMsgSendTask;
netMsgRecvTask_t netMsgRecvTask;
public:
enum mode_enum{
//normal tcp mode
stream_e,
//you will get the same size netMsg_t the same to what you put it
//the net library accomplish this mechanism
msg_e
};
net_t(mode_enum mode,unsigned long maxPeer);
virtual ~net_t();
//begin thread safe function,can be call at any thread //___________________ void put(const void* buf,unsigned long lenInByte,int id); void put(netMsg_t *msg); void put(buf_t *first,buf_t *last); netMsg_t *get(bool wait); buf_t *getBuf(bool wait); void showInfo(); void closeAll(); //end thread safe function //___________________ void setNewNetMsgNotify(event_t *event); peerTask_t *getPeerTask(){return peerTask;}
private:
mode_enum mode;
};
//console entry point of the tutorial
//read words from stdin:"ping" send a pingMsg_t,"exit" leave the loop,
//any other word is sent as a chatMsg_t
//return 0 when the loop end ("exit" typed or stdin closed)
int main(int argc,char **argv){
    cout<<"input ping test pingMsg_t\ninput exit exit app\ninput other str test chatMsg_t\nuse browser with url:http://127.0.0.1 test stream mode and it return a time str\n";
    //init winsocket
    wsa_t wsa;
    app_t app;
    app.main(argc,argv);
    string v;
    //test the stream state:when stdin reach EOF the extraction fail and v keep
    //its old value,an unchecked loop would resend the last command forever
    while(cin>>v){
        if(v=="ping"){
            app.sendPingMsg();
        }else if(v=="exit"){
            break;
        }else{
            app.sendChatMsg(v.c_str());
        }
    }
    return 0;
}
//type ids of the application messages of this tutorial
//numbering start at userStartMsg_tId
enum appMsg_e{
//id of pingMsg_t
pingMsg_tId=userStartMsg_tId,
//id of pingRetMsg_t
pingRetMsg_tId,
//id of chatMsg_t
chatMsg_tId,
//one past the last application msg id
appMsgEnd_tId
};
//fix size msg
//ping request,the tutorial send it as raw bytes (sizeof pingMsg_t)
struct pingMsg_t:public msg_t{
//wall clock time,filled by ::time() in the constructor
time_t time;
//tick count,filled by GetTickCount() in the constructor
unsigned long tick;
//tag the msg with pingMsg_tId and stamp time and tick
pingMsg_t();
};
//fix size msg
//ping reply,it echo the time and tick of the pingMsg_t it answer
struct pingRetMsg_t:public msg_t{
//copy of pingMsg_t::time
time_t time;
//copy of pingMsg_t::tick
unsigned long tick;
//build the reply for me:tag with pingRetMsg_tId and copy time and tick
pingRetMsg_t(const pingMsg_t &me):msg_t(pingRetMsg_tId)
,time(me.time),tick(me.tick){}
};
//variable length msg must be serialize and deserialize
//chat msg carrying one text string
struct chatMsg_t:msg_t{
    //the chat text
    string msg;
    //empty msg,used as the target of deserialize
    chatMsg_t():msg_t(chatMsg_tId){}
    //msg:zero terminated text to send,must not be NULL
    chatMsg_t(const char *msg):msg_t(chatMsg_tId),msg(msg){}
    //serialize:write typeId then msg into n,return n
    inline netMsg_t &operator>>(netMsg_t &n){
        n<<typeId<<msg;
        return n;
    }
    //deserialize:read typeId then msg from n,return n
    inline netMsg_t &operator<<(netMsg_t &n){
        n>>typeId>>msg;
        return n;
    }
};
//tag the msg as pingMsg_tId,then stamp it with the current time and tick count
pingMsg_t::pingMsg_t()
    :msg_t(pingMsg_tId)
{
    //same value as ::time(&time):time() return what it store through its argument
    time=::time(0);
    tick=GetTickCount();
}
class net_t;
//start two thread
//one for msg mode
//one for stream mode
class app_t:public task_t{
protected:
    //a pair of connect demo,in msg mode
    //pointer:created with new in main() and released in the destructor
    net_t *net;
    //just return time str,in stream mode
    //use browser test,url: http://127.0.0.1
    net_t *httpNet;
    //flipped by every svc() call to choose which net that thread serve
    bool startNet;
    //id of the msg mode client connect,set when newClientRetMsg_t report success
    unsigned long clientId;
    //scratch buffer,onNetMsg copy each netMsg_t here to make it continue in memory
    vector<char> netMsgData;
public:
    app_t();
    ~app_t();
    //create both net_t,start the threads,open the servers and the client connect
    virtual int main(int argc,char **argv);
    virtual void start(int threadCount);
    virtual void stop();
    //thread body
    virtual int svc();
    //msg mode side
    virtual void processNetMsg();
    virtual void onSysMsg(netMsg_t *netMsg);
    virtual void onNetMsg(netMsg_t *netMsg);
    void sendPingMsg();
    void sendChatMsg(const char *msg);
    //stream mode (http) side
    virtual void processHttpStreamMsg();
    virtual void onStreamSysMsg(buf_t *netMsg);
    virtual void onStreamNetMsg(buf_t *netMsg);
};
int app_t::main(int argc,char **argv){
//in msg mode
net=new net_t(net_t::msg_e,10);
//in stream mode httpNet=new net_t(net_t::stream_e,10); start(2); //start listen and accept on 80 port newServerMsg_t newServerMsg(addr_t("127.0.0.1",80)); //SYS_ID ,the net libray will process msg with id equal SYS_ID httpNet->put((char*)&newServerMsg,sizeof newServerMsg,SYS_ID); //start a msg mode server listen on 81 newServerMsg_t newServerMsgb(addr_t("127.0.0.1",81)); net->put(&newServerMsgb,sizeof newServerMsg,SYS_ID); //connect to msg mode server newClientMsg_t newClientMsg(addr_t("127.0.0.1",81)); net->put(&newClientMsg,sizeof newClientMsg,SYS_ID); return 0;
};
//both net start as NULL,they are created later in main()
//startNet start as true,svc() flip it on every call
//NOTE(review): clientId is not initialized here,it only get a value when
//onSysMsg see a successful newClientRetMsg_t - sendPingMsg/sendChatMsg called
//before that read an indeterminate id,confirm which id is safe as a default
app_t::app_t():net(NULL),httpNet(NULL),startNet(true){
}
//release both net through delObj
app_t::~app_t(){
delObj(net);
delObj(httpNet);
}
//forward to task_t::start with the same threadCount
void app_t::start(int threadCount){
task_t::start(threadCount);
}
//forward to task_t::stop
void app_t::stop(){
task_t::stop();
}
void app_t::sendPingMsg(){
//method one
//send use net->put
net->put(&pingMsg_t(),sizeof pingMsg_t,clientId);
//method two //send use netMsg_t netMsg_t netMsg(clientId); netMsg.put(&pingMsg_t(),sizeof pingMsg_t); net->put(&netMsg);
}
//send msg as a chatMsg_t to the msg mode server over the client connect (clientId)
//msg:zero terminated text,must not be NULL
void app_t::sendChatMsg(const char *msg){
    netMsg_t netMsg(clientId);
    chatMsg_t chatMsg(msg);
    //serialize the variable length msg into the netMsg_t
    chatMsg>>netMsg;
    net->put(&netMsg);
}
//thread body
//every call flip startNet first:a call that leave it false serve the stream
//mode (http) net,a call that leave it true serve the msg mode net
//each loop end when run become false
//return 0 always
int app_t::svc(){
    startNet=!startNet;
    if(!startNet){
        while(run){
            processHttpStreamMsg();
        }
        return 0;
    }
    while(run){
        processNetMsg();
    }
    return 0;
}
void app_t::processNetMsg(){
netMsg_t netMsg;
netMsg_t nextNetMsg;
netMsg=net->get(true);
while(netMsg){
nextNetMsg=netMsg->getNext();
netMsg->link(NULL);
if(netMsg->getId()==SYS_ID){
//process sys msg generate by net libray
onSysMsg(netMsg);
}else{
if(netMsg->getSize()<4){
//the msg_t is bigger than 4 if you use msg that derived from msg_t
xerr("recv a netMsg with size <4 =true from %d",netMsg->getId());
net->put(&delConnectMsg_t(netMsg->getId()),sizeof(delConnectMsg_t),SYS_ID);
delete netMsg;
}else{
onNetMsg(netMsg);
}
}
netMsg=nextNetMsg;
}
}
void app_t::onNetMsg(netMsg_t netMsg){
if(netMsgData.size()<netMsg->getSize()){
netMsgData.resize(netMsg->getSize());
}
//the netMsg_t data may com from several buf_t,this will make it continue in memory
netMsg->get(&netMsgData[0],netMsg->getSize(),true);
char data=&netMsgData[0];
unsigned long msgTypeId=(unsigned long)data;
netMsg_t &n=netMsg;
switch(msgTypeId){
case pingMsg_tId:{
xlog("msgServer:recv pingMsg_t from peer %d",netMsg->getId());
pingMsg_t pingMsg=(pingMsg_t)data;
net->put(&pingRetMsg_t(pingMsg),sizeof pingRetMsg_t,netMsg->getId());
}break;
case pingRetMsg_tId:{
xlog("msgServer client:recv pingRetMsg_t from peer %d",netMsg->getId());
pingRetMsg_t pingMsg=(pingRetMsg_t)data;
}break;
case chatMsg_tId:{
chatMsg_t chatMsg;
chatMsg<<n; xlog("msgServer:recv="" chatMsg_t="" from="" peer="" %d\\nmsg:="" %s",netMsg-="">getId(),chatMsg.msg.c_str());
}break;
default:{
xerr("recv unknown msg id %d from peer %d",msgTypeId,netMsg->getId());
}break;
}
delete netMsg;
}
void app_t::onSysMsg(netMsg_t netMsg){
//sys msg is less than blockBufSize,so every sys msg is in one buf_t
if(netMsg->getSize()>buf_t::blockBufSize){
xerr("recv a sys msg big than %d,I should re code net,it will never happen",buf_t::blockBufSize);
delete netMsg;
return;
}
char data=netMsg->firstBuf()->getBuf();
unsigned long typeId=(unsigned long)data;
switch(typeId){
case newServerRetMsg_tId:{
newServerRetMsg_t t=(newServerRetMsg_t )data;
if(t->ret){
xerr("msgServer:new server failed\n");
}else{
xlog("msgServer:new server success id:%d localAddr:%s:%d\n",t->id,t->addr.getIp().c_str(),t->addr.getPort());
}
}break;
case newClientRetMsg_tId:{
newClientRetMsg_t t=(newClientRetMsg_t )data;
if(t->ret){
xerr("connect to msg server failed %s\n",wsaErr(t->ret));
}else{
xlog("new client success id:%d remoteAddr:%s:%d localAddr:%s:%d\n",
t->id,
t->remoteAddr.getIp().c_str(),
t->remoteAddr.getPort(),
t->localAddr.getIp().c_str(),
t->localAddr.getPort());
xlog("connect to msg server successed");
clientId=t->id;
}
}break;
case newConnectMsg_tId:{
newConnectMsg_t t=(newConnectMsg_t )data;
xlog("msgServer:new connect id:%d remoteAddr:%s:%d localAddr:%s:%d\n",
t->id,
t->remoteAddr.getIp().c_str(),
t->remoteAddr.getPort(),
t->localAddr.getIp().c_str(),
t->localAddr.getPort());
}break;
case disconnectMsg_tId:{
disconnectMsg_t t=(disconnectMsg_t )data;
xlog("msgServer:peer disconnect %d",t->id);
}break;
default:
break;
}
delete netMsg;
}
void app_t::processHttpStreamMsg(){
buf_t netMsg;
buf_t nextNetMsg;
netMsg=httpNet->getBuf(true);
while(netMsg){
nextNetMsg=netMsg->getNext();
netMsg->link(NULL);
if(netMsg->getId()==SYS_ID){
onStreamSysMsg(netMsg);
}else{
if(netMsg->getSize()<4){
xerr("recv a netMsg with size <4 =true from %d",netMsg->getId());
net->put(&delConnectMsg_t(netMsg->getId()),sizeof(delConnectMsg_t),SYS_ID);
delete netMsg;
}else{
onStreamNetMsg(netMsg);
}
}
netMsg=nextNetMsg;
}
}
void app_t::onStreamSysMsg(buf_t netMsg){
//sys msg mean a msg generate by net library
//sys msg is in a single buf_t
if(netMsg->getSize()>buf_t::blockBufSize){
xerr("recv a sys msg big than %d,i should re code net,it will never happen",buf_t::blockBufSize);
delete netMsg;
return;
}
char data=netMsg->getBuf();
unsigned long typeId=(unsigned long)data;
switch(typeId){
case newServerRetMsg_tId:{
newServerRetMsg_t t=(newServerRetMsg_t )data;
if(t->ret){
xerr("new server failed\n");
}else{
xlog("new server success id:%d localAddr:%s:%d\n",t->id,t->addr.getIp().c_str(),t->addr.getPort());
}
}break;
case newConnectMsg_tId:{
newConnectMsg_t t=(newConnectMsg_t )data;
xlog("httpServer:new connect id:%d remoteAddr:%s:%d localAddr:%s:%d\n",
t->id,
t->remoteAddr.getIp().c_str(),
t->remoteAddr.getPort(),
t->localAddr.getIp().c_str(),
t->localAddr.getPort());
}break;
case disconnectMsg_tId:{
disconnectMsg_t t=(disconnectMsg_t *)data;
xlog("peer disconnect %d",t->id);
}break;
default:
break;
}
delete netMsg;
}
char hdrbuf[HDRBUFSIZE]; / For building HTTP headers /
void sendhdr(net_t net,unsigned long id,char mimeType,int contentlen){
char * cp;
sprintf(hdrbuf, "HTTP/1.1 200 OK\r\n");
cp = hdrbuf + strlen(hdrbuf);
sprintf(cp, "Connection: close\r\n");
cp += strlen(cp);
if(mimeType){
sprintf(cp, "Content-Type: %s\r\n",mimeType);
}
cp += strlen(cp);
if(contentlen>0){
sprintf(cp, "Content-Length: %d\r\n\r\n", contentlen );
cp += strlen(cp);
net->put(hdrbuf,strlen(hdrbuf),id);
}
}
/* format: "Mon, 26 Feb 2007 01:43:54 GMT" */
/* three letter names,indexed by tm_wday (0 = Sunday) */
static const char * const day[] =
{ "Sun","Mon","Tue","Wed","Thu","Fri","Sat"};
/* three letter names,indexed by tm_mon (0 = January) */
static const char * const month[] =
{ "Jan","Feb","Mar","Apr","May","Jun",
"Jul","Aug","Sep","Oct","Nov","Dec",
};
/* result buffer of xg_getdate,overwritten by every call */
static char datebuf[36];
//format the current time as a GMT date string like the example above
//return a pointer to the static datebuf,the text is replaced by the next call
//day of month,hour,minute and second are zero padded to two digits
static char * xg_getdate(){
    time_t timeval = time(NULL);
    struct tm * gmt = gmtime(&timeval);
    sprintf(datebuf, "%s, %02d %s %d %02d:%02d:%02d GMT",
        day[gmt->tm_wday],
        gmt->tm_mday,
        month[gmt->tm_mon],
        gmt->tm_year + 1900, /* Windows year is based on 1900 */
        gmt->tm_hour,
        gmt->tm_min,
        gmt->tm_sec); /* the seconds field,not the week day */
    return datebuf;
}
void app_t::onStreamNetMsg(buf_t netMsg){
char str=xg_getdate();
int len=strlen(str);
sendhdr(httpNet,netMsg->getId(),"text/plain",len);
httpNet->put(str,len,netMsg->getId());
delete netMsg;
}