From: Keith F. <ven...@us...> - 2002-03-24 15:01:04
|
Update of /cvsroot/planeshift/planeshift/src/common/net In directory usw-pr-cvs1:/tmp/cvs-serv30473 Modified Files: netthread.h netthread.cpp netpacket.h netbase.h netbase.cpp Added Files: netpacket.cpp Log Message: Implemented multipacket packets in NetBase and per client outbound queues in NetThread. --- NEW FILE: netpacket.cpp --- /* * netpacket.cpp * * Copyright (C) 2001 PlaneShift Team (in...@pl..., * http://www.planeshift.it) * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation (version 2 of the License) * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * */ #include <config.h> #include <cssys/sysfunc.h> // csTicks #include "net/netpacket.h" psNetPacketEntry::psNetPacketEntry (psNetPacket* packet, uint32_t cnum, uint16_t sz) : clientnum(cnum) , p(packet) { p->pktsize = sz - sizeof(psNetPacket); timestamp = csGetTicks(); } /** construct a new PacketEntry for a single or partial message */ psNetPacketEntry::psNetPacketEntry (msgpriority_t pri, uint32_t cnum, uint32_t id, uint32_t off, uint32_t totalsize, uint16_t sz, psMessage *msg) { p = (psNetPacket*) malloc (sizeof(psNetPacket) + sz); CS_ASSERT(p != NULL); clientnum = cnum; p->priority = pri; p->msgid = id; p->offset = off; p->pktsize = sz; p->msgsize = totalsize; timestamp = csGetTicks(); if (msg && sz && sz != ACK) memcpy(p->data, ((char *)msg) + off, sz); } psNetPacketEntry::psNetPacketEntry (msgpriority_t pri, uint32_t cnum, uint32_t id, uint32_t off, uint32_t totalsize, uint16_t sz, char *bytes) { p = (psNetPacket*) malloc (sizeof(psNetPacket) + sz); CS_ASSERT(p != NULL); clientnum = cnum; p->priority = pri; p->msgid = id; p->offset = off; p->pktsize = sz; p->msgsize = totalsize; timestamp = csGetTicks(); if (bytes && sz && sz != ACK) memcpy(p->data, bytes, sz); } psNetPacketEntry::~psNetPacketEntry() { if (p) free(p); } void psNetPacketEntry::Append(psNetPacketEntry *next) { printf("Appending packet into MULTIPACKET.\n"); /** * If the msgid is not MULTIPACKET, then we need to set up * the initial merged packet structure before copying the * second packet in. */ if (p->msgid != MULTIPACKET) { psNetPacket *merge; /** * Allocate the MULTIPACKET to max size, so we don't have to realloc * or copy data more than once. Only exact number of bytes will be * sent on the wire. */ merge = (psNetPacket*) malloc (MAXPACKETSIZE); CS_ASSERT(merge != NULL); /** * priority is copied from first packet now. Later, if any packet merged is HIGH * then the whole packet is HIGH */ merge->priority = p->priority; merge->msgid = MULTIPACKET; merge->offset = 0; merge->pktsize = p->GetPacketSize(); merge->msgsize = merge->pktsize; /** * retain timestamp of first packet */ // timestamp = csGetTicks(); /** * copy entire first packet, with header, into data section * of new packet. */ memcpy(merge->data, p, p->GetPacketSize() ); delete p; // done with old packet p = merge; } /** * now copy the second packet onto the end of the merged one * and update appropriate header fields */ if (next->p->priority == PRIORITY_HIGH) p->priority = PRIORITY_HIGH; // HIGH overrides LOW but not vice versa /** * copy the entire 2nd packet into 1st packet after existing data */ memcpy(p->data+p->pktsize, next->p, next->p->GetPacketSize() ); /** * now update length of outer packet */ p->pktsize += next->p->GetPacketSize(); p->msgsize = p->pktsize; }; psNetPacketEntry *psNetPacketEntry::GetNextPacket(psNetPacket * &packetdata) { if (p->msgid != MULTIPACKET) { packetdata = NULL; return this; } else { if (!packetdata) packetdata = (psNetPacket *)p->data; // we need the packet INSIDE the packet else { packetdata = (psNetPacket *)((char *)packetdata + packetdata->GetPacketSize()); if (packetdata - p > p->msgsize) { packetdata = NULL; return NULL; } } printf("Extracting packet from MULTIPACKET.\n"); psNetPacketEntry *pnew = new psNetPacketEntry (packetdata->priority, clientnum, packetdata->msgid, packetdata->offset, packetdata->msgsize, packetdata->pktsize, (char *)packetdata->data); return pnew; } } Index: netthread.h =================================================================== RCS file: /cvsroot/planeshift/planeshift/src/common/net/netthread.h,v retrieving revision 1.20 retrieving revision 1.21 diff -C2 -d -r1.20 -r1.21 *** netthread.h 14 Mar 2002 00:20:11 -0000 1.20 --- netthread.h 24 Mar 2002 15:00:59 -0000 1.21 *************** *** 44,56 **** bool StartThread(); ! enum broadcasttype ! { ! BC_EVERYONEBUTSELF = 1, ! BC_GROUP, ! BC_GUILD, ! BC_SECTOR ! }; ! ! /** This repeats the same msg out to a bunch of Clients. * DON'T use this in the client app, it's only here for the MsgHandler * class! --- 44,49 ---- bool StartThread(); ! /** ! * This repeats the same msg out to a bunch of Clients. * DON'T use this in the client app, it's only here for the MsgHandler * class! *************** *** 61,67 **** void CheckLinkDead(void); ! // return pointer to the list of clients ClientConnectionSet* GetConnections() { return &clients; } protected: --- 54,61 ---- void CheckLinkDead(void); ! /// return pointer to the list of clients ClientConnectionSet* GetConnections() { return &clients; } + virtual bool SendMessage (MsgEntry* me); protected: Index: netthread.cpp =================================================================== RCS file: /cvsroot/planeshift/planeshift/src/common/net/netthread.cpp,v retrieving revision 1.35 retrieving revision 1.36 diff -C2 -d -r1.35 -r1.36 *** netthread.cpp 19 Mar 2002 18:18:53 -0000 1.35 --- netthread.cpp 24 Mar 2002 15:00:59 -0000 1.36 *************** *** 105,108 **** --- 105,126 ---- } + bool NetThread::SendMessage(MsgEntry* me) + { + Client *client = clients.Find(me->clientnum); + if (!client) + return FALSE; + + /** + * The senders list is a list of busy queues. The SendOut() function + * in NetBase clears this list each time through. This saves having + * to check every single connection each time through. + */ + if (client->outqueue->IsEmpty()) + senders.AddItem(client->outqueue); + + return NetBase::SendMessage(me,client->outqueue); + } + + // thread handling bool NetThread::StartThread() *************** *** 127,131 **** while ( true ) { ! This->Run(); task::sleep(20); csTicks currentticks = csGetTicks(); --- 145,149 ---- while ( true ) { ! This->Run(); task::sleep(20); csTicks currentticks = csGetTicks(); Index: netpacket.h =================================================================== RCS file: /cvsroot/planeshift/planeshift/src/common/net/netpacket.h,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** netpacket.h 12 Mar 2002 21:51:44 -0000 1.4 --- netpacket.h 24 Mar 2002 15:00:59 -0000 1.5 *************** *** 24,27 **** --- 24,29 ---- #define ACK (0) // 0 pktsize means ACK pkt #define PKTMAXLATENCY (500) // 500 mseconds till packet resent + #define MULTIPACKET (0) // 0 msgid means multi-packet packet + #define MAXPACKETSIZE 500 // 500 bytes is max size of network packet /** *************** *** 79,121 **** * free on the packet pointer later! */ ! psNetPacketEntry (psNetPacket* packet, uint32_t cnum, uint16_t sz) ! : clientnum(cnum) ! , p(packet) ! { ! p->pktsize = sz - sizeof(psNetPacket); ! timestamp = csGetTicks(); ! } ! /** construct a new PacketEntry for a single message */ psNetPacketEntry (msgpriority_t pri, uint32_t cnum, uint32_t id, ! uint32_t off, uint32_t totalsize, uint16_t sz, psMessage *msg) ! { ! p = (psNetPacket*) malloc (sizeof(psNetPacket) + sz); ! CS_ASSERT(p != NULL); ! clientnum = cnum; ! p->priority = pri; ! p->msgid = id; ! p->offset = off; ! p->pktsize = sz; ! p->msgsize = totalsize; ! timestamp = csGetTicks(); ! if (msg && sz && sz != ACK) ! memcpy(p->data, ((char *)msg) + off, sz); ! } psNetPacketEntry (psNetPacketEntry* ) { ! CS_ASSERT(false); } ! psNetPacketEntry::~psNetPacketEntry() ! { ! if (p) ! free(p); ! } void* GetData() { ! return p; } --- 81,108 ---- * free on the packet pointer later! */ ! psNetPacketEntry (psNetPacket* packet, uint32_t cnum, uint16_t sz); ! /** construct a new PacketEntry for a single or partial message */ psNetPacketEntry (msgpriority_t pri, uint32_t cnum, uint32_t id, ! uint32_t off, uint32_t totalsize, uint16_t sz, psMessage *msg); ! ! /** construct a new PacketEntry for a specified buffer of bytes */ ! psNetPacketEntry::psNetPacketEntry (msgpriority_t pri, uint32_t cnum, uint32_t id, ! uint32_t off, uint32_t totalsize, uint16_t sz, char *bytes); psNetPacketEntry (psNetPacketEntry* ) { ! CS_ASSERT(false); } ! psNetPacketEntry::~psNetPacketEntry(); ! ! void Append(psNetPacketEntry *next); ! psNetPacketEntry *GetNextPacket(psNetPacket* &packetdata); ! void* GetData() { ! return p; } *************** *** 123,127 **** { return (clientnum == other.clientnum && ! p->msgid == other.p->msgid && p->offset == other.p->offset); }; --- 110,114 ---- { return (clientnum == other.clientnum && ! p->msgid == other.p->msgid && p->offset == other.p->offset); }; *************** *** 131,141 **** if (clientnum < other.clientnum) return true; ! if (clientnum > other.clientnum) ! return false; if (p->msgid < other.p->msgid) return true; ! if (p->msgid > other.p->msgid) ! return false; if (p->offset < other.p->offset) --- 118,128 ---- if (clientnum < other.clientnum) return true; ! if (clientnum > other.clientnum) ! return false; if (p->msgid < other.p->msgid) return true; ! if (p->msgid > other.p->msgid) ! return false; if (p->offset < other.p->offset) Index: netbase.h =================================================================== RCS file: /cvsroot/planeshift/planeshift/src/common/net/netbase.h,v retrieving revision 1.29 retrieving revision 1.30 diff -C2 -d -r1.29 -r1.30 *** netbase.h 17 Mar 2002 08:25:59 -0000 1.29 --- netbase.h 24 Mar 2002 15:00:59 -0000 1.30 *************** *** 26,31 **** --- 26,34 ---- #include "util/binarytree.h" #include "util/genqueue.h" + #include <csutil/csdllist.h> + #define NUM_BROADCAST 0xffffffff + #define MAXQUEUESIZE 100 /* Include platform specific socket settings */ *************** *** 42,45 **** --- 45,49 ---- typedef GenericQueue <psNetPacketEntry> NetPacketQueue; + /** * This class acts as a base for client/server net classes. It tries to define *************** *** 71,75 **** * Put a message into the outgoing queue */ ! virtual bool SendMessage (MsgEntry* me); /** --- 75,79 ---- * Put a message into the outgoing queue */ ! virtual bool SendMessage (MsgEntry* me,NetPacketQueue *queue=NULL); /** *************** *** 186,191 **** --- 190,209 ---- void HandleCompletedMessage(MsgEntry *me,Connection *connection,LPSOCKADDR_IN addr); + /** + * This attempts to merge as many packets as possible into one before + * returning back a packet ptr to send on the wire. + */ + psNetPacketEntry* GetMergedPacket(NetPacketQueue *q); + + /** + * This does the sending and puts the packet in "awaiting ack" if necessary. + */ + bool NetBase::SendFinalPacket(psNetPacketEntry *pkt); + /** Outgoing message queue */ NetPacketQueue *NetworkQueue; + + /** list of outbound queues with waiting data */ + csDLinkList senders; /** Incoming message queue vector */ Index: netbase.cpp =================================================================== RCS file: /cvsroot/planeshift/planeshift/src/common/net/netbase.cpp,v retrieving revision 1.42 retrieving revision 1.43 diff -C2 -d -r1.42 -r1.43 *** netbase.cpp 19 Mar 2002 18:18:53 -0000 1.42 --- netbase.cpp 24 Mar 2002 15:00:59 -0000 1.43 *************** *** 33,37 **** int NetBase::socklibrefcount=0; - #define MAXPACKETSIZE 500 #define PSMIN(a,b) ((a<b)?a:b) --- 33,36 ---- *************** *** 43,47 **** socklibrefcount++; ! NetworkQueue = new NetPacketQueue(outqueuelen); if (!NetworkQueue) ERRORHALT("No Memory!"); --- 42,46 ---- socklibrefcount++; ! NetworkQueue = new NetPacketQueue(MAXQUEUESIZE); if (!NetworkQueue) ERRORHALT("No Memory!"); *************** *** 136,141 **** return true; ! BuildMessage(pkt,connection,&addr); ! return true; } --- 135,160 ---- return true; ! /** ! * Now either send this packet to BuildMessage, or loop through ! * subpackets if they are merged. ! */ ! psNetPacketEntry *splitpacket= pkt; ! psNetPacket *packetdata = NULL; ! ! do ! { ! splitpacket = pkt->GetNextPacket(packetdata); ! if (splitpacket) ! BuildMessage(splitpacket,connection,&addr); ! ! } while (packetdata); ! ! /** ! * If we split apart a multipacket packet above, we are done ! * with the merged one now. ! */ ! if (splitpacket != pkt) ! delete pkt; ! return true; } *************** *** 173,177 **** p->msgid, p->offset, ! p->msgsize,ACK,NULL); int err = SendTo (addr, ack->GetData(), ack->p->GetPacketSize() ); --- 192,196 ---- p->msgid, p->offset, ! p->msgsize,ACK,(char *)NULL); int err = SendTo (addr, ack->GetData(), ack->p->GetPacketSize() ); *************** *** 200,203 **** --- 219,223 ---- if (pkt) { + printf("Resending nonacked HIGH packet.\n"); // FIXME:Need to do this from within for loop, but delete breaks iterator. pkt->timestamp = currenttime; // update stamp on packet *************** *** 213,259 **** } ! bool NetBase::SendOut() { ! /* ! * This has been changed from a macro to a variable, because on ! * winsock we can use a value in the WSAData struct for this info, ! * and on linux there are certain system calls that can do this. ! */ ! //static unsigned short fixed_max_msg_size = 500; ! /* ! * Part 2 outgoing packets ! */ ! CS_ASSERT(ready); ! // send packets out ! if (NetworkQueue->IsEmpty()) ! return false; ! ! psNetPacketEntry* pkt = NetworkQueue->Get(); ! CS_ASSERT(pkt != NULL); ! if (pkt->clientnum == NUM_BROADCAST) { ! Connection* connection; ! // Send broadcast here ! for (connection = FirstConnection(); connection; connection = NextConnection()) { ! // force clientnum ! pkt->clientnum = connection->clientnum; ! int err = SendTo (&(connection->addr), (void*) pkt->GetData(), ! pkt->p->GetPacketSize() ); ! ! if (err != (int) (pkt->p->GetPacketSize()) ) ! { ! ERRORMSG ("send error! :"); ! printf("errno of the error is %d!\n",errno); ! } } ! ! delete pkt; ! return true; } Connection* connection = GetConnByNum(pkt->clientnum); if (!connection) --- 233,263 ---- } ! psNetPacketEntry* NetBase::GetMergedPacket(NetPacketQueue *q) { ! psNetPacketEntry *candidate,*final; ! final = q->Get(); ! if (q->IsEmpty()) ! return final; ! candidate = q->Peek(); ! ! while (candidate) { ! if (candidate->p->GetPacketSize() + final->p->GetPacketSize() < MAXPACKETSIZE) { ! candidate = q->Get(); // take out of queue ! final->Append(candidate); ! candidate = q->Peek(); // check next entry next time through loop } ! else ! break; // packet will be too large if we merge more } + return final; + } + bool NetBase::SendFinalPacket(psNetPacketEntry *pkt) + { Connection* connection = GetConnByNum(pkt->clientnum); if (!connection) *************** *** 267,295 **** // send packet... ! int err; ! /** ! * This rand simulates UDP Send failures on 1 in 5 outbound packets, ! * which should trigger nonACKed resends. ! * FIXME: Take out when we're confident this works. ! */ ! #if 0 ! if (rand() % 5) ! #else ! if (1) ! #endif ! err = SendTo (&(connection->addr), pkt->GetData(), pkt->p->GetPacketSize()); - else - { - printf("Simulated packet send failure!\n"); - delete pkt; - return true; - } if (err != (int) (pkt->p->GetPacketSize()) ) { ! ERRORMSG ("send error, packet too large?"); ! delete pkt; ! return true; } --- 271,283 ---- // send packet... ! int err = SendTo (&(connection->addr), pkt->GetData(), pkt->p->GetPacketSize()); if (err != (int) (pkt->p->GetPacketSize()) ) { ! ERRORMSG ("send error:"); ! printf("errno was %d\n",errno); ! delete pkt; ! return true; } *************** *** 301,315 **** if (pkt->p->priority == PRIORITY_HIGH) { ! printf("Sent HIGH pkt id %d to client %d.\n", ! pkt->p->msgid, pkt->clientnum); ! awaitingack.Add(pkt); ! // queue holds ref now -> don't delete pkt ! return true; } ! delete pkt; return true; } bool NetBase::Init(int port) { --- 289,333 ---- if (pkt->p->priority == PRIORITY_HIGH) { ! // printf("Sent HIGH pkt id %d to client %d.\n", ! // pkt->p->msgid, pkt->clientnum); ! awaitingack.Add(pkt); ! // queue holds ref now -> don't delete pkt ! return true; } ! delete pkt; return true; } + bool NetBase::SendOut() + { + CS_ASSERT(ready); + bool sent_anything=false; + + // send packets out + if (!NetworkQueue->IsEmpty()) // client uses this queue + { + psNetPacketEntry* pkt = GetMergedPacket(NetworkQueue); + CS_ASSERT(pkt != NULL); + return SendFinalPacket(pkt); + } + else while (senders.GetFirstItem()) // server has list of busy queues + { + NetPacketQueue *q = (NetPacketQueue*) senders.GetFirstItem(); + + while (!q->IsEmpty() ) + { + psNetPacketEntry *pkt = GetMergedPacket(q); + CS_ASSERT(pkt != NULL); + SendFinalPacket(pkt); // this frees pkt if necessary + sent_anything = true; + } + + senders.RemoveItem((void*)q); + } + return sent_anything; + } + + bool NetBase::Init(int port) { *************** *** 413,422 **** } ! bool NetBase::SendMessage(MsgEntry* me) { size_t bytesleft = me->data->GetSize(); size_t offset = 0; ! // printf ("Send Packet Len: %d\n", bytesleft); while (bytesleft > 0) { --- 431,443 ---- } ! bool NetBase::SendMessage(MsgEntry* me,NetPacketQueue *queue) { size_t bytesleft = me->data->GetSize(); size_t offset = 0; ! if (!queue) ! queue = NetworkQueue; ! ! // printf ("Send Packet Len: %d\n", bytesleft); while (bytesleft > 0) { *************** *** 431,435 **** me->data); ! NetworkQueue->Add(pNewPkt); bytesleft -= pktlen; --- 452,456 ---- me->data); ! queue->Add(pNewPkt); bytesleft -= pktlen; *************** *** 483,487 **** first = new psNetPacketEntry(PRIORITY_LOW, client, id, ! 0, 0, 0, NULL); // This search is FAST, and without the first packet, you can't build the message. --- 504,508 ---- first = new psNetPacketEntry(PRIORITY_LOW, client, id, ! 0, 0, 0, (char *)NULL); // This search is FAST, and without the first packet, you can't build the message. *************** *** 505,509 **** return NULL; ! printf("Message %d is complete. Publishing...\n", id); MsgEntry *me = new MsgEntry(totallength, PRIORITY_HIGH); --- 526,530 ---- return NULL; ! // printf("Message %d is complete. Publishing...\n", id); MsgEntry *me = new MsgEntry(totallength, PRIORITY_HIGH); |