[bwm-tools-devel] COMMIT - r8 - in trunk: include lib
Brought to you by:
nkukard
From: <bwm...@li...> - 2004-12-28 15:19:44
|
Author: nkukard Date: 2004-12-28 17:19:28 +0200 (Tue, 28 Dec 2004) New Revision: 8 Modified: trunk/include/flow.h trunk/lib/flow.c trunk/lib/ipq.c Log: * Fixed a sly bug in the rate limiting code, which allowed us to burst over our limits * Removed defined macros which deal with rate limiting, made these inline functions * Removed threshold dropping when queue is nearly maxed, for this we should use proper algorithms like TBF or RED Modified: trunk/include/flow.h =================================================================== --- trunk/include/flow.h 2004-12-28 15:13:50 UTC (rev 7) +++ trunk/include/flow.h 2004-12-28 15:19:28 UTC (rev 8) @@ -42,54 +42,7 @@ // Packet id #define PKT_ID(packet) packet->payload->packet_id -// Will the QUEUE exceed with the packet? -#define WILL_QUEUE_EXCEED(pktQueue,packet) \ - ((pktQueue->curLen >= pktQueue->maxLen && pktQueue->maxLen != 0) || \ - ((pktQueue->curSize + PKT_SIZE(packet)) >= pktQueue->maxSize && pktQueue->maxSize != 0)) -// Credit exceeded? -#define WILL_CREDIT_EXCEED(flow,packet,maxCredit,burstCredit) \ - ((maxCredit - PKT_SIZE(packet) < 0 && flow->maxRate > 0 && flow->burstRate == -1) || \ - (burstCredit - PKT_SIZE(packet) < 0 && flow->burstRate > 0 && flow->maxRate != 0)) - -// Will the FLOW queue exceed with the packet? -#define WILL_FLOW_QUEUE_EXCEED(flow,packet) \ - ((flow->curQueueLen >= flow->maxQueueLen && flow->maxQueueLen != 0) || \ - ((flow->curQueueSize + PKT_SIZE(packet)) >= flow->maxQueueSize && flow->maxQueueSize != 0)) - -// Threshold exceeded? -#define TH_EXCEEDED(pktQueue,threshold) \ - (P_FLOW(pktQueue,curQueueLen) > P_FLOW(pktQueue,threshold) && P_FLOW(pktQueue,maxQueueLen) != 0) - -// Get new credit -#define CALC_MAX_CREDIT(curTime,flow,creditVariable) \ - if (flow->maxRate > 0) \ - { \ - /* Calculate credit */ \ - if (curTime.tv_sec - flow->lastPacketTime.tv_sec > 0) \ - creditVariable = flow->maxRate; \ - else \ - creditVariable = flow->curMaxCredit + ((curTime.tv_usec - flow->lastPacketTime.tv_usec) * flow->usMaxCredit); \ - /* Check if we exceed */ \ - if (creditVariable > flow->maxRate) \ - creditVariable = flow->maxRate; \ - } - -/* Calculate how much we can burst */ -#define CALC_BURST_CREDIT(curTime,flow,creditVariable) \ - if (flow->burstRate > 0) \ - { \ - /* Calculate credit */ \ - if (curTime.tv_sec - flow->lastPacketTime.tv_sec > 0) \ - creditVariable = flow->burstRate; \ - else \ - creditVariable = flow->curBurstCredit + ((curTime.tv_usec - flow->lastPacketTime.tv_usec) * flow->usBurstCredit); \ - /* Check if we exceed */ \ - if (creditVariable > flow->burstRate) \ - creditVariable = flow->burstRate; \ - } - - // Our packet statistic structure struct pktStat_t { @@ -119,6 +72,7 @@ GList *packets; }; + // A flow struct flow_t { @@ -146,8 +100,8 @@ struct pktQueue_t *pktQueues[NUM_PRIO_BANDS]; // Setup our thresholds - long int min_th; // We start dropping packets here - long int max_th; // We drop alot more here +// long int min_th; // We start dropping packets here +// long int max_th; // We drop alot more here float parent_th; // Used to check the available percentage of parent bandwidth before we burst // Counters, used for both shape & counter logging @@ -155,11 +109,18 @@ struct pktStat_t counter; // This is the counter which is logged to file GList *groups; - float usMaxCredit; // Credits for maxRate per microsecond - float curMaxCredit; // Current available credit for maxRate 0 <= curCredit <= maxRate - float usBurstCredit; - float curBurstCredit; - struct timeval lastPacketTime; // Last "timeval" that we had a packet come in + double usCredit; // Credits for maxRate per microsecond + long int curCredit; // Current available credit for maxRate, if we burst we will be < 0 + + double usBurstCredit; + long int curBurstCredit; // Curent burstable credit available + + float curThroughput; // Current throughput + unsigned int curThroughputAge; // How many microseconds since last throughput update + struct timeval lastThroughputUpdate; // Last time the throughput was updated + + unsigned int accumMs; // Accumulated number of microseconds + struct timeval lastCreditCalc; // Last "timeval" that we calculated additional credit struct pktStat_t running; // This is the running counter int lastDumpTimestamp; @@ -167,6 +128,7 @@ int counterRemaining; }; + // A group struct group_t { @@ -194,9 +156,11 @@ GList *flowList; }; + // Get avg flow stats struct pktStat_t getFlowTotalStats(struct pktStat_t *pktStats, long int pktStatsLen); + // Create a flow struct flow_t* createFlow( char *flowName, @@ -210,6 +174,7 @@ float parent_th, int reportTimeout); + // Create group struct group_t *createGroup( char *groupName, @@ -217,6 +182,7 @@ int reportTimeout, GList *flowList); + // Function to update all our flow groups void updateGroups( struct flow_t *flow, @@ -224,7 +190,8 @@ int pktSize, int pktDropped, int pktBursted); - + + // Create a packet queue struct pktQueue_t *createPktQueue( long int prio, @@ -236,5 +203,16 @@ void *flowRunner(void *data); + + +// Check if we will exceed flow queuing limits +static inline will_exceed_flow_queue(struct flow_t *flow, unsigned int pktSize) +{ + return ((flow->curQueueLen >= flow->maxQueueLen && flow->maxQueueLen != 0) || + (flow->curQueueSize + pktSize >= flow->maxQueueSize && flow->maxQueueSize != 0)); +} + + + #endif Modified: trunk/lib/flow.c =================================================================== --- trunk/lib/flow.c 2004-12-28 15:13:50 UTC (rev 7) +++ trunk/lib/flow.c 2004-12-28 15:19:28 UTC (rev 8) @@ -70,6 +70,7 @@ flow->statsPos = 0; flow->parent = parentFlow; + flow->maxQueueSize = maxQueueSize; flow->maxQueueLen = maxQueueLen; flow->maxRate = maxRate; @@ -97,27 +98,32 @@ // Check if we can set our credit if (flow->maxRate > 0) { - flow->usMaxCredit = flow->maxRate / 1000000.0; // For micro (millionths) of seconds silly - flow->curMaxCredit = flow->maxRate; + flow->usCredit = flow->maxRate / 1000000.0; // For micro (millionths) of seconds silly + flow->curCredit = 0; } else { - flow->usMaxCredit = 0; - flow->curMaxCredit = 0; + flow->usCredit = 0; + flow->curCredit = 0; } if (flow->burstRate > 0) { flow->usBurstCredit = flow->burstRate / 1000000.0; - flow->curBurstCredit = flow->burstRate; + flow->curBurstCredit = 0; } else { flow->usBurstCredit = 0; flow->curBurstCredit = 0; } - // Set last packet time to now - gettimeofday(&flow->lastPacketTime,NULL); + // Setup throughput stuff... + gettimeofday(&flow->lastThroughputUpdate,NULL); + flow->curThroughputAge = 0; + flow->curThroughput = 0; + // Set last time we calculated credit and the rest... + flow->accumMs = 0; + gettimeofday(&flow->lastCreditCalc,NULL); // Clear our running counters flow->running.pktCount = 0; flow->running.pktSize = 0; @@ -154,8 +160,8 @@ } // Calculate our thresholds - flow->min_th = (flow->maxRate / 750); - flow->max_th = (flow->maxRate / 750) * 4; + //flow->min_th = (flow->maxRate / 750); + //flow->max_th = (flow->maxRate / 750) * 4; // Blank all queues for (i = 0; i < NUM_PRIO_BANDS; i++) @@ -168,29 +174,27 @@ pktQueue->nfmark = -1; pktQueue->curSize = 0; pktQueue->curLen = 0; - pktQueue->maxSize = 0; - pktQueue->maxLen = 0; + pktQueue->maxSize = -1; + pktQueue->maxLen = -1; pktQueue->packets = NULL; } -/* printf("Flow added...\n"); - printf(" flowName - %s\n",flowName); - printf(" parent - %p\n",parentFlow); - printf(" nfmark - %li\n",nfmark); - printf(" statsLen - %li\n",flow->statsLen); - printf(" maxQueueSize - %li\n",flow->maxQueueSize); - printf(" maxQueueLen - %li\n",flow->maxQueueLen); - printf(" maxRate - %li\n",flow->maxRate); - printf(" usMaxCredit - %.6f\n",flow->usMaxCredit); - printf(" curMaxCredit - %.6f\n",flow->curMaxCredit); - printf(" usCredit - %.6f\n",flow->usBurstCredit); - printf(" curCredit - %.6f\n",flow->curBurstCredit); - printf(" burstRate - %li\n",flow->burstRate); - printf("Queue Stuff:\n"); - printf(" min_th - %li\n",flow->min_th); - printf(" max_th - %li\n",flow->max_th); -*/ + printf(" flowName - %s\n",flowName); + printf(" parent - %p\n",parentFlow); + printf(" nfmark - %li\n",nfmark); + printf(" statsLen - %li\n",flow->statsLen); + printf(" maxQueueSize - %li\n",flow->maxQueueSize); + printf(" maxQueueLen - %li\n",flow->maxQueueLen); + printf(" maxRate - %li\n",flow->maxRate); + printf(" usCredit - %.6f\n",flow->usCredit); + printf(" curCredit - %u\n",flow->curCredit); + printf(" usBurstCredit - %.6f\n",flow->usBurstCredit); + printf(" curBurstCredit - %u\n",flow->curBurstCredit); + printf(" burstRate - %li\n",flow->burstRate); +// printf("Queue Stuff:\n"); +// printf(" min_th - %li\n",flow->min_th); +// printf(" max_th - %li\n",flow->max_th); return flow; } @@ -348,7 +352,81 @@ } +/* Set current credit & burst credit */ +static inline void calculate_flow_credit(struct flow_t *flow, unsigned int pktSize) +{ + // Check if we limited by rate + if (flow->maxRate > 0) + { + struct timeval curTime; + // Grab exact time now + gettimeofday(&curTime,NULL); + + // Calculate credit + if (curTime.tv_sec - flow->lastCreditCalc.tv_sec > 0) + { + flow->curCredit = pktSize; + // If we must keep track of burst credit, do the same + if (flow->burstRate > 0) + flow->curBurstCredit = pktSize; + gettimeofday(&flow->lastCreditCalc,NULL); + } + else + { + unsigned int accumCredit; + unsigned int accumBurst; + + + // Add up accumulated time + flow->accumMs = curTime.tv_usec - flow->lastCreditCalc.tv_usec; + // See if we have at lease 1 byte to add + if ((accumCredit = flow->accumMs * flow->usCredit) > 1) + { + // If so add it to max of maxRate + flow->curCredit += accumCredit; + + // Add on burst credit, if we can + if (flow->burstRate > 0) + flow->curBurstCredit += (flow->accumMs * flow->usBurstCredit); + + // Remove the number of microseconds we used + flow->accumMs -= (accumCredit / flow->usCredit); + + // If we exceeded our max's, bring them back down a bit + if (flow->curCredit > flow->maxRate) + { + flow->curCredit = flow->maxRate; + flow->accumMs = 0; + } + if (flow->curBurstCredit > flow->burstRate) + flow->curBurstCredit = flow->burstRate; + + // Set this as the last time we calculated credit + gettimeofday(&flow->lastCreditCalc,NULL); + } + } + } +} + + +// Check if we will exceed our credit +static inline int credit_will_exceed(struct flow_t *flow, unsigned int pktSize) +{ + // Without the typecasts we always get a result of 0 .... *shrug* + return (flow->maxRate > 0 && (long int) flow->curCredit - (long int) pktSize < 0); +} + + +// Check if we will exceed our credit +static inline int burst_credit_will_exceed(struct flow_t *flow, unsigned int pktSize) +{ + // If burstRate == 0, it means we can burst to infinity (limited by parent) + return (flow->burstRate != 0 && (long int) flow->curBurstCredit - (long int) pktSize < 0); +} + + + // Function to process a flow queue, returns number of packets accepted static int processPktQueue(struct runnerData_t *runnerData, struct pktQueue_t *pktQueue) { @@ -390,27 +468,52 @@ // Loop to root while (flow && ok) { - struct timeval timeNow; - float maxCredit = 0, burstCredit = 0; - - - // Grab exact time now - gettimeofday(&timeNow,NULL); + int bursted = 0; + // Set new credits - CALC_MAX_CREDIT(timeNow,flow,maxCredit) - CALC_BURST_CREDIT(timeNow,flow,burstCredit) - + calculate_flow_credit(flow,PKT_SIZE(packet)); + // Check if we have exceeded stuff we shouldn't - if (WILL_CREDIT_EXCEED(flow,packet,maxCredit,burstCredit)) - ok = 0; + if ((i = credit_will_exceed(flow,PKT_SIZE(packet)))) + { + // If we can burst.... see if exceed anything + if (flow->burstRate != -1) + { + int i; + + /* Check if we will exceed our burst credit */ + if ((i = burst_credit_will_exceed(flow,PKT_SIZE(packet)))) + { + ok = 0; + fprintf(stderr,"%s(%i): res = %i, Will exceed for %s (%i:%i:%i:%i)\n",__FILE__,__LINE__, + i,flow->flowName, + flow->maxRate,flow->curCredit,flow->burstRate,flow->curBurstCredit); + } + else + { + // Guess we didn't, we bursted + bursted = 1; + fprintf(stderr,"%s(%i): res = %i, Bursted to parent %s\n",__FILE__,__LINE__,i,flow->flowName); + } + } + else + { + fprintf(stderr,"%s(%i): Will exceed for %s\n",__FILE__,__LINE__,flow->flowName); + ok = 0; + } + } + // If we have a parent, if we do... do a few tests if (ok && flow->parent) { // Check if we havn't exceeded our parent queues - if (WILL_FLOW_QUEUE_EXCEED(flow->parent,packet)) + if (will_exceed_flow_queue(flow->parent,PKT_SIZE(packet))) + { ok = 0; + fprintf(stderr,"%s: Will exceed @ %i\n",flow->flowName,__LINE__); + } /* * if (....) - if we here because of bursting, check if we have a @@ -418,24 +521,45 @@ * 1. We cannot burst over our burst-threshold to the parent * 2. We cannot burst to a parent who is bogged */ - // FIXME: Add a percentage of traffic in parent that is free so we can burst - // FIXME: What about parent burst rates? - if (ok && (maxCredit - PKT_SIZE(packet) < 0 && flow->maxRate > 0)) + if (ok && bursted) { - float parentMax = 0, parentBurst = 0; + double parentCur = 0, parentCurBurst = 0; // Calculate parent credits - CALC_MAX_CREDIT(timeNow,flow->parent, parentMax) - CALC_BURST_CREDIT(timeNow,flow->parent, parentBurst) + calculate_flow_credit(flow->parent,PKT_SIZE(packet)); // Check if we exceeded - if (WILL_CREDIT_EXCEED(flow->parent,packet,parentMax,parentBurst) || - ((flow->parent_th > ((float) parentMax / (float) flow->parent->maxRate) * 100.0) && - flow->parent_th > 0)) - ok = 0; + if (credit_will_exceed(flow->parent,PKT_SIZE(packet)) && + burst_credit_will_exceed(flow->parent,PKT_SIZE(packet))) + ok = 0; + else + { + // Parent threshold stuff, if we havn't exceeded the threshold we can burst + // First check if we can use the parent_th against the burst rate + if (flow->parent->burstRate > 0) + { + if (flow->parent_th > 0 && flow->parent_th < (flow->parent->curThroughput / + (double) flow->parent->burstRate) * 100.0) + { + ok = 0; + fprintf(stderr,"%s: Will exceed @ %i\n",flow->flowName,__LINE__); + } + } + // If not, against the maxRate + else if (flow->parent->maxRate > 0) + { + if (flow->parent_th > 0 && flow->parent_th < (flow->parent->curThroughput / + (double) flow->parent->maxRate) * 100.0) + { + ok = 0; + fprintf(stderr,"%s: Will exceed @ %i\n",flow->flowName,__LINE__); + } + } + } } } + // If packet is still ok to pass through, do our stuff if (ok) { @@ -443,15 +567,47 @@ flow->running.pktCount++; flow->running.pktSize += PKT_SIZE(packet); - flow->curMaxCredit = maxCredit - PKT_SIZE(packet); - flow->curBurstCredit = burstCredit - PKT_SIZE(packet); + flow->curCredit -= PKT_SIZE(packet); + flow->curThroughput += PKT_SIZE(packet); + // If we can burst ... + if (flow->burstRate > 0) + flow->curBurstCredit -= PKT_SIZE(packet); + // We bursted - if (maxCredit < 0 && flow->maxRate != 0) + if (bursted) flow->running.pktBursted++; - // Last time we accepted - gettimeofday(&flow->lastPacketTime,NULL); + // Update our throughput + { + struct timeval curTime; + + // Grab exact time now + gettimeofday(&curTime,NULL); + + + // Add up accumulated time + if (curTime.tv_sec - flow->lastThroughputUpdate.tv_sec > 0) + flow->curThroughputAge += (curTime.tv_sec - flow->lastThroughputUpdate.tv_sec) * 1000000; + else + flow->curThroughputAge += curTime.tv_usec - flow->lastThroughputUpdate.tv_usec; + + // 2 seconds + if (flow->curThroughputAge >= 2000000) + { + flow->curThroughputAge -= 2000000; + flow->curThroughput = flow->curThroughputAge / 1000000.0 * flow->curThroughput; + } + + fprintf(stderr,"%s: throughput: %i/%f\n",flow->flowName,flow->curThroughputAge,flow->curThroughput); + + // Set this as the last time we updated our throughput + gettimeofday(&flow->lastThroughputUpdate,NULL); + + } + + fprintf(stderr,"%s: Current throughput = %f\n",flow->flowName,flow->curThroughput); + g_mutex_unlock(flow->lock); // Time to update our groups Modified: trunk/lib/ipq.c =================================================================== --- trunk/lib/ipq.c 2004-12-28 15:13:50 UTC (rev 7) +++ trunk/lib/ipq.c 2004-12-28 15:19:28 UTC (rev 8) @@ -75,6 +75,14 @@ } +// Check if we will exceed our queue +static inline will_exceed_pkt_queue(struct pktQueue_t *pktQueue, unsigned int pktSize) +{ + return ((pktQueue->curLen >= pktQueue->maxLen && pktQueue->maxLen != 0) || + (pktQueue->curSize + pktSize >= pktQueue->maxSize && pktQueue->maxSize != 0)); +} + + // Queue a packet static int queuePacket(struct runnerData_t *runnerData, long int nfmark, struct packet_t *packet) { @@ -187,8 +195,15 @@ g_mutex_lock(P_FLOW(foundQueue,lock)); // Check first of all if we fucked over our one of our queue limits - if (WILL_QUEUE_EXCEED(foundQueue,packet) || WILL_FLOW_QUEUE_EXCEED(foundQueue->parentFlow,packet)) + if (will_exceed_pkt_queue(foundQueue,PKT_SIZE(packet)) || + will_exceed_flow_queue(foundQueue->parentFlow,PKT_SIZE(packet))) + { drop = 1; + fprintf(stderr,"%s: Will exceed @ %i\n",foundQueue->parentFlow->flowName,__LINE__); + } + + +#if 0 // Check second of all if we fucked our min threshold over else if (TH_EXCEEDED(foundQueue,min_th)) { @@ -205,11 +220,13 @@ drop = 1; } } +#endif - // Check if we must pass the packet if (!drop) { + fprintf(stderr,"%s: Queuing packet @ %i, queue size = %i, queue len = %i\n",foundQueue->parentFlow->flowName,__LINE__, + foundQueue->curSize,foundQueue->curLen); // Lock, queue... adjust stats foundQueue->packets = g_list_append(foundQueue->packets,packet); foundQueue->curSize += PKT_SIZE(packet); |