From: Tuong L. <tuo...@de...> - 2019-08-29 11:36:30
We have identified a problem with the "oversubscription" policy in the link transmission code.

When small messages are transmitted and the sending link has reached the transmit window limit, those messages are bundled and put into the link backlog queue. However, bundles of data messages are counted at the 'CRITICAL' level, so the counter for that level, instead of the counter for the bundled messages' real level, is the one being increased. Subsequent to-be-bundled data messages at non-CRITICAL levels continue to be tested against the unchanged counter for their own level, while contributing to an unrestrained increase at the CRITICAL backlog level.

This leaves a gap in the congestion control algorithm for small messages that can result in starvation of other users or of a "real" CRITICAL user. Eventually it can even lead to buffer exhaustion and link reset.

We fix this by keeping a 'target_bskb' buffer pointer at each level, and when bundling we only bundle messages of the same importance level. This way we know exactly how many slots a given level occupies in the queue, so we can manage per-level congestion accurately.

Bundling messages of the same level brings further benefits. Consider this:
- One socket sends 64-byte messages at the 'CRITICAL' level;
- Another sends 4096-byte messages at the 'LOW' level;

When a 64-byte message arrives and is bundled for the first time, we pay the bundling overhead (an extra 40-byte header, data copy, etc.) in anticipation of later use, but the next message may be a 4096-byte one that cannot be added to that bundle. The bundle then carries only a single payload message, which is inefficient for the sender and the receiver alike. Later on, another 64-byte message arrives, a new bundle is created, and the same story repeats...

With the new bundling algorithm this no longer happens: the 64-byte messages are bundled together even when 4096-byte messages arrive in between. However, if the 4096-byte messages are sent at the same level, i.e. 'CRITICAL', the bundling algorithm again incurs the same overhead.

The same also happens with only one socket sending small messages at a rate close to the link transmit rate: as soon as one message is bundled, the bundle is transmitted; then another message arrives, a new bundle is created, and so on...

We solve that issue thoroughly in the 2nd patch of this series.
Fixes: 365ad353c256 ("tipc: reduce risk of user starvation during link congestion") Reported-by: Hoang Le <hoa...@de...> Acked-by: Jon Maloy <jon...@er...> Signed-off-by: Tuong Lien <tuo...@de...> --- net/tipc/link.c | 29 ++++++++++++++++++----------- net/tipc/msg.c | 5 +---- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 6cc75ffd9e2c..999eab592de8 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -160,6 +160,7 @@ struct tipc_link { struct { u16 len; u16 limit; + struct sk_buff *target_bskb; } backlog[5]; u16 snd_nxt; u16 window; @@ -880,6 +881,7 @@ static void link_prepare_wakeup(struct tipc_link *l) void tipc_link_reset(struct tipc_link *l) { struct sk_buff_head list; + u32 imp; __skb_queue_head_init(&list); @@ -901,11 +903,10 @@ void tipc_link_reset(struct tipc_link *l) __skb_queue_purge(&l->deferdq); __skb_queue_purge(&l->backlogq); __skb_queue_purge(&l->failover_deferdq); - l->backlog[TIPC_LOW_IMPORTANCE].len = 0; - l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0; - l->backlog[TIPC_HIGH_IMPORTANCE].len = 0; - l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0; - l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0; + for (imp = 0; imp <= TIPC_SYSTEM_IMPORTANCE; imp++) { + l->backlog[imp].len = 0; + l->backlog[imp].target_bskb = NULL; + } kfree_skb(l->reasm_buf); kfree_skb(l->reasm_tnlmsg); kfree_skb(l->failover_reasm_skb); @@ -947,7 +948,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; struct sk_buff_head *transmq = &l->transmq; struct sk_buff_head *backlogq = &l->backlogq; - struct sk_buff *skb, *_skb, *bskb; + struct sk_buff *skb, *_skb, **tskb; int pkt_cnt = skb_queue_len(list); int rc = 0; @@ -999,19 +1000,21 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, seqno++; continue; } - if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) { + tskb = &l->backlog[imp].target_bskb; + if (tipc_msg_bundle(*tskb, hdr, mtu)) { kfree_skb(__skb_dequeue(list)); l->stats.sent_bundled++; continue; } - if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) { + if (tipc_msg_make_bundle(tskb, hdr, mtu, l->addr)) { kfree_skb(__skb_dequeue(list)); - __skb_queue_tail(backlogq, bskb); - l->backlog[msg_importance(buf_msg(bskb))].len++; + __skb_queue_tail(backlogq, *tskb); + l->backlog[imp].len++; l->stats.sent_bundled++; l->stats.sent_bundles++; continue; } + l->backlog[imp].target_bskb = NULL; l->backlog[imp].len += skb_queue_len(list); skb_queue_splice_tail_init(list, backlogq); } @@ -1027,6 +1030,7 @@ static void tipc_link_advance_backlog(struct tipc_link *l, u16 seqno = l->snd_nxt; u16 ack = l->rcv_nxt - 1; u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; + u32 imp; while (skb_queue_len(&l->transmq) < l->window) { skb = skb_peek(&l->backlogq); @@ -1037,7 +1041,10 @@ static void tipc_link_advance_backlog(struct tipc_link *l, break; __skb_dequeue(&l->backlogq); hdr = buf_msg(skb); - l->backlog[msg_importance(hdr)].len--; + imp = msg_importance(hdr); + l->backlog[imp].len--; + if (unlikely(skb == l->backlog[imp].target_bskb)) + l->backlog[imp].target_bskb = NULL; __skb_queue_tail(&l->transmq, skb); /* next retransmit attempt */ if (link_is_bc_sndlink(l)) diff --git a/net/tipc/msg.c b/net/tipc/msg.c index e6d49cdc61b4..922d262e153f 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -543,10 +543,7 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg, bmsg = buf_msg(_skb); tipc_msg_init(msg_prevnode(msg), bmsg, MSG_BUNDLER, 0, INT_H_SIZE, dnode); - if (msg_isdata(msg)) - 
msg_set_importance(bmsg, TIPC_CRITICAL_IMPORTANCE); - else - msg_set_importance(bmsg, TIPC_SYSTEM_IMPORTANCE); + msg_set_importance(bmsg, msg_importance(msg)); msg_set_seqno(bmsg, msg_seqno(msg)); msg_set_ack(bmsg, msg_ack(msg)); msg_set_bcast_ack(bmsg, msg_bcast_ack(msg)); -- 2.13.7 |
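The per-level accounting introduced by the patch above can be modelled outside the kernel. The following standalone C sketch keeps one counter and one open "target" bundle per importance level, so a bundled small message is always charged to its own level. It is only a model under simplifying assumptions: the names (backlog_add, target_left), the byte-budget view of a bundle and the limit/MTU values are invented for illustration and are not TIPC code.

/* Userspace model of per-level backlog accounting -- not kernel code. */
#include <stdbool.h>
#include <stdio.h>

#define MTU       1500
#define HDR_SIZE  40               /* stand-in for INT_H_SIZE */
#define LEVELS    5                /* LOW .. SYSTEM importance */

struct backlog_level {
        unsigned int len;          /* slots this level occupies */
        unsigned int limit;        /* congestion limit for this level */
        int target_left;           /* room left in the open bundle;
                                    * -1 = none (models 'target_bskb') */
};

static struct backlog_level backlog[LEVELS];

/* Queue a message of 'size' bytes at importance 'imp'.
 * Returns false if that level is congested.
 */
static bool backlog_add(unsigned int imp, unsigned int size)
{
        struct backlog_level *b = &backlog[imp];

        /* Bundle only into the open bundle of the SAME level, so the
         * slot is charged to the right counter.
         */
        if (b->target_left >= (int)size) {
                b->target_left -= size;
                return true;               /* bundled, no new slot */
        }
        if (b->len >= b->limit)
                return false;              /* this level is congested */

        b->len++;                          /* new slot at this level */
        b->target_left = (size + HDR_SIZE <= MTU) ?
                         (int)(MTU - HDR_SIZE - size) : -1;
        return true;
}

int main(void)
{
        unsigned int i;

        for (i = 0; i < LEVELS; i++) {
                backlog[i].limit = 4;      /* arbitrary small limit */
                backlog[i].target_left = -1;
        }
        /* Interleave two flows: the 64-byte CRITICAL messages keep
         * bundling among themselves although 4096-byte LOW messages
         * arrive in between, and each level pays for its own slots.
         */
        for (i = 0; i < 6; i++) {
                backlog_add(3, 64);        /* TIPC_CRITICAL_IMPORTANCE */
                backlog_add(0, 4096);      /* TIPC_LOW_IMPORTANCE */
        }
        printf("CRITICAL slots: %u, LOW slots: %u\n",
               backlog[3].len, backlog[0].len);
        return 0;
}

With these made-up numbers the run ends with one CRITICAL slot holding all six small messages and four LOW slots, i.e. each level sees only its own load.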
From: Tuong L. <tuo...@de...> - 2019-08-29 11:36:35
As mentioned in the previous commit of this series, the current message bundling algorithm is inefficient in that it can generate bundles carrying only one payload message, which causes unnecessary overhead for both the sender and the receiver.

This commit re-designs the 'tipc_msg_make_bundle()' function (now named 'tipc_msg_try_bundle()'), so that when the first message arrives, we only check whether it is suitable for bundling. If it is, we keep a reference to it and do nothing else, i.e. the message buffer is put into the link backlog queue and processed as normal. Later on, when another message arrives, we make a bundle with the first one if possible, and so on... This way, a bundle, if really needed, always consists of at least two payload messages. Otherwise, we let the first buffer go its own way without any bundling, reducing the overhead to zero.

Moreover, since we now have both messages in hand, we can also optimize the 'tipc_msg_bundle()' function and bundle a very large message (size ~ MSS) with small ones, which is not possible with the current algorithm, e.g. [1400-byte message] + [10-byte message] (MTU = 1500).

Signed-off-by: Tuong Lien <tuo...@de...> --- net/tipc/link.c | 45 ++++++++------- net/tipc/msg.c | 167 ++++++++++++++++++++++++++++++-------------------------- net/tipc/msg.h | 5 +- 3 files changed, 117 insertions(+), 100 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 999eab592de8..ff5980fc56dc 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -948,9 +948,10 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; struct sk_buff_head *transmq = &l->transmq; struct sk_buff_head *backlogq = &l->backlogq; - struct sk_buff *skb, *_skb, **tskb; + struct sk_buff *skb, *_skb; int pkt_cnt = skb_queue_len(list); int rc = 0; + u16 n; if (unlikely(msg_size(hdr) > mtu)) { pr_warn("Too large msg, purging xmit list %d %d %d %d %d!\n", @@ -975,20 +976,18 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, } /* Prepare each packet for sending, and add to relevant queue: */ - while (skb_queue_len(list)) { - skb = skb_peek(list); - hdr = buf_msg(skb); - msg_set_seqno(hdr, seqno); - msg_set_ack(hdr, ack); - msg_set_bcast_ack(hdr, bc_ack); - + while ((skb = __skb_dequeue(list))) { if (likely(skb_queue_len(transmq) < maxwin)) { + hdr = buf_msg(skb); + msg_set_seqno(hdr, seqno); + msg_set_ack(hdr, ack); + msg_set_bcast_ack(hdr, bc_ack); _skb = skb_clone(skb, GFP_ATOMIC); if (!_skb) { + kfree_skb(skb); __skb_queue_purge(list); return -ENOBUFS; } - __skb_dequeue(list); __skb_queue_tail(transmq, skb); /* next retransmit attempt */ if (link_is_bc_sndlink(l)) @@ -1000,22 +999,26 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, seqno++; continue; } - tskb = &l->backlog[imp].target_bskb; - if (tipc_msg_bundle(*tskb, hdr, mtu)) { - kfree_skb(__skb_dequeue(list)); - l->stats.sent_bundled++; - continue; - } - if (tipc_msg_make_bundle(tskb, hdr, mtu, l->addr)) { - kfree_skb(__skb_dequeue(list)); - __skb_queue_tail(backlogq, *tskb); + n = tipc_msg_try_bundle(&l->backlog[imp].target_bskb, skb, + mtu - INT_H_SIZE, + l->addr); + switch (n) { + case 0: + break; + case 1: + __skb_queue_tail(backlogq, skb); l->backlog[imp].len++; - l->stats.sent_bundled++; + continue; + case 2: l->stats.sent_bundles++; + l->stats.sent_bundled++; + default: + kfree_skb(skb); + l->stats.sent_bundled++; continue; } - l->backlog[imp].target_bskb = NULL; - l->backlog[imp].len += skb_queue_len(list); +
l->backlog[imp].len += (1 + skb_queue_len(list)); + __skb_queue_tail(backlogq, skb); skb_queue_splice_tail_init(list, backlogq); } l->snd_nxt = seqno; diff --git a/net/tipc/msg.c b/net/tipc/msg.c index 922d262e153f..768e29d6599e 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -419,52 +419,110 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset, } /** - * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one - * @skb: the buffer to append to ("bundle") - * @msg: message to be appended - * @mtu: max allowable size for the bundle buffer - * Consumes buffer if successful - * Returns true if bundling could be performed, otherwise false + * tipc_msg_bundle - Append contents of a buffer to tail of an existing one + * @bskb: the bundle buffer to append to + * @msg: message to be appended + * @max: max allowable size for the bundle buffer + * + * Returns "true" if bundling has been performed, otherwise "false" */ -bool tipc_msg_bundle(struct sk_buff *skb, struct tipc_msg *msg, u32 mtu) +static bool tipc_msg_bundle(struct sk_buff *bskb, struct tipc_msg *msg, + u32 max) { - struct tipc_msg *bmsg; - unsigned int bsz; - unsigned int msz = msg_size(msg); - u32 start, pad; - u32 max = mtu - INT_H_SIZE; + struct tipc_msg *bmsg = buf_msg(bskb); + u32 msz, bsz, offset, pad; - if (likely(msg_user(msg) == MSG_FRAGMENTER)) - return false; - if (!skb) - return false; - bmsg = buf_msg(skb); + msz = msg_size(msg); bsz = msg_size(bmsg); - start = align(bsz); - pad = start - bsz; + offset = align(bsz); + pad = offset - bsz; - if (unlikely(msg_user(msg) == TUNNEL_PROTOCOL)) - return false; - if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) + if (unlikely(skb_tailroom(bskb) < (pad + msz))) return false; - if (unlikely(msg_user(bmsg) != MSG_BUNDLER)) - return false; - if (unlikely(skb_tailroom(skb) < (pad + msz))) - return false; - if (unlikely(max < (start + msz))) - return false; - if ((msg_importance(msg) < TIPC_SYSTEM_IMPORTANCE) && - (msg_importance(bmsg) == TIPC_SYSTEM_IMPORTANCE)) + if (unlikely(max < (offset + msz))) return false; - skb_put(skb, pad + msz); - skb_copy_to_linear_data_offset(skb, start, msg, msz); - msg_set_size(bmsg, start + msz); + skb_put(bskb, pad + msz); + skb_copy_to_linear_data_offset(bskb, offset, msg, msz); + msg_set_size(bmsg, offset + msz); msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1); return true; } /** + * tipc_msg_try_bundle - Try to bundle a new buffer to the last bundle or + * previously potential message if any + * @tskb: target buffer to bundle to (= a bundle or normal buffer or NULL) + * @skb: the new buffer to be bundled + * @mss: max message size (header inclusive) + * @dnode: destination node for the buffer + * + * Returns the number of bundled messages in the bundle if bundling has been + * performed (must be > 1). In case the new buffer is suitable for bundling + * but cannot be bundled this time, it becomes the new target for next time, + * returns 1! + * Otherwise, returns 0 shortly and the current target is flushed too. 
+ */ +u16 tipc_msg_try_bundle(struct sk_buff **tskb, struct sk_buff *skb, u32 mss, + u32 dnode) +{ + struct tipc_msg *msg = buf_msg(skb); + struct tipc_msg *bmsg, *omsg; + u32 bsz, msz = msg_size(msg); + + /* First, check if the new buffer is suitable for bundling */ + if (msg_user(msg) == MSG_FRAGMENTER) + goto flush; + if (msg_user(msg) == TUNNEL_PROTOCOL) + goto flush; + if (msg_user(msg) == BCAST_PROTOCOL) + goto flush; + if (mss <= INT_H_SIZE + msz) + goto flush; + + /* Ok, but target lost, make a new one */ + if (unlikely(!*tskb)) + goto new_target; + + bmsg = buf_msg(*tskb); + bsz = msg_size(bmsg); + + /* If target is a bundle already, try to bundle the new buffer to */ + if (msg_user(bmsg) == MSG_BUNDLER) + goto bundle; + + /* Target is not a bundle (i.e. the previously potential buffer), make + * a new bundle of the two buffers if possible + */ + if (unlikely(mss < align(INT_H_SIZE + bsz) + msz)) + goto new_target; + if (unlikely(pskb_expand_head(*tskb, INT_H_SIZE, mss - bsz, + GFP_ATOMIC))) + goto new_target; + + omsg = buf_msg(*tskb); + skb_push(*tskb, INT_H_SIZE); + bmsg = buf_msg(*tskb); + tipc_msg_init(msg_prevnode(omsg), bmsg, MSG_BUNDLER, 0, INT_H_SIZE, + dnode); + msg_set_importance(bmsg, msg_importance(omsg)); + msg_set_size(bmsg, INT_H_SIZE + bsz); + msg_set_msgcnt(bmsg, 1); + +bundle: + if (likely(tipc_msg_bundle(*tskb, msg, mss))) + return msg_msgcnt(bmsg); + +new_target: + *tskb = skb; + return 1; + +flush: + *tskb = NULL; + return 0; +} + +/** * tipc_msg_extract(): extract bundled inner packet from buffer * @skb: buffer to be extracted from. * @iskb: extracted inner buffer, to be returned @@ -510,49 +568,6 @@ bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos) } /** - * tipc_msg_make_bundle(): Create bundle buf and append message to its tail - * @list: the buffer chain, where head is the buffer to replace/append - * @skb: buffer to be created, appended to and returned in case of success - * @msg: message to be appended - * @mtu: max allowable size for the bundle buffer, inclusive header - * @dnode: destination node for message. 
(Not always present in header) - * Returns true if success, otherwise false - */ -bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg, - u32 mtu, u32 dnode) -{ - struct sk_buff *_skb; - struct tipc_msg *bmsg; - u32 msz = msg_size(msg); - u32 max = mtu - INT_H_SIZE; - - if (msg_user(msg) == MSG_FRAGMENTER) - return false; - if (msg_user(msg) == TUNNEL_PROTOCOL) - return false; - if (msg_user(msg) == BCAST_PROTOCOL) - return false; - if (msz > (max / 2)) - return false; - - _skb = tipc_buf_acquire(max, GFP_ATOMIC); - if (!_skb) - return false; - - skb_trim(_skb, INT_H_SIZE); - bmsg = buf_msg(_skb); - tipc_msg_init(msg_prevnode(msg), bmsg, MSG_BUNDLER, 0, - INT_H_SIZE, dnode); - msg_set_importance(bmsg, msg_importance(msg)); - msg_set_seqno(bmsg, msg_seqno(msg)); - msg_set_ack(bmsg, msg_ack(msg)); - msg_set_bcast_ack(bmsg, msg_bcast_ack(msg)); - tipc_msg_bundle(_skb, msg, mtu); - *skb = _skb; - return true; -} - -/** * tipc_msg_reverse(): swap source and destination addresses and add error code * @own_node: originating node id for reversed message * @skb: buffer containing message to be reversed; will be consumed diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 0daa6f04ca81..281fcda55311 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -1057,9 +1057,8 @@ struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz, uint data_sz, u32 dnode, u32 onode, u32 dport, u32 oport, int errcode); int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf); -bool tipc_msg_bundle(struct sk_buff *skb, struct tipc_msg *msg, u32 mtu); -bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg, - u32 mtu, u32 dnode); +u16 tipc_msg_try_bundle(struct sk_buff **tskb, struct sk_buff *skb, u32 mss, + u32 dnode); bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos); int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr, int pktmax, struct sk_buff_head *frags); -- 2.13.7 |
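The deferred-bundling scheme described above can be sketched as a standalone C model: the first suitable message is only remembered as a candidate, the bundle header is paid when a second message actually joins it, and the caller learns the outcome from the return value (0 = not bundlable and the pending target is dropped, 1 = kept as the new target, >= 2 = number of messages now in the bundle). The struct, the function name and the size constants below are invented for illustration; this is not the kernel implementation, and details such as alignment and headroom expansion are left out.

/* Userspace sketch of the "bundle only when a second message shows up"
 * idea -- not the kernel code.
 */
#include <stdio.h>

#define MSS  1460                  /* max bundle size, header included */
#define HDR  40                    /* stand-in for the bundle header */

struct m {
        unsigned int size;         /* message size, own header included */
        unsigned int cnt;          /* payload count if a bundle, else 0 */
};

static unsigned int try_bundle(struct m **target, struct m *msg)
{
        struct m *t = *target;
        unsigned int bsz;

        if (msg->size + HDR > MSS) {       /* can never be bundled */
                *target = NULL;
                return 0;
        }
        if (t) {
                /* A plain candidate only pays the bundle header when a
                 * second message actually joins it.
                 */
                bsz = t->cnt ? t->size : t->size + HDR;
                if (bsz + msg->size <= MSS) {
                        if (!t->cnt) {
                                t->size += HDR;
                                t->cnt = 1;
                        }
                        t->size += msg->size;
                        t->cnt++;
                        return t->cnt;     /* always >= 2 here */
                }
        }
        *target = msg;                     /* candidate for next time */
        return 1;
}

int main(void)
{
        struct m a = { 1400, 0 }, b = { 10, 0 }, c = { 2000, 0 };
        struct m *target = NULL;

        printf("1400B -> %u\n", try_bundle(&target, &a)); /* 1: kept */
        printf("  10B -> %u\n", try_bundle(&target, &b)); /* 2: bundled */
        printf("2000B -> %u\n", try_bundle(&target, &c)); /* 0: flushed */
        return 0;
}

Note how the 1400-byte and 10-byte messages end up in one bundle, which is the "large + small" case the commit message mentions, while no bundle header is ever spent on a message that stays alone.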
From: Jon M. <jon...@er...> - 2019-08-30 20:56:51
Hi Tuong, Both patches are good with me. Unfortunately I could not register any measurable performance improvement, but I still think this is the right thing to do. Acked-by: Jon > -----Original Message----- > From: Tuong Lien <tuo...@de...> > Sent: 29-Aug-19 07:36 > To: tip...@li...; Jon Maloy > <jon...@er...>; ma...@do...; yin...@wi... > Subject: [PATCH RFC 1/2] tipc: fix unlimited bundling of small messages > > We have identified a problem with the "oversubscription" policy in the link > transmission code. > > When small messages are transmitted, and the sending link has reached the > transmit window limit, those messages will be bundled and put into the link > backlog queue. However, bundles of data messages are counted at the > 'CRITICAL' level, so that the counter for that level, instead of the counter for > the real, bundled message's level is the one being increased. > Subsequent, to-be-bundled data messages at non-CRITICAL levels continue to > be tested against the unchanged counter for their own level, while > contributing to an unrestrained increase at the CRITICAL backlog level. > > This leaves a gap in congestion control algorithm for small messages that can > result in starvation for other users or a "real" CRITICAL user. Even that > eventually can lead to buffer exhaustion & link reset. > > We fix this by keeping a 'target_bskb' buffer pointer at each levels, then when > bundling, we only bundle messages at the same importance level only. This > way, we know exactly how many slots a certain level have occupied in the > queue, so can manage level congestion accurately. > > By bundling messages at the same level, we even have more benefits. Let > consider this: > - One socket sends 64-byte messages at the 'CRITICAL' level; > - Another sends 4096-byte messages at the 'LOW' level; > > When a 64-byte message comes and is bundled the first time, we put the > overhead of message bundle to it (+ 40-byte header, data copy, etc.) for later > use, but the next message can be a 4096-byte one that cannot be bundled to > the previous one. This means the last bundle carries only one payload message > which is totally inefficient, as for the receiver also! Later on, another 64-byte > message comes, now we make a new bundle and the same story repeats... > > With the new bundling algorithm, this will not happen, the 64-byte messages > will be bundled together even when the 4096-byte message(s) comes in > between. However, if the 4096-byte messages are sent at the same level i.e. > 'CRITICAL', the bundling algorithm will again cause the same overhead. > > Also, the same will happen even with only one socket sending small messages > at a rate close to the link transmit's one, so that, when one message is > bundled, it's transmitted shortly. Then, another message comes, a new bundle > is created and so on... > > We will solve this issue radically by the 2nd patch of this series. 
> > Fixes: 365ad353c256 ("tipc: reduce risk of user starvation during link > congestion") > Reported-by: Hoang Le <hoa...@de...> > Acked-by: Jon Maloy <jon...@er...> > Signed-off-by: Tuong Lien <tuo...@de...> > --- > net/tipc/link.c | 29 ++++++++++++++++++----------- net/tipc/msg.c | 5 +---- > 2 files changed, 19 insertions(+), 15 deletions(-) > > diff --git a/net/tipc/link.c b/net/tipc/link.c index 6cc75ffd9e2c..999eab592de8 > 100644 > --- a/net/tipc/link.c > +++ b/net/tipc/link.c > @@ -160,6 +160,7 @@ struct tipc_link { > struct { > u16 len; > u16 limit; > + struct sk_buff *target_bskb; > } backlog[5]; > u16 snd_nxt; > u16 window; > @@ -880,6 +881,7 @@ static void link_prepare_wakeup(struct tipc_link *l) > void tipc_link_reset(struct tipc_link *l) { > struct sk_buff_head list; > + u32 imp; > > __skb_queue_head_init(&list); > > @@ -901,11 +903,10 @@ void tipc_link_reset(struct tipc_link *l) > __skb_queue_purge(&l->deferdq); > __skb_queue_purge(&l->backlogq); > __skb_queue_purge(&l->failover_deferdq); > - l->backlog[TIPC_LOW_IMPORTANCE].len = 0; > - l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0; > - l->backlog[TIPC_HIGH_IMPORTANCE].len = 0; > - l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0; > - l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0; > + for (imp = 0; imp <= TIPC_SYSTEM_IMPORTANCE; imp++) { > + l->backlog[imp].len = 0; > + l->backlog[imp].target_bskb = NULL; > + } > kfree_skb(l->reasm_buf); > kfree_skb(l->reasm_tnlmsg); > kfree_skb(l->failover_reasm_skb); > @@ -947,7 +948,7 @@ int tipc_link_xmit(struct tipc_link *l, struct > sk_buff_head *list, > u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; > struct sk_buff_head *transmq = &l->transmq; > struct sk_buff_head *backlogq = &l->backlogq; > - struct sk_buff *skb, *_skb, *bskb; > + struct sk_buff *skb, *_skb, **tskb; > int pkt_cnt = skb_queue_len(list); > int rc = 0; > > @@ -999,19 +1000,21 @@ int tipc_link_xmit(struct tipc_link *l, struct > sk_buff_head *list, > seqno++; > continue; > } > - if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) { > + tskb = &l->backlog[imp].target_bskb; > + if (tipc_msg_bundle(*tskb, hdr, mtu)) { > kfree_skb(__skb_dequeue(list)); > l->stats.sent_bundled++; > continue; > } > - if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) { > + if (tipc_msg_make_bundle(tskb, hdr, mtu, l->addr)) { > kfree_skb(__skb_dequeue(list)); > - __skb_queue_tail(backlogq, bskb); > - l->backlog[msg_importance(buf_msg(bskb))].len++; > + __skb_queue_tail(backlogq, *tskb); > + l->backlog[imp].len++; > l->stats.sent_bundled++; > l->stats.sent_bundles++; > continue; > } > + l->backlog[imp].target_bskb = NULL; > l->backlog[imp].len += skb_queue_len(list); > skb_queue_splice_tail_init(list, backlogq); > } > @@ -1027,6 +1030,7 @@ static void tipc_link_advance_backlog(struct > tipc_link *l, > u16 seqno = l->snd_nxt; > u16 ack = l->rcv_nxt - 1; > u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; > + u32 imp; > > while (skb_queue_len(&l->transmq) < l->window) { > skb = skb_peek(&l->backlogq); > @@ -1037,7 +1041,10 @@ static void tipc_link_advance_backlog(struct > tipc_link *l, > break; > __skb_dequeue(&l->backlogq); > hdr = buf_msg(skb); > - l->backlog[msg_importance(hdr)].len--; > + imp = msg_importance(hdr); > + l->backlog[imp].len--; > + if (unlikely(skb == l->backlog[imp].target_bskb)) > + l->backlog[imp].target_bskb = NULL; > __skb_queue_tail(&l->transmq, skb); > /* next retransmit attempt */ > if (link_is_bc_sndlink(l)) > diff --git a/net/tipc/msg.c b/net/tipc/msg.c index > e6d49cdc61b4..922d262e153f 100644 > --- a/net/tipc/msg.c > +++ b/net/tipc/msg.c 
> @@ -543,10 +543,7 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, > struct tipc_msg *msg, > bmsg = buf_msg(_skb); > tipc_msg_init(msg_prevnode(msg), bmsg, MSG_BUNDLER, 0, > INT_H_SIZE, dnode); > - if (msg_isdata(msg)) > - msg_set_importance(bmsg, TIPC_CRITICAL_IMPORTANCE); > - else > - msg_set_importance(bmsg, TIPC_SYSTEM_IMPORTANCE); > + msg_set_importance(bmsg, msg_importance(msg)); > msg_set_seqno(bmsg, msg_seqno(msg)); > msg_set_ack(bmsg, msg_ack(msg)); > msg_set_bcast_ack(bmsg, msg_bcast_ack(msg)); > -- > 2.13.7 |
From: Tuong L. T. <tuo...@de...> - 2019-09-10 02:55:02
Hi Jon, Agree, the patch doesn't show an improvement in throughput. However, I believe this is due to the fact our test tool e.g. the benchmark just runs tests of the same messages size in each cases. In practice, when there are different sized messages sent by applications at the same time, the new bundling strategy will take effect. By the way, since now we limit exactly the number of packets in the backlog at each levels, this obviously reduces the throughput of small messages (as bundles), but I wonder why we need to set the limits quite small? When trying to set a larger value, I have observed a significant improvement in the throughputs, for large messages as well. Our current approach at the link layer doesn't seem very good as the limit is fixed without considering the actual number of users or connections... BR/Tuong -----Original Message----- From: Jon Maloy <jon...@er...> Sent: Saturday, August 31, 2019 3:57 AM To: Tuong Tong Lien <tuo...@de...>; tip...@li...; ma...@do...; yin...@wi... Subject: RE: [PATCH RFC 1/2] tipc: fix unlimited bundling of small messages Hi Tuong, Both patches are good with me. Unfortunately I could not register any measurable performance improvement, but I still think this is the right thing to do. Acked-by: Jon > -----Original Message----- > From: Tuong Lien <tuo...@de...> > Sent: 29-Aug-19 07:36 > To: tip...@li...; Jon Maloy > <jon...@er...>; ma...@do...; yin...@wi... > Subject: [PATCH RFC 1/2] tipc: fix unlimited bundling of small messages > > We have identified a problem with the "oversubscription" policy in the link > transmission code. > > When small messages are transmitted, and the sending link has reached the > transmit window limit, those messages will be bundled and put into the link > backlog queue. However, bundles of data messages are counted at the > 'CRITICAL' level, so that the counter for that level, instead of the counter for > the real, bundled message's level is the one being increased. > Subsequent, to-be-bundled data messages at non-CRITICAL levels continue to > be tested against the unchanged counter for their own level, while > contributing to an unrestrained increase at the CRITICAL backlog level. > > This leaves a gap in congestion control algorithm for small messages that can > result in starvation for other users or a "real" CRITICAL user. Even that > eventually can lead to buffer exhaustion & link reset. > > We fix this by keeping a 'target_bskb' buffer pointer at each levels, then when > bundling, we only bundle messages at the same importance level only. This > way, we know exactly how many slots a certain level have occupied in the > queue, so can manage level congestion accurately. > > By bundling messages at the same level, we even have more benefits. Let > consider this: > - One socket sends 64-byte messages at the 'CRITICAL' level; > - Another sends 4096-byte messages at the 'LOW' level; > > When a 64-byte message comes and is bundled the first time, we put the > overhead of message bundle to it (+ 40-byte header, data copy, etc.) for later > use, but the next message can be a 4096-byte one that cannot be bundled to > the previous one. This means the last bundle carries only one payload message > which is totally inefficient, as for the receiver also! Later on, another 64-byte > message comes, now we make a new bundle and the same story repeats... > > With the new bundling algorithm, this will not happen, the 64-byte messages > will be bundled together even when the 4096-byte message(s) comes in > between. 
However, if the 4096-byte messages are sent at the same level i.e. > 'CRITICAL', the bundling algorithm will again cause the same overhead. > > Also, the same will happen even with only one socket sending small messages > at a rate close to the link transmit's one, so that, when one message is > bundled, it's transmitted shortly. Then, another message comes, a new bundle > is created and so on... > > We will solve this issue radically by the 2nd patch of this series. > > Fixes: 365ad353c256 ("tipc: reduce risk of user starvation during link > congestion") > Reported-by: Hoang Le <hoa...@de...> > Acked-by: Jon Maloy <jon...@er...> > Signed-off-by: Tuong Lien <tuo...@de...> > --- > net/tipc/link.c | 29 ++++++++++++++++++----------- net/tipc/msg.c | 5 +---- > 2 files changed, 19 insertions(+), 15 deletions(-) > > diff --git a/net/tipc/link.c b/net/tipc/link.c index 6cc75ffd9e2c..999eab592de8 > 100644 > --- a/net/tipc/link.c > +++ b/net/tipc/link.c > @@ -160,6 +160,7 @@ struct tipc_link { > struct { > u16 len; > u16 limit; > + struct sk_buff *target_bskb; > } backlog[5]; > u16 snd_nxt; > u16 window; > @@ -880,6 +881,7 @@ static void link_prepare_wakeup(struct tipc_link *l) > void tipc_link_reset(struct tipc_link *l) { > struct sk_buff_head list; > + u32 imp; > > __skb_queue_head_init(&list); > > @@ -901,11 +903,10 @@ void tipc_link_reset(struct tipc_link *l) > __skb_queue_purge(&l->deferdq); > __skb_queue_purge(&l->backlogq); > __skb_queue_purge(&l->failover_deferdq); > - l->backlog[TIPC_LOW_IMPORTANCE].len = 0; > - l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0; > - l->backlog[TIPC_HIGH_IMPORTANCE].len = 0; > - l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0; > - l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0; > + for (imp = 0; imp <= TIPC_SYSTEM_IMPORTANCE; imp++) { > + l->backlog[imp].len = 0; > + l->backlog[imp].target_bskb = NULL; > + } > kfree_skb(l->reasm_buf); > kfree_skb(l->reasm_tnlmsg); > kfree_skb(l->failover_reasm_skb); > @@ -947,7 +948,7 @@ int tipc_link_xmit(struct tipc_link *l, struct > sk_buff_head *list, > u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; > struct sk_buff_head *transmq = &l->transmq; > struct sk_buff_head *backlogq = &l->backlogq; > - struct sk_buff *skb, *_skb, *bskb; > + struct sk_buff *skb, *_skb, **tskb; > int pkt_cnt = skb_queue_len(list); > int rc = 0; > > @@ -999,19 +1000,21 @@ int tipc_link_xmit(struct tipc_link *l, struct > sk_buff_head *list, > seqno++; > continue; > } > - if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) { > + tskb = &l->backlog[imp].target_bskb; > + if (tipc_msg_bundle(*tskb, hdr, mtu)) { > kfree_skb(__skb_dequeue(list)); > l->stats.sent_bundled++; > continue; > } > - if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) { > + if (tipc_msg_make_bundle(tskb, hdr, mtu, l->addr)) { > kfree_skb(__skb_dequeue(list)); > - __skb_queue_tail(backlogq, bskb); > - l->backlog[msg_importance(buf_msg(bskb))].len++; > + __skb_queue_tail(backlogq, *tskb); > + l->backlog[imp].len++; > l->stats.sent_bundled++; > l->stats.sent_bundles++; > continue; > } > + l->backlog[imp].target_bskb = NULL; > l->backlog[imp].len += skb_queue_len(list); > skb_queue_splice_tail_init(list, backlogq); > } > @@ -1027,6 +1030,7 @@ static void tipc_link_advance_backlog(struct > tipc_link *l, > u16 seqno = l->snd_nxt; > u16 ack = l->rcv_nxt - 1; > u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; > + u32 imp; > > while (skb_queue_len(&l->transmq) < l->window) { > skb = skb_peek(&l->backlogq); > @@ -1037,7 +1041,10 @@ static void tipc_link_advance_backlog(struct > tipc_link *l, > break; > 
__skb_dequeue(&l->backlogq); > hdr = buf_msg(skb); > - l->backlog[msg_importance(hdr)].len--; > + imp = msg_importance(hdr); > + l->backlog[imp].len--; > + if (unlikely(skb == l->backlog[imp].target_bskb)) > + l->backlog[imp].target_bskb = NULL; > __skb_queue_tail(&l->transmq, skb); > /* next retransmit attempt */ > if (link_is_bc_sndlink(l)) > diff --git a/net/tipc/msg.c b/net/tipc/msg.c index > e6d49cdc61b4..922d262e153f 100644 > --- a/net/tipc/msg.c > +++ b/net/tipc/msg.c > @@ -543,10 +543,7 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, > struct tipc_msg *msg, > bmsg = buf_msg(_skb); > tipc_msg_init(msg_prevnode(msg), bmsg, MSG_BUNDLER, 0, > INT_H_SIZE, dnode); > - if (msg_isdata(msg)) > - msg_set_importance(bmsg, TIPC_CRITICAL_IMPORTANCE); > - else > - msg_set_importance(bmsg, TIPC_SYSTEM_IMPORTANCE); > + msg_set_importance(bmsg, msg_importance(msg)); > msg_set_seqno(bmsg, msg_seqno(msg)); > msg_set_ack(bmsg, msg_ack(msg)); > msg_set_bcast_ack(bmsg, msg_bcast_ack(msg)); > -- > 2.13.7 |
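A rough way to see why the per-level limit now matters so much for small messages, as discussed above: each backlog slot holds one bundle, so the number of small messages a level can absorb before senders are blocked scales with the limit times the per-bundle capacity. The values below are made up for illustration and are not TIPC's actual defaults.

/* Back-of-the-envelope model only -- hypothetical numbers. */
#include <stdio.h>

int main(void)
{
        unsigned int limit = 300;  /* assumed per-level backlog limit (slots) */
        unsigned int mss = 1460;   /* assumed usable bytes per slot */
        unsigned int hdr = 40;     /* bundle header overhead */
        unsigned int msg = 64;     /* small user message size */
        unsigned int per_bundle = (mss - hdr) / msg;

        printf("unbundled capacity: %u msgs\n", limit);
        printf("bundled capacity:   %u msgs (%u per bundle)\n",
               limit * per_bundle, per_bundle);
        return 0;
}

Raising 'limit' scales both numbers linearly, which is consistent with the observed throughput gain from larger limits; what value is safe clearly depends on how many users share the link, which is the open question raised here.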
From: Xue, Y. <Yin...@wi...> - 2019-10-11 19:00:27
I can see this is a good improvement, except that the following switch over the return values of tipc_msg_try_bundle() is not very friendly to the code reader. Although I do understand their real meanings, I have to spend time checking the context back and forth. At the least we should avoid the meaningless hard-coded case numbers, or try to change the return values of tipc_msg_try_bundle().

+		n = tipc_msg_try_bundle(&l->backlog[imp].target_bskb, skb,
+					mtu - INT_H_SIZE,
+					l->addr);
+		switch (n) {
+		case 0:
+			break;
+		case 1:
+			__skb_queue_tail(backlogq, skb);
 			l->backlog[imp].len++;
-			l->stats.sent_bundled++;
+			continue;
+		case 2:
 			l->stats.sent_bundles++;
+			l->stats.sent_bundled++;
+		default:
+			kfree_skb(skb);
+			l->stats.sent_bundled++;
 			continue;
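One possible shape of the readability fix suggested here (a sketch only, not the patch that was eventually posted): give the outcomes names so the call site reads without magic numbers. The enum identifiers and the stub below are invented for illustration and do not exist in TIPC.

#include <stdio.h>

/* Hypothetical names for the tipc_msg_try_bundle() outcomes. */
enum bundle_res {
        BUNDLE_UNSUITABLE,         /* 0: not bundlable, target flushed */
        BUNDLE_DEFERRED,           /* 1: kept as target, queue normally */
        BUNDLE_DONE,               /* 2+: merged into the target bundle */
};

/* Dummy stand-in so the example compiles on its own. */
static enum bundle_res try_bundle_stub(int i)
{
        return (enum bundle_res)(i % 3);
}

int main(void)
{
        int i;

        for (i = 0; i < 3; i++) {
                switch (try_bundle_stub(i)) {
                case BUNDLE_UNSUITABLE:
                        printf("send as is or fragment\n");
                        break;
                case BUNDLE_DEFERRED:
                        printf("queue buffer, charge one backlog slot\n");
                        break;
                default:           /* BUNDLE_DONE, or a larger msg count */
                        printf("free buffer, bump bundling stats\n");
                        break;
                }
        }
        return 0;
}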
From: Tuong L. T. <tuo...@de...> - 2019-10-15 04:49:55
Hi Ying, Agree, it's hard to trace... I've changed the way we approach, will post it as a new patch, please take a look from there! Thanks a lot! BR/Tuong -----Original Message----- From: Xue, Ying <Yin...@wi...> Sent: Friday, October 11, 2019 9:52 PM To: Tuong Lien <tuo...@de...>; tip...@li...; jon...@er...; ma...@do... Subject: RE: [PATCH RFC 2/2] tipc: improve message bundling algorithm I can recognize this is a good improvement except that the following switch cases of return values of tipc_msg_try_bundle() are not very friendly for code reader. Although I do understand their real meanings, I have to spend time checking its context back and forth. At least we should the meaningless hard code case numbers or we try to change return value numbers of tipc_msg_try_bundle(). + n = tipc_msg_try_bundle(&l->backlog[imp].target_bskb, skb, + mtu - INT_H_SIZE, + l->addr); + switch (n) { + case 0: + break; + case 1: + __skb_queue_tail(backlogq, skb); l->backlog[imp].len++; - l->stats.sent_bundled++; + continue; + case 2: l->stats.sent_bundles++; + l->stats.sent_bundled++; + default: + kfree_skb(skb); + l->stats.sent_bundled++; continue; |