From: He Zhe <zh...@wi...> - 2019-07-23 09:19:46

Flooding multicast may overrun the receive buffer, so that later messages
are considered premature, which causes the following error:

"Ignoring premature msg 16, currently handling 12"

This patch sets the socket's SO_RCVBUF to the maximum int value in order to
receive as many packets as possible, and gives the user a hint when an
overrun may have occurred. Note that the kernel limits the value of
SO_RCVBUF to min(INT_MAX/2, sysctl_rmem_max).

Signed-off-by: He Zhe <zh...@wi...>
---
 test/ptts/tipc_ts_server.c | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/test/ptts/tipc_ts_server.c b/test/ptts/tipc_ts_server.c
index a286daa..cc69e6e 100644
--- a/test/ptts/tipc_ts_server.c
+++ b/test/ptts/tipc_ts_server.c
@@ -641,8 +641,9 @@ void server_mcast
 		if (rc < 0)
 			err("multicast message not received");
 		if (msgno != *(int*) buf) {
-			dbg1("Ignoring premature msg %u, currently handling %u\n",
-			     *(int*)buf, msgno);
+			dbg1("Ignoring premature msg %u, currently handling %u\n"
+			     "You can enlarge /proc/sys/net/core/rmem_max and try again\n",
+			     *(int*)buf, msgno);
 			continue;
 		}
 		rc = recvfrom(sd[i], buf, expected_szs[numSubTest],
@@ -687,8 +688,21 @@ void server_test_multicast(void)
 	FD_ZERO(&readfds);

 	for (i = 0; i < TIPC_MCAST_SOCKETS; i++) {
+		int optval = (int)(~0U >> 1);
+		socklen_t optlen = sizeof(optval);
+		int rc = 0;
+
 		sd[i] = createSocketTIPC (SOCK_RDM);
 		FD_SET(sd[i], &readfds);
+
+		/*
+		 * Flooding multicast may make the rcv buffer overrun and is considered premature msg later.
+		 * Set SO_RCVBUF to max int value to receive as many as possible.
+		 * Note that it will be limited up to min(INT_MAX/2, sysctl_rmem_max) in kernel.
+		 */
+		rc = setsockopt(sd[i], SOL_SOCKET, SO_RCVBUF, (const char*)&optval, optlen);
+		if(rc != 0)
+			strerror(errno);
 	}

 	server_bindMulticast( 0, 99, sd[0]);
--
2.7.4
From: Jon M. <jon...@er...> - 2019-07-22 14:43:43

Acked-by: Jon Maloy <jon...@er...>

Remember to change the prefix to 'net-next' and send it when net-next
re-opens.

///jon

> -----Original Message-----
> From: Tuong Lien <tuo...@de...>
> Sent: 18-Jul-19 23:57
> To: tip...@li...; Jon Maloy <jon...@er...>; ma...@do...; yin...@wi...
> Subject: [net] tipc: fix false detection of retransmit failures
>
> This commit eliminates the use of the link 'stale_limit' & 'prev_from'
> variables (besides the already removed 'stale_cnt') in the detection of
> repeated retransmit failures, as there is no proper way to initialize
> them to avoid a false detection; that is, it is not really a
> retransmission failure, but due to garbage values in the variables.
>
> Instead, a jiffies variable will be added to individual skbs (like the
> way we restrict skb retransmissions) in order to mark the first skb
> retransmit time. Later on, at the next retransmissions, the timestamp
> will be checked to see if the skb in the link transmq is "too stale",
> that is, the link tolerance time has passed, so that a link reset will
> be ordered. Note that checking only the first skb in the queue is
> sufficient, since it must be the oldest one.
>
> A counter is also added to keep track of the actual number of skb
> retransmissions, for later checking when the failure happens.
>
> The downside of this approach is that the skb->cb[] buffer is about to
> be exhausted; however, it is always possible to allocate another memory
> area and keep a reference to it when needed.
> Fixes: 77cf8edbc0e7 ("tipc: simplify stale link failure criteria")
> Reported-by: Hoang Le <hoa...@de...>
> Signed-off-by: Tuong Lien <tuo...@de...>
> ---
>  net/tipc/link.c | 92 ++++++++++++++++++++++++++++++++-------------------------
>  net/tipc/msg.h  |  8 +++--
>  2 files changed, 57 insertions(+), 43 deletions(-)
>
> diff --git a/net/tipc/link.c b/net/tipc/link.c
> index 66d3a07bc571..c2c5c53cad22 100644
> --- a/net/tipc/link.c
> +++ b/net/tipc/link.c
> @@ -106,8 +106,6 @@ struct tipc_stats {
>   * @transmitq: queue for sent, non-acked messages
>   * @backlogq: queue for messages waiting to be sent
>   * @snt_nxt: next sequence number to use for outbound messages
> - * @prev_from: sequence number of most previous retransmission request
> - * @stale_limit: time when repeated identical retransmits must force link reset
>   * @ackers: # of peers that needs to ack each packet before it can be released
>   * @acked: # last packet acked by a certain peer. Used for broadcast.
>   * @rcv_nxt: next sequence number to expect for inbound messages
> @@ -164,9 +162,7 @@ struct tipc_link {
>  		u16 limit;
>  	} backlog[5];
>  	u16 snd_nxt;
> -	u16 prev_from;
>  	u16 window;
> -	unsigned long stale_limit;
>
>  	/* Reception */
>  	u16 rcv_nxt;
> @@ -1044,47 +1040,53 @@ static void tipc_link_advance_backlog(struct tipc_link *l,
>   * link_retransmit_failure() - Detect repeated retransmit failures
>   * @l: tipc link sender
>   * @r: tipc link receiver (= l in case of unicast)
> - * @from: seqno of the 1st packet in retransmit request
>   * @rc: returned code
>   *
>   * Return: true if the repeated retransmit failures happens, otherwise
>   * false
>   */
>  static bool link_retransmit_failure(struct tipc_link *l, struct tipc_link *r,
> -				    u16 from, int *rc)
> +				    int *rc)
>  {
>  	struct sk_buff *skb = skb_peek(&l->transmq);
>  	struct tipc_msg *hdr;
>
>  	if (!skb)
>  		return false;
> -	hdr = buf_msg(skb);
>
> -	/* Detect repeated retransmit failures on same packet */
> -	if (r->prev_from != from) {
> -		r->prev_from = from;
> -		r->stale_limit = jiffies + msecs_to_jiffies(r->tolerance);
> -	} else if (time_after(jiffies, r->stale_limit)) {
> -		pr_warn("Retransmission failure on link <%s>\n", l->name);
> -		link_print(l, "State of link ");
> -		pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
> -			msg_user(hdr), msg_type(hdr), msg_size(hdr),
> -			msg_errcode(hdr));
> -		pr_info("sqno %u, prev: %x, src: %x\n",
> -			msg_seqno(hdr), msg_prevnode(hdr), msg_orignode(hdr));
> -
> -		trace_tipc_list_dump(&l->transmq, true, "retrans failure!");
> -		trace_tipc_link_dump(l, TIPC_DUMP_NONE, "retrans failure!");
> -		trace_tipc_link_dump(r, TIPC_DUMP_NONE, "retrans failure!");
> +	if (!TIPC_SKB_CB(skb)->retr_cnt)
> +		return false;
>
> -		if (link_is_bc_sndlink(l))
> -			*rc = TIPC_LINK_DOWN_EVT;
> +	if (!time_after(jiffies, TIPC_SKB_CB(skb)->retr_stamp +
> +			msecs_to_jiffies(r->tolerance)))
> +		return false;
> +
> +	hdr = buf_msg(skb);
> +	if (link_is_bc_sndlink(l) && !less(r->acked, msg_seqno(hdr)))
> +		return false;
>
> +	pr_warn("Retransmission failure on link <%s>\n", l->name);
> +	link_print(l, "State of link ");
> +	pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
> +		msg_user(hdr), msg_type(hdr), msg_size(hdr), msg_errcode(hdr));
> +	pr_info("sqno %u, prev: %x, dest: %x\n",
> +		msg_seqno(hdr), msg_prevnode(hdr), msg_destnode(hdr));
> +	pr_info("retr_stamp %d, retr_cnt %d\n",
> +		jiffies_to_msecs(TIPC_SKB_CB(skb)->retr_stamp),
> +		TIPC_SKB_CB(skb)->retr_cnt);
> +
> +	trace_tipc_list_dump(&l->transmq, true, "retrans failure!");
> +	trace_tipc_link_dump(l, TIPC_DUMP_NONE, "retrans failure!");
> +	trace_tipc_link_dump(r, TIPC_DUMP_NONE, "retrans failure!");
> +
> +	if (link_is_bc_sndlink(l)) {
> +		r->state = LINK_RESET;
> +		*rc = TIPC_LINK_DOWN_EVT;
> +	} else {
>  		*rc = tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
> -		return true;
>  	}
>
> -	return false;
> +	return true;
>  }
>
>  /* tipc_link_bc_retrans() - retransmit zero or more packets
> @@ -1110,7 +1112,7 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r,
>
>  	trace_tipc_link_retrans(r, from, to, &l->transmq);
>
> -	if (link_retransmit_failure(l, r, from, &rc))
> +	if (link_retransmit_failure(l, r, &rc))
>  		return rc;
>
>  	skb_queue_walk(&l->transmq, skb) {
> @@ -1119,11 +1121,10 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r,
>  			continue;
>  		if (more(msg_seqno(hdr), to))
>  			break;
> -		if (link_is_bc_sndlink(l)) {
> -			if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr))
> -				continue;
> -			TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM;
> -		}
> +
> +		if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr))
> +			continue;
> +		TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM;
>  		_skb = __pskb_copy(skb, LL_MAX_HEADER + MIN_H_SIZE, GFP_ATOMIC);
>  		if (!_skb)
>  			return 0;
> @@ -1133,6 +1134,10 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r,
>  		_skb->priority = TC_PRIO_CONTROL;
>  		__skb_queue_tail(xmitq, _skb);
>  		l->stats.retransmitted++;
> +
> +		/* Increase actual retrans counter & mark first time */
> +		if (!TIPC_SKB_CB(skb)->retr_cnt++)
> +			TIPC_SKB_CB(skb)->retr_stamp = jiffies;
>  	}
>  	return 0;
>  }
> @@ -1357,12 +1362,10 @@ static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
>  	struct tipc_msg *hdr;
>  	u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
>  	u16 ack = l->rcv_nxt - 1;
> +	bool passed = false;
>  	u16 seqno, n = 0;
>  	int rc = 0;
>
> -	if (gap && link_retransmit_failure(l, l, acked + 1, &rc))
> -		return rc;
> -
>  	skb_queue_walk_safe(&l->transmq, skb, tmp) {
>  		seqno = buf_seqno(skb);
>
> @@ -1372,12 +1375,17 @@ static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
>  			__skb_unlink(skb, &l->transmq);
>  			kfree_skb(skb);
>  		} else if (less_eq(seqno, acked + gap)) {
> -			/* retransmit skb */
> +			/* First, check if repeated retrans failures occurs? */
> +			if (!passed && link_retransmit_failure(l, l, &rc))
> +				return rc;
> +			passed = true;
> +
> +			/* retransmit skb if unrestricted*/
>  			if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr))
>  				continue;
>  			TIPC_SKB_CB(skb)->nxt_retr = TIPC_UC_RETR_TIME;
> -
> -			_skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC);
> +			_skb = __pskb_copy(skb, LL_MAX_HEADER + MIN_H_SIZE,
> +					   GFP_ATOMIC);
>  			if (!_skb)
>  				continue;
>  			hdr = buf_msg(_skb);
> @@ -1386,6 +1394,10 @@ static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
>  			_skb->priority = TC_PRIO_CONTROL;
>  			__skb_queue_tail(xmitq, _skb);
>  			l->stats.retransmitted++;
> +
> +			/* Increase actual retrans counter & mark first time */
> +			if (!TIPC_SKB_CB(skb)->retr_cnt++)
> +				TIPC_SKB_CB(skb)->retr_stamp = jiffies;
>  		} else {
>  			/* retry with Gap ACK blocks if any */
>  			if (!ga || n >= ga->gack_cnt)
> @@ -2577,7 +2589,7 @@ int tipc_link_dump(struct tipc_link *l, u16 dqueues, char *buf)
>  	i += scnprintf(buf + i, sz - i, " %x", l->peer_caps);
>  	i += scnprintf(buf + i, sz - i, " %u", l->silent_intv_cnt);
>  	i += scnprintf(buf + i, sz - i, " %u", l->rst_cnt);
> -	i += scnprintf(buf + i, sz - i, " %u", l->prev_from);
> +	i += scnprintf(buf + i, sz - i, " %u", 0);
>  	i += scnprintf(buf + i, sz - i, " %u", 0);
>  	i += scnprintf(buf + i, sz - i, " %u", l->acked);
>
> diff --git a/net/tipc/msg.h b/net/tipc/msg.h
> index da509f0eb9ca..d7ebc9e955f6 100644
> --- a/net/tipc/msg.h
> +++ b/net/tipc/msg.h
> @@ -102,13 +102,15 @@ struct plist;
>  #define TIPC_MEDIA_INFO_OFFSET	5
>
>  struct tipc_skb_cb {
> -	u32 bytes_read;
> -	u32 orig_member;
>  	struct sk_buff *tail;
>  	unsigned long nxt_retr;
> -	bool validated;
> +	unsigned long retr_stamp;
> +	u32 bytes_read;
> +	u32 orig_member;
>  	u16 chain_imp;
>  	u16 ackers;
> +	u16 retr_cnt;
> +	bool validated;
>  };
>
>  #define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0]))
> --
> 2.13.7
From: David M. <da...@da...> - 2019-07-21 18:43:04

From: Christophe JAILLET <chr...@wa...>
Date: Sun, 21 Jul 2019 12:38:11 +0200

> s/tipc_toprsv_listener_data_ready/tipc_topsrv_listener_data_ready/
> (r and s switched in topsrv)
>
> Signed-off-by: Christophe JAILLET <chr...@wa...>
> ---
> The function name could also be removed from the comment. It does not
> bring any useful information IMHO.

Applied, thanks Christophe.
From: Jon M. <jon...@er...> - 2019-07-19 15:35:41

See below.

> -----Original Message-----
> From: tung quang nguyen <tun...@de...>
> Sent: 19-Jul-19 01:41
> To: Jon Maloy <jon...@er...>; 'Jon Maloy' <ma...@do...>; tip...@li...; yin...@wi...
> Cc: Hoang Huu Le <hoa...@de...>; Tuong Tong Lien <tuo...@de...>; John Rutherford <joh...@de...>
> Subject: RE: [net 1/1] tipc: reduce risk of wakeup queue starvation
>
> Hi Jon,
>
> See below.
>
> Best regards,
> Tung Nguyen
>
> -----Original Message-----
> From: Jon Maloy <jon...@er...>
> Sent: Friday, July 19, 2019 10:05 AM
> To: Tung Quang Nguyen <tun...@de...>; 'Jon Maloy' <ma...@do...>; tip...@li...; yin...@wi...
> Cc: Mohan Krishna Ghanta Krishnamurthy <moh...@er...>; par...@gm...; Hoang Huu Le <hoa...@de...>; Canh Duc Luu <can...@de...>; Tuong Tong Lien <tuo...@de...>; Gordan Mihaljevic <gor...@de...>
> Subject: RE: [net 1/1] tipc: reduce risk of wakeup queue starvation
>
> Hi Tung,
> I did of course do some measurements before sending out the patch
> yesterday, but I saw no significant difference in performance between
> the methods. The results were within the range of normal, stochastic
> variations, and making many runs only confirmed that. This was also
> what I was expecting.
>
> I have now made a couple of changes to my patch:
>
> 1) I took into account that backlog[imp].len often is larger than
> backlog[imp].limit (because of the oversubscription) when calculating
> the available number of backlog queue slots, resulting in a negative
> avail[imp] value. This would have the effect that we sometimes keep
> submitting wakeup jobs when there are no slots left in the backlog
> queue. Those jobs will be launched, run, immediately find that we are
> at congestion again, and be suspended a second time. That is, a gross
> waste of CPU resources. (This is a weakness of your algorithm, too)
>
> [Tung]: Exactly, but the lists of skb are added to the link backlog
> queues after this "Those jobs will be launched, run, immediately find
> that we are at congestion again". That's the reason I chose to schedule
> all wakeup messages right after the condition is met. All sleeping
> sockets will be woken up and send their messages to the link backlog
> queues before being put into the sleeping state again. With that, we
> can reduce the risk of starvation.

True. But doing it this way is a very aggressive approach, as the backlog
queues may grow *very* long. The use of oversubscription does in practice
mean overriding the backlog queue limit, and my fear is that those can
accumulate to insane lengths. If you look at the statistics in my previous
mail, you will see that 94 wakeup jobs for 65536-byte messages means that
94 * 46 = 4324 packets have been added to the queue *beyond* the backlog
queue limit. And nothing is stopping it from growing further if new
sockets come along and start transmitting.

My algorithm is an attempt to limit this effect, so that we only wake up
as many jobs as we think there is space for in the backlog queue. The
algorithm is far from perfect. E.g., when counting free slots in the
backlog queue we count packets, while each job we launch may send
multi-packet messages. I.e., if we in the example above have freed up 10
slots in the backlog queue, we launch 10 jobs, each of which will add 46
packets to the queue, a total of 460 packets. The remaining 84 jobs will
have to wait, maybe with another risk of starvation. We should try to
figure out if this can be done in a smarter way somehow.

> Your patch V2 was exactly my first patch, except for the prepending of
> wakeup messages to the queue tail. But I thought more about the
> above-mentioned scenario and changed to the patch I sent to you, to
> avoid the risk completely.
>
> 2) After adding some statistics counters, I realized that there are
> practically never any wakeup messages in the input queue when we run
> prepare_wakeup(), so we can just as well skip the slightly expensive
> step of counting and rescheduling those jobs. This may lead to us at
> rare occasions issuing too many wakeup jobs, but it seems to be worth
> the cost.
>
> 3) I added the wakeup jobs to the tail of the input queue, instead of
> to the head, so they will be run after regular input queue messages. I
> felt slightly uncomfortable with prepending the wakeup messages, as it
> might lead to surprising new behaviors. This is in practice what you
> are doing, too, since you check the wakeup queue and call
> tipc_sk_wakeup_rcv() after we have checked the input queue and called
> tipc_sk_rcv().
>
> I also re-ran my measurements with the new version, many times, and the
> pattern was always the same.
>
> [Tung]: I verified and confirm it. It is better than the first version,
> which showed a very bad result in all my rounds of execution. Let's
> wait and see if this patch could fix the issue.

Yes. If it doesn't, we will have to eliminate the "available backlog
slot" limitation again, like you did in your algorithm, but I would
prefer to avoid that. In my view, chances are good that this will work; I
think the real culprit was the 10-limit counter, which was accumulated
across all importance levels.

BR
///jon

> Below is a sample.
> No patch:
> -------------
>
> node1 ~ # bmc -t -c100
> ****** TIPC Benchmark Client Started ******
> Transferring 64000 messages in TIPC Throughput Benchmark
>
> | Msg Size | # Conns | # Msgs/Conn | Elapsed [ms] | Total [Msg/s] | Total [Mb/s] | Per Conn [Mb/s] |
> |----------|---------|-------------|--------------|---------------|--------------|-----------------|
> |       64 |     100 |       64000 |         5635 |       1135714 |          581 |               5 |
> |      256 |     100 |       32000 |         4221 |        758006 |         1552 |              15 |
> |     1024 |     100 |       16000 |         9023 |        177316 |         1452 |              14 |
> |     4096 |     100 |        8000 |        10238 |         78139 |         2560 |              25 |
> |    16384 |     100 |        4000 |        15651 |         25557 |         3349 |              33 |
> |    65536 |     100 |        2000 |        29197 |          6849 |         3590 |              35 |
>
> Completed Throughput Benchmark
> ****** TIPC Benchmark Client Finished ******
> node1 ~ #
>
> Tung's patch:
> -----------------
>
> node1 ~ # bmc -t -c100
> ****** TIPC Benchmark Client Started ******
> Transferring 64000 messages in TIPC Throughput Benchmark
>
> | Msg Size | # Conns | # Msgs/Conn | Elapsed [ms] | Total [Msg/s] | Total [Mb/s] | Per Conn [Mb/s] |
> |----------|---------|-------------|--------------|---------------|--------------|-----------------|
> |       64 |     100 |       64000 |         5906 |       1083527 |          554 |               5 |
> |      256 |     100 |       32000 |         3510 |        911531 |         1866 |              18 |
> |     1024 |     100 |       16000 |         9594 |        166755 |         1366 |              13 |
> |     4096 |     100 |        8000 |         9738 |         82144 |         2691 |              26 |
> |    16384 |     100 |        4000 |        15760 |         25379 |         3326 |              33 |
> |    65536 |     100 |        2000 |        30615 |          6532 |         3424 |              34 |
>
> Completed Throughput Benchmark
> ****** TIPC Benchmark Client Finished ******
> node1 ~ #
>
> Jon's patch, v2:
> -----------------
>
> node1 ~ # bmc -t -c100
>
> | Msg Size | # Conns | # Msgs/Conn | Elapsed [ms] | Total [Msg/s] | Total [Mb/s] | Per Conn [Mb/s] |
> |----------|---------|-------------|--------------|---------------|--------------|-----------------|
> |       64 |     100 |       64000 |         5465 |       1171064 |          599 |               5 |
> |      256 |     100 |       32000 |         3270 |        978490 |         2003 |              20 |
> |     1024 |     100 |       16000 |         6987 |        228964 |         1875 |              18 |
> |     4096 |     100 |        8000 |         9562 |         83657 |         2741 |              27 |
> |    16384 |     100 |        4000 |        15603 |         25635 |         3360 |              33 |
> |    65536 |     100 |        2000 |        28385 |          7045 |         3693 |              36 |
>
> Completed Throughput Benchmark
> ****** TIPC Benchmark Client Finished ******
> node1 ~ #
>
> BR
> ///jon
>
> > -----Original Message-----
> > From: tung quang nguyen <tun...@de...>
> > Sent: 18-Jul-19 06:03
> > To: Jon Maloy <jon...@er...>; 'Jon Maloy' <ma...@do...>; tip...@li...; yin...@wi...
> > Cc: Mohan Krishna Ghanta Krishnamurthy <moh...@er...>; par...@gm...; Hoang Huu Le <hoa...@de...>; Canh Duc Luu <can...@de...>; Tuong Tong Lien <tuo...@de...>; Gordan Mihaljevic <gor...@de...>
> > Subject: RE: [net 1/1] tipc: reduce risk of wakeup queue starvation
> >
> > Hi Jon,
> >
> > Your patch is too complex. There will be a lot of overhead when
> > grabbing/releasing locks (3 times) and using 2 loops.
> > As a result, the performance of the benchmark test is reduced
> > significantly, while my patch shows the same performance as the
> > original code.
> >
> > This is the comparison between my patch and yours after running the
> > benchmark using 100 connections (message size in bytes: x% better):
> > - 64: 23.5%
> > - 256: 30.2%
> > - 1024: 19.5%
> > - 4096: 14.9%
> > - 16384: 6.7%
> > - 65536: 2.4%
> >
> > So, I do not think your patch would solve the issue.
> >
> > Thanks.
> > Best regards,
> > Tung Nguyen
> >
> > -----Original Message-----
> > From: Jon Maloy <jon...@er...>
> > Sent: Thursday, July 18, 2019 4:22 AM
> > To: Jon Maloy <ma...@do...>
> > Cc: Mohan Krishna Ghanta Krishnamurthy <moh...@er...>; par...@gm...; Tung Quang Nguyen <tun...@de...>; Hoang Huu Le <hoa...@de...>; Canh Duc Luu <can...@de...>; Tuong Tong Lien <tuo...@de...>; Gordan Mihaljevic <gor...@de...>; yin...@wi...; tipc-dis...@li...
> > Subject: RE: [net 1/1] tipc: reduce risk of wakeup queue starvation
> >
> > Hi Tung,
> > After thinking more about this, I realized that the problem is a
> > little more complex than I first imagined.
> > We must also take into account that there may still be old wakeup
> > messages in the input queue when we are trying to add new ones. Those
> > may be scattered around in the input queue because new data messages
> > have arrived before they were scheduled.
> > So, we must make sure that those are still placed at the head of the
> > input queue, before any new wakeup messages, which should be before
> > any data messages.
> > Those messages should also be counted when we calculate the available
> > space in the backlog queue, so that there are never more pending
> > wakeup jobs than there are available slots in that queue.
> > So, I ended up writing my own patch for this; I hope you don't mind.
> > I tested this as far as I could, but I assume you have a more
> > relevant test program than me for this.
> >
> > BR
> > ///jon
> >
> > > -----Original Message-----
> > > From: Jon Maloy
> > > Sent: 17-Jul-19 16:56
> > > To: Jon Maloy <jon...@er...>; Jon Maloy <ma...@do...>
> > > Cc: Mohan Krishna Ghanta Krishnamurthy <moh...@er...>; par...@gm...; Tung Quang Nguyen <tun...@de...>; Hoang Huu Le <hoa...@de...>; Canh Duc Luu <can...@de...>; Tuong Tong Lien <tuo...@de...>; Gordan Mihaljevic <gor...@de...>; yin...@wi...; tipc-dis...@li...
> > > Subject: [net 1/1] tipc: reduce risk of wakeup queue starvation
> > >
> > > In commit 365ad353c256 ("tipc: reduce risk of user starvation
> > > during link congestion") we allowed senders to add exactly one list
> > > of extra buffers to the link backlog queues during link congestion
> > > (aka "oversubscription"). However, the criterion for when to stop
> > > adding wakeup messages to the input queue when the overload abates
> > > is inaccurate, and may cause starvation problems during very high
> > > load.
> > >
> > > Currently, we stop adding wakeup messages after 10 total failed
> > > attempts where we find that there is no space left in the backlog
> > > queue for a certain importance level. The counter for this is
> > > accumulated across all levels, which may lead the algorithm to
> > > leave the loop prematurely, although there may still be plenty of
> > > space available at some levels. The result is sometimes that
> > > messages near the wakeup queue tail are not added to the input
> > > queue as they should be.
> > >
> > > We now introduce a more exact algorithm, where we don't stop adding
> > > messages until all backlog queue levels have saturated or there are
> > > no more wakeup messages to dequeue.
> > >
> > > Fixes: 365ad35 ("tipc: reduce risk of user starvation during link congestion")
> > > Reported-by: Tung Nguyen <tun...@de...>
> > > Signed-off-by: Jon Maloy <jon...@er...>
> > > ---
> > >  net/tipc/link.c | 43 +++++++++++++++++++++++++++++++++++--------
> > >  1 file changed, 35 insertions(+), 8 deletions(-)
> > >
> > > diff --git a/net/tipc/link.c b/net/tipc/link.c
> > > index 66d3a07..1f41523 100644
> > > --- a/net/tipc/link.c
> > > +++ b/net/tipc/link.c
> > > @@ -853,18 +853,45 @@ static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr)
> > >   */
> > >  static void link_prepare_wakeup(struct tipc_link *l)
> > >  {
> > > +	struct sk_buff_head *wakeupq = &l->wakeupq;
> > > +	struct sk_buff_head *inputq = l->inputq;
> > >  	struct sk_buff *skb, *tmp;
> > > -	int imp, i = 0;
> > > +	struct sk_buff_head tmpq;
> > > +	int avail[5] = {0,};
> > > +	int imp = 0;
> > > +
> > > +	__skb_queue_head_init(&tmpq);
> > > +
> > > +	for (; imp <= TIPC_SYSTEM_IMPORTANCE; imp++)
> > > +		avail[imp] = l->backlog[imp].limit - l->backlog[imp].len;
> > >
> > > -	skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
> > > +	/* Already pending wakeup messages in inputq must come first */
> > > +	spin_lock_bh(&inputq->lock);
> > > +	skb_queue_walk_safe(inputq, skb, tmp) {
> > > +		if (msg_user(buf_msg(skb)) != SOCK_WAKEUP)
> > > +			continue;
> > > +		__skb_unlink(skb, inputq);
> > > +		__skb_queue_tail(&tmpq, skb);
> > >  		imp = TIPC_SKB_CB(skb)->chain_imp;
> > > -		if (l->backlog[imp].len < l->backlog[imp].limit) {
> > > -			skb_unlink(skb, &l->wakeupq);
> > > -			skb_queue_tail(l->inputq, skb);
> > > -		} else if (i++ > 10) {
> > > -			break;
> > > -		}
> > > +		if (avail[imp])
> > > +			avail[imp]--;
> > >  	}
> > > +	spin_unlock_bh(&inputq->lock);
> > > +
> > > +	spin_lock_bh(&wakeupq->lock);
> > > +	skb_queue_walk_safe(wakeupq, skb, tmp) {
> > > +		imp = TIPC_SKB_CB(skb)->chain_imp;
> > > +		if (!avail[imp])
> > > +			continue;
> > > +		avail[imp]--;
> > > +		__skb_unlink(skb, wakeupq);
> > > +		__skb_queue_tail(&tmpq, skb);
> > > +	}
> > > +	spin_unlock_bh(&wakeupq->lock);
> > > +
> > > +	spin_lock_bh(&inputq->lock);
> > > +	skb_queue_splice(&tmpq, inputq);
> > > +	spin_unlock_bh(&inputq->lock);
> > >  }
> > >
> > >  void tipc_link_reset(struct tipc_link *l)
> > > --
> > > 2.1.4
From: tung q. n. <tun...@de...> - 2019-07-19 05:41:14
|
Hi Jon, See below. Best regards, Tung Nguyen -----Original Message----- From: Jon Maloy <jon...@er...> Sent: Friday, July 19, 2019 10:05 AM To: Tung Quang Nguyen <tun...@de...>; 'Jon Maloy' <ma...@do...>; tip...@li...; yin...@wi... Cc: Mohan Krishna Ghanta Krishnamurthy <moh...@er...>; par...@gm...; Hoang Huu Le <hoa...@de...>; Canh Duc Luu <can...@de...>; Tuong Tong Lien <tuo...@de...>; Gordan Mihaljevic <gor...@de...> Subject: RE: [net 1/1] tipc: reduce risk of wakeup queue starvation Hi Tung, I did of course do some measurements before sending out the patch yesterday, but I saw no significant difference in performance between the methods. The results were within the range of normal, stochastic variations, and making many runs only confirmed that. This was also what I was expecting. I have now made a couple of changes to my patch: 1) I took into account that backlog[imp].len often is larger than backlog[imp].limit (because of the oversubscription) when calculating the available number of backlog queue slots, resulting in a negative avail[imp] value. This would have the effect that we sometimes keep submitting wakeup jobs when there are no slots left in the backlog queue. Those jobs will be launched, run, immediately find that we are at congestion again, and be suspended a second time. That is, a gross waste of CPU resources. (This is a weakness of your algorithm, too) [Tung]: Exactly but the lists of skb are added to the link backlog queues after this "Those jobs will be launched, run, immediately find that we are at congestion again". That's the reason I chose to schedule all wakeup messages right after the condition is met. All sleeping sockets will be waken up and send their messages to link backlog queues before being put into sleeping state again. With that, we can reduce the risk of starvation. Your patch V2 was exactly my first patch except the prepending of wakeup messages to the queue tail. 
But I thought more about the above-mentioned scenario and changed to the patch I sent you, to avoid the risk completely.

2) After adding some statistics counters, I realized that there are practically never any wakeup messages in the input queue when we run prepare_wakeup(), so we can just as well skip the slightly expensive step of counting and rescheduling those jobs. This may on rare occasions lead to us issuing too many wakeup jobs, but it seems to be worth the cost.

3) I added the wakeup jobs to the tail of the input queue, instead of to the head, so they will be run after regular input queue messages. I felt slightly uncomfortable with prepending the wakeup messages, as it might lead to surprising new behaviors. This is in practice what you are doing, too, since you check the wakeup queue and call tipc_sk_wakeup_rcv() after we have checked the input queue and called tipc_sk_rcv().

I also re-ran my measurements with the new version, many times, and the pattern was always the same.

[Tung]: I verified and confirm it. It is better than the first version, which showed very bad results in all my rounds of execution. Let's wait and see if this patch fixes the issue. Below is a sample.
No patch:
-------------
node1 ~ # bmc -t -c100
****** TIPC Benchmark Client Started ******
Transferring 64000 messages in TIPC Throughput Benchmark

| Msg Size [octets] | # Conns | # Msgs/Conn | Elapsed [ms] | Total [Msg/s] | Total [Mb/s] | Per Conn [Mb/s] |
|                64 |     100 |       64000 |         5635 |       1135714 |          581 |               5 |
|               256 |     100 |       32000 |         4221 |        758006 |         1552 |              15 |
|              1024 |     100 |       16000 |         9023 |        177316 |         1452 |              14 |
|              4096 |     100 |        8000 |        10238 |         78139 |         2560 |              25 |
|             16384 |     100 |        4000 |        15651 |         25557 |         3349 |              33 |
|             65536 |     100 |        2000 |        29197 |          6849 |         3590 |              35 |

Completed Throughput Benchmark
****** TIPC Benchmark Client Finished ******
node1 ~ #

Tung's patch:
-----------------
node1 ~ # bmc -t -c100
****** TIPC Benchmark Client Started ******
Transferring 64000 messages in TIPC Throughput Benchmark

| Msg Size [octets] | # Conns | # Msgs/Conn | Elapsed [ms] | Total [Msg/s] | Total [Mb/s] | Per Conn [Mb/s] |
|                64 |     100 |       64000 |         5906 |       1083527 |          554 |               5 |
|               256 |     100 |       32000 |         3510 |        911531 |         1866 |              18 |
|              1024 |     100 |       16000 |         9594 |        166755 |         1366 |              13 |
|              4096 |     100 |        8000 |         9738 |         82144 |         2691 |              26 |
|             16384 |     100 |        4000 |        15760 |         25379 |         3326 |              33 |
|             65536 |     100 |        2000 |        30615 |          6532 |         3424 |              34 |

Completed Throughput Benchmark
****** TIPC Benchmark Client Finished ******
node1 ~ #

Jon's patch, v2:
-----------------
node1 ~ # bmc -t -c100

| Msg Size [octets] | # Conns | # Msgs/Conn | Elapsed [ms] | Total [Msg/s] | Total [Mb/s] | Per Conn [Mb/s] |
|                64 |     100 |       64000 |         5465 |       1171064 |          599 |               5 |
|               256 |     100 |       32000 |         3270 |        978490 |         2003 |              20 |
|              1024 |     100 |       16000 |         6987 |        228964 |         1875 |              18 |
|              4096 |     100 |        8000 |         9562 |         83657 |         2741 |              27 |
|             16384 |     100 |        4000 |        15603 |         25635 |         3360 |              33 |
|             65536 |     100 |        2000 |        28385 |          7045 |         3693 |              36 |

Completed Throughput Benchmark
****** TIPC Benchmark Client Finished ******
node1 ~ #

BR
///jon

> -----Original Message-----
> From: tung quang nguyen <tun...@de...>
> Sent: 18-Jul-19 06:03
> To: Jon Maloy <jon...@er...>; 'Jon Maloy'
> <ma...@do...>; tip...@li...;
> yin...@wi...
> Cc: Mohan Krishna Ghanta Krishnamurthy
> <moh...@er...>;
> par...@gm...; Hoang Huu Le
> <hoa...@de...>; Canh Duc Luu
> <can...@de...>; Tuong Tong Lien
> <tuo...@de...>; Gordan Mihaljevic
> <gor...@de...>
> Subject: RE: [net 1/1] tipc: reduce risk of wakeup queue starvation
>
> Hi Jon,
>
> Your patch is too complex. There will be a lot of overhead when
> grabbing/releasing locks (3 times) and using 2 loops.
> As a result, the performance of the benchmark test is reduced significantly,
> while my patch shows the same performance as the original code.
>
> This is the comparison between my patch and yours after running the benchmark
> using 100 connections (message size in bytes: x% better):
> - 64: 23.5%
> - 256: 30.2%
> - 1024: 19.5%
> - 4096: 14.9%
> - 16384: 6.7%
> - 65536: 2.4%
>
> So, I do not think your patch would solve the issue.
>
> Thanks.
>
> Best regards,
> Tung Nguyen
>
> -----Original Message-----
> From: Jon Maloy <jon...@er...>
> Sent: Thursday, July 18, 2019 4:22 AM
> To: Jon Maloy <ma...@do...>
> Cc: Mohan Krishna Ghanta Krishnamurthy
> <moh...@er...>;
> par...@gm...; Tung Quang Nguyen
> <tun...@de...>; Hoang Huu Le
> <hoa...@de...>; Canh Duc Luu
> <can...@de...>; Tuong Tong Lien
> <tuo...@de...>; Gordan Mihaljevic
> <gor...@de...>; yin...@wi...; tipc-
> dis...@li...
> Subject: RE: [net 1/1] tipc: reduce risk of wakeup queue starvation
>
> Hi Tung,
> After thinking more about this, I realized that the problem is a little more
> complex than I first imagined.
> We must also take into account that there may still be old wakeup messages in
> the input queue when we are trying to add new ones. Those may be scattered
> around in the input queue because new data messages have arrived before
> they were scheduled.
> So, we must make sure that those are still placed at the head of the input
> queue, before any new wakeup messages, which should be before any data
> messages.
> Those messages should also be counted when we calculate the available space
> in the backlog queue, so that there are never more pending wakeup jobs than
> there are available slots in that queue.
> So, I ended up writing my own patch for this; I hope you don't mind.
> I tested this as far as I could, but I assume you have a more relevant test
> program for this than I do.
>
> BR
> ///jon
>
>
> > -----Original Message-----
> > From: Jon Maloy
> > Sent: 17-Jul-19 16:56
> > To: Jon Maloy <jon...@er...>; Jon Maloy <ma...@do...>
> > Cc: Mohan Krishna Ghanta Krishnamurthy
> > <moh...@er...>;
> > par...@gm...; Tung Quang Nguyen
> > <tun...@de...>; Hoang Huu Le
> > <hoa...@de...>; Canh Duc Luu <can...@de...>;
> > Tuong Tong Lien <tuo...@de...>; Gordan Mihaljevic
> > <gor...@de...>; yin...@wi...; tipc-
> > dis...@li...
> > Subject: [net 1/1] tipc: reduce risk of wakeup queue starvation
> >
> > In commit 365ad353c256 ("tipc: reduce risk of user starvation during link
> > congestion") we allowed senders to add exactly one list of extra buffers to the
> > link backlog queues during link congestion (aka "oversubscription"). However,
> > the criterion for when to stop adding wakeup messages to the input
> > queue when the overload abates is inaccurate, and may cause starvation
> > problems during very high load.
> >
> > Currently, we stop adding wakeup messages after 10 total failed
> > attempts where we find that there is no space left in the backlog
> > queue for a certain importance level. The counter for this is
> > accumulated across all levels, which may lead the algorithm to leave
> > the loop prematurely, although there may still be plenty of space
> > available at some levels.
> > The result is sometimes that messages near the wakeup queue tail are
> > not added to the input queue as they should be.
> >
> > We now introduce a more exact algorithm, where we don't stop adding
> > messages until all backlog queue levels have saturated or there are no
> > more wakeup messages to dequeue.
> >
> > Fixes: 365ad35 ("tipc: reduce risk of user starvation during link congestion")
> > Reported-by: Tung Nguyen <tun...@de...>
> > Signed-off-by: Jon Maloy <jon...@er...>
> > ---
> > net/tipc/link.c | 43 +++++++++++++++++++++++++++++++++++--------
> > 1 file changed, 35 insertions(+), 8 deletions(-)
> >
> > diff --git a/net/tipc/link.c b/net/tipc/link.c
> > index 66d3a07..1f41523 100644
> > --- a/net/tipc/link.c
> > +++ b/net/tipc/link.c
> > @@ -853,18 +853,45 @@ static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr)
> >  */
> > static void link_prepare_wakeup(struct tipc_link *l)
> > {
> > + struct sk_buff_head *wakeupq = &l->wakeupq;
> > + struct sk_buff_head *inputq = l->inputq;
> > struct sk_buff *skb, *tmp;
> > - int imp, i = 0;
> > + struct sk_buff_head tmpq;
> > + int avail[5] = {0,};
> > + int imp = 0;
> > +
> > + __skb_queue_head_init(&tmpq);
> > +
> > + for (; imp <= TIPC_SYSTEM_IMPORTANCE; imp++)
> > + avail[imp] = l->backlog[imp].limit - l->backlog[imp].len;
> >
> > - skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
> > + /* Already pending wakeup messages in inputq must come first */
> > + spin_lock_bh(&inputq->lock);
> > + skb_queue_walk_safe(inputq, skb, tmp) {
> > + if (msg_user(buf_msg(skb)) != SOCK_WAKEUP)
> > + continue;
> > + __skb_unlink(skb, inputq);
> > + __skb_queue_tail(&tmpq, skb);
> > imp = TIPC_SKB_CB(skb)->chain_imp;
> > - if (l->backlog[imp].len < l->backlog[imp].limit) {
> > - skb_unlink(skb, &l->wakeupq);
> > - skb_queue_tail(l->inputq, skb);
> > - } else if (i++ > 10) {
> > - break;
> > - }
> > + if (avail[imp])
> > + avail[imp]--;
> > }
> > + spin_unlock_bh(&inputq->lock);
> > +
> > + spin_lock_bh(&wakeupq->lock);
> > + skb_queue_walk_safe(wakeupq, skb, tmp) {
> > + imp = TIPC_SKB_CB(skb)->chain_imp;
> > + if (!avail[imp])
> > + continue;
> > + avail[imp]--;
> > + __skb_unlink(skb, wakeupq);
> > + __skb_queue_tail(&tmpq, skb);
> > + }
> > + spin_unlock_bh(&wakeupq->lock);
> > +
> > + spin_lock_bh(&inputq->lock);
> > + skb_queue_splice(&tmpq, inputq);
> > + spin_unlock_bh(&inputq->lock);
> > }
> >
> > void tipc_link_reset(struct tipc_link *l)
> > --
> > 2.1.4
> |
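The dequeue policy this thread converges on can be modeled in a few lines of user-space C (this is a sketch, not the kernel code; all names are illustrative): compute the free backlog slots per importance level, clamp at zero so that oversubscription (len > limit) is not mistaken for free space, and keep moving wakeup jobs until each level is saturated, instead of stopping after 10 misses across all levels.

```c
#include <assert.h>

#define LEVELS 5	/* TIPC importance levels 0..4 */

struct job { int imp; int scheduled; };

/* Clamp at zero: with oversubscription len may exceed limit, and a
 * negative slot count must not be treated as "space available". */
static int free_slots(int limit, int len)
{
	return limit > len ? limit - len : 0;
}

/* Schedule at most free_slots() wakeup jobs per importance level;
 * returns the number of jobs moved to the (model) input queue. */
static int prepare_wakeup(struct job *q, int n,
			  const int *limit, const int *len)
{
	int avail[LEVELS], i, moved = 0;

	for (i = 0; i < LEVELS; i++)
		avail[i] = free_slots(limit[i], len[i]);

	for (i = 0; i < n; i++) {
		if (!avail[q[i].imp])
			continue;	/* this level is saturated */
		avail[q[i].imp]--;
		q[i].scheduled = 1;
		moved++;
	}
	return moved;
}
```

Note how a saturated level only skips its own jobs; jobs at other levels are still considered, which is exactly the starvation the original "10 total failed attempts" counter could not guarantee.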
From: Tuong L. <tuo...@de...> - 2019-07-19 03:56:59
|
This commit eliminates the use of the link 'stale_limit' and 'prev_from' variables (besides the already removed 'stale_cnt') in the detection of repeated retransmit failures, as there is no proper way to initialize them that avoids a false detection, i.e. a case where garbage values in the variables are mistaken for a real retransmission failure.

Instead, a jiffies variable will be added to individual skbs (in the same way we restrict skb retransmissions) to mark the time of the first retransmit of each skb. At subsequent retransmissions, the timestamp is checked to see if the skb at the head of the link transmq is "too stale", that is, whether the link tolerance time has passed, in which case a link reset is ordered. Note that checking only the first skb in the queue is sufficient, since it must be the oldest one. A counter is also added to keep track of the actual number of skb retransmissions, for later inspection when a failure happens.

The downside of this approach is that the skb->cb[] buffer is nearly exhausted; however, it is always possible to allocate another memory area and keep a reference to it when needed.
Fixes: 77cf8edbc0e7 ("tipc: simplify stale link failure criteria") Reported-by: Hoang Le <hoa...@de...> Signed-off-by: Tuong Lien <tuo...@de...> --- net/tipc/link.c | 92 ++++++++++++++++++++++++++++++++------------------------- net/tipc/msg.h | 8 +++-- 2 files changed, 57 insertions(+), 43 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 66d3a07bc571..c2c5c53cad22 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -106,8 +106,6 @@ struct tipc_stats { * @transmitq: queue for sent, non-acked messages * @backlogq: queue for messages waiting to be sent * @snt_nxt: next sequence number to use for outbound messages - * @prev_from: sequence number of most previous retransmission request - * @stale_limit: time when repeated identical retransmits must force link reset * @ackers: # of peers that needs to ack each packet before it can be released * @acked: # last packet acked by a certain peer. Used for broadcast. * @rcv_nxt: next sequence number to expect for inbound messages @@ -164,9 +162,7 @@ struct tipc_link { u16 limit; } backlog[5]; u16 snd_nxt; - u16 prev_from; u16 window; - unsigned long stale_limit; /* Reception */ u16 rcv_nxt; @@ -1044,47 +1040,53 @@ static void tipc_link_advance_backlog(struct tipc_link *l, * link_retransmit_failure() - Detect repeated retransmit failures * @l: tipc link sender * @r: tipc link receiver (= l in case of unicast) - * @from: seqno of the 1st packet in retransmit request * @rc: returned code * * Return: true if the repeated retransmit failures happens, otherwise * false */ static bool link_retransmit_failure(struct tipc_link *l, struct tipc_link *r, - u16 from, int *rc) + int *rc) { struct sk_buff *skb = skb_peek(&l->transmq); struct tipc_msg *hdr; if (!skb) return false; - hdr = buf_msg(skb); - /* Detect repeated retransmit failures on same packet */ - if (r->prev_from != from) { - r->prev_from = from; - r->stale_limit = jiffies + msecs_to_jiffies(r->tolerance); - } else if (time_after(jiffies, r->stale_limit)) { - 
pr_warn("Retransmission failure on link <%s>\n", l->name); - link_print(l, "State of link "); - pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n", - msg_user(hdr), msg_type(hdr), msg_size(hdr), - msg_errcode(hdr)); - pr_info("sqno %u, prev: %x, src: %x\n", - msg_seqno(hdr), msg_prevnode(hdr), msg_orignode(hdr)); - - trace_tipc_list_dump(&l->transmq, true, "retrans failure!"); - trace_tipc_link_dump(l, TIPC_DUMP_NONE, "retrans failure!"); - trace_tipc_link_dump(r, TIPC_DUMP_NONE, "retrans failure!"); + if (!TIPC_SKB_CB(skb)->retr_cnt) + return false; - if (link_is_bc_sndlink(l)) - *rc = TIPC_LINK_DOWN_EVT; + if (!time_after(jiffies, TIPC_SKB_CB(skb)->retr_stamp + + msecs_to_jiffies(r->tolerance))) + return false; + + hdr = buf_msg(skb); + if (link_is_bc_sndlink(l) && !less(r->acked, msg_seqno(hdr))) + return false; + pr_warn("Retransmission failure on link <%s>\n", l->name); + link_print(l, "State of link "); + pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n", + msg_user(hdr), msg_type(hdr), msg_size(hdr), msg_errcode(hdr)); + pr_info("sqno %u, prev: %x, dest: %x\n", + msg_seqno(hdr), msg_prevnode(hdr), msg_destnode(hdr)); + pr_info("retr_stamp %d, retr_cnt %d\n", + jiffies_to_msecs(TIPC_SKB_CB(skb)->retr_stamp), + TIPC_SKB_CB(skb)->retr_cnt); + + trace_tipc_list_dump(&l->transmq, true, "retrans failure!"); + trace_tipc_link_dump(l, TIPC_DUMP_NONE, "retrans failure!"); + trace_tipc_link_dump(r, TIPC_DUMP_NONE, "retrans failure!"); + + if (link_is_bc_sndlink(l)) { + r->state = LINK_RESET; + *rc = TIPC_LINK_DOWN_EVT; + } else { *rc = tipc_link_fsm_evt(l, LINK_FAILURE_EVT); - return true; } - return false; + return true; } /* tipc_link_bc_retrans() - retransmit zero or more packets @@ -1110,7 +1112,7 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r, trace_tipc_link_retrans(r, from, to, &l->transmq); - if (link_retransmit_failure(l, r, from, &rc)) + if (link_retransmit_failure(l, r, &rc)) return rc; skb_queue_walk(&l->transmq, skb) 
{ @@ -1119,11 +1121,10 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r, continue; if (more(msg_seqno(hdr), to)) break; - if (link_is_bc_sndlink(l)) { - if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr)) - continue; - TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM; - } + + if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr)) + continue; + TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM; _skb = __pskb_copy(skb, LL_MAX_HEADER + MIN_H_SIZE, GFP_ATOMIC); if (!_skb) return 0; @@ -1133,6 +1134,10 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r, _skb->priority = TC_PRIO_CONTROL; __skb_queue_tail(xmitq, _skb); l->stats.retransmitted++; + + /* Increase actual retrans counter & mark first time */ + if (!TIPC_SKB_CB(skb)->retr_cnt++) + TIPC_SKB_CB(skb)->retr_stamp = jiffies; } return 0; } @@ -1357,12 +1362,10 @@ static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap, struct tipc_msg *hdr; u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; u16 ack = l->rcv_nxt - 1; + bool passed = false; u16 seqno, n = 0; int rc = 0; - if (gap && link_retransmit_failure(l, l, acked + 1, &rc)) - return rc; - skb_queue_walk_safe(&l->transmq, skb, tmp) { seqno = buf_seqno(skb); @@ -1372,12 +1375,17 @@ static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap, __skb_unlink(skb, &l->transmq); kfree_skb(skb); } else if (less_eq(seqno, acked + gap)) { - /* retransmit skb */ + /* First, check if repeated retrans failures occurs? 
*/ + if (!passed && link_retransmit_failure(l, l, &rc)) + return rc; + passed = true; + + /* retransmit skb if unrestricted*/ if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr)) continue; TIPC_SKB_CB(skb)->nxt_retr = TIPC_UC_RETR_TIME; - - _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC); + _skb = __pskb_copy(skb, LL_MAX_HEADER + MIN_H_SIZE, + GFP_ATOMIC); if (!_skb) continue; hdr = buf_msg(_skb); @@ -1386,6 +1394,10 @@ static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap, _skb->priority = TC_PRIO_CONTROL; __skb_queue_tail(xmitq, _skb); l->stats.retransmitted++; + + /* Increase actual retrans counter & mark first time */ + if (!TIPC_SKB_CB(skb)->retr_cnt++) + TIPC_SKB_CB(skb)->retr_stamp = jiffies; } else { /* retry with Gap ACK blocks if any */ if (!ga || n >= ga->gack_cnt) @@ -2577,7 +2589,7 @@ int tipc_link_dump(struct tipc_link *l, u16 dqueues, char *buf) i += scnprintf(buf + i, sz - i, " %x", l->peer_caps); i += scnprintf(buf + i, sz - i, " %u", l->silent_intv_cnt); i += scnprintf(buf + i, sz - i, " %u", l->rst_cnt); - i += scnprintf(buf + i, sz - i, " %u", l->prev_from); + i += scnprintf(buf + i, sz - i, " %u", 0); i += scnprintf(buf + i, sz - i, " %u", 0); i += scnprintf(buf + i, sz - i, " %u", l->acked); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index da509f0eb9ca..d7ebc9e955f6 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -102,13 +102,15 @@ struct plist; #define TIPC_MEDIA_INFO_OFFSET 5 struct tipc_skb_cb { - u32 bytes_read; - u32 orig_member; struct sk_buff *tail; unsigned long nxt_retr; - bool validated; + unsigned long retr_stamp; + u32 bytes_read; + u32 orig_member; u16 chain_imp; u16 ackers; + u16 retr_cnt; + bool validated; }; #define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0])) -- 2.13.7 |
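The mechanism this patch introduces can be modeled in user-space C. In this sketch time is an abstract tick counter standing in for jiffies, and the names mirror but are not the kernel API: the first retransmission stamps the skb, and a failure is declared only when the oldest transmq entry has been retransmitted before and its stamp is older than the link tolerance.

```c
#include <assert.h>

struct skb_model {
	unsigned long retr_stamp;	/* time of first retransmit */
	unsigned int retr_cnt;		/* actual retransmissions */
};

/* Called for every retransmission of the skb. */
static void note_retransmit(struct skb_model *skb, unsigned long now)
{
	if (!skb->retr_cnt++)		/* stamp only the first one */
		skb->retr_stamp = now;
}

/* Repeated-retransmit failure check on the oldest queued skb. */
static int retransmit_failed(const struct skb_model *skb,
			     unsigned long now, unsigned long tolerance)
{
	if (!skb->retr_cnt)		/* never retransmitted yet */
		return 0;
	return now > skb->retr_stamp + tolerance;
}
```

Because the stamp lives in the skb itself, it is trivially initialized to a known state (zero retransmissions) when the skb is created, which is precisely what 'prev_from' and 'stale_limit' could not guarantee.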
From: <joh...@de...> - 2019-07-19 03:10:39
|
From: John Rutherford <joh...@de...> Since node internal messages are passed directly to the socket, it is not possible to observe those messages via tcpdump or wireshark. We now remedy this by making it possible to clone such messages and send the clones to the loopback interface. The clones are dropped at reception and have no functional role except making the traffic visible. The feature is enabled if network taps are active for the loopback device. pcap filtering restrictions require the messages to be presented to the receiving side of the loopback device. v3 - Function dev_nit_active used to check for network taps. - Procedure netif_rx_ni used to send cloned messages to loopback device. Signed-off-by: John Rutherford <joh...@de...> --- net/tipc/bcast.c | 4 +++- net/tipc/bearer.c | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ net/tipc/bearer.h | 10 +++++++++ net/tipc/core.c | 5 +++++ net/tipc/core.h | 3 +++ net/tipc/node.c | 1 + net/tipc/topsrv.c | 2 ++ 7 files changed, 89 insertions(+), 1 deletion(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 6c997d4..235331d 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -406,8 +406,10 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, rc = tipc_bcast_xmit(net, pkts, cong_link_cnt); } - if (dests->local) + if (dests->local) { + tipc_loopback_trace(net, &localq); tipc_sk_mcast_rcv(net, &localq, &inputq); + } exit: /* This queue should normally be empty by now */ __skb_queue_purge(pkts); diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 2bed658..5273ec5 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -389,6 +389,12 @@ int tipc_enable_l2_media(struct net *net, struct tipc_bearer *b, dev_put(dev); return -EINVAL; } + if (dev == net->loopback_dev) { + dev_put(dev); + pr_info("Bearer <%s>: loopback device not permitted\n", + b->name); + return -EINVAL; + } /* Autoconfigure own node identity if needed */ if (!tipc_own_id(net) && hwaddr_len <= NODE_ID_LEN) { @@ 
-674,6 +680,65 @@ void tipc_bearer_stop(struct net *net) } } +void tipc_clone_to_loopback(struct net *net, struct sk_buff_head *pkts) +{ + struct net_device *dev = net->loopback_dev; + struct sk_buff *skb, *_skb; + int exp; + + skb_queue_walk(pkts, _skb) { + skb = pskb_copy(_skb, GFP_ATOMIC); + if (!skb) + continue; + + exp = SKB_DATA_ALIGN(dev->hard_header_len - skb_headroom(skb)); + if (exp > 0 && pskb_expand_head(skb, exp, 0, GFP_ATOMIC)) { + kfree_skb(skb); + continue; + } + + skb_reset_network_header(skb); + dev_hard_header(skb, dev, ETH_P_TIPC, dev->dev_addr, + dev->dev_addr, skb->len); + skb->dev = dev; + skb->pkt_type = PACKET_HOST; + skb->ip_summed = CHECKSUM_UNNECESSARY; + skb->protocol = eth_type_trans(skb, dev); + netif_rx_ni(skb); + } +} + +static int tipc_loopback_rcv_pkt(struct sk_buff *skb, struct net_device *dev, + struct packet_type *pt, struct net_device *od) +{ + consume_skb(skb); + return NET_RX_SUCCESS; +} + +int tipc_attach_loopback(struct net *net) +{ + struct net_device *dev = net->loopback_dev; + struct tipc_net *tn = tipc_net(net); + + if (!dev) + return -ENODEV; + + dev_hold(dev); + tn->loopback_pt.dev = dev; + tn->loopback_pt.type = htons(ETH_P_TIPC); + tn->loopback_pt.func = tipc_loopback_rcv_pkt; + dev_add_pack(&tn->loopback_pt); + return 0; +} + +void tipc_detach_loopback(struct net *net) +{ + struct tipc_net *tn = tipc_net(net); + + dev_remove_pack(&tn->loopback_pt); + dev_put(net->loopback_dev); +} + /* Caller should hold rtnl_lock to protect the bearer */ static int __tipc_nl_add_bearer(struct tipc_nl_msg *msg, struct tipc_bearer *bearer, int nlflags) diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h index 7f4c569..ea0f3c4 100644 --- a/net/tipc/bearer.h +++ b/net/tipc/bearer.h @@ -232,6 +232,16 @@ void tipc_bearer_xmit(struct net *net, u32 bearer_id, struct tipc_media_addr *dst); void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id, struct sk_buff_head *xmitq); +void tipc_clone_to_loopback(struct net *net, struct sk_buff_head 
*pkts); +int tipc_attach_loopback(struct net *net); +void tipc_detach_loopback(struct net *net); + +static inline void tipc_loopback_trace(struct net *net, + struct sk_buff_head *pkts) +{ + if (unlikely(dev_nit_active(net->loopback_dev))) + tipc_clone_to_loopback(net, pkts); +} /* check if device MTU is too low for tipc headers */ static inline bool tipc_mtu_bad(struct net_device *dev, unsigned int reserve) diff --git a/net/tipc/core.c b/net/tipc/core.c index c837072..23cb379 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -82,6 +82,10 @@ static int __net_init tipc_init_net(struct net *net) if (err) goto out_bclink; + err = tipc_attach_loopback(net); + if (err) + goto out_bclink; + return 0; out_bclink: @@ -94,6 +98,7 @@ static int __net_init tipc_init_net(struct net *net) static void __net_exit tipc_exit_net(struct net *net) { + tipc_detach_loopback(net); tipc_net_stop(net); tipc_bcast_stop(net); tipc_nametbl_stop(net); diff --git a/net/tipc/core.h b/net/tipc/core.h index 7a68e1b..60d8295 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -125,6 +125,9 @@ struct tipc_net { /* Cluster capabilities */ u16 capabilities; + + /* Tracing of node internal messages */ + struct packet_type loopback_pt; }; static inline struct tipc_net *tipc_net(struct net *net) diff --git a/net/tipc/node.c b/net/tipc/node.c index 550581d..16d251b 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -1443,6 +1443,7 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list, int rc; if (in_own_node(net, dnode)) { + tipc_loopback_trace(net, list); tipc_sk_rcv(net, list); return 0; } diff --git a/net/tipc/topsrv.c b/net/tipc/topsrv.c index f345662..e3a6ba1 100644 --- a/net/tipc/topsrv.c +++ b/net/tipc/topsrv.c @@ -40,6 +40,7 @@ #include "socket.h" #include "addr.h" #include "msg.h" +#include "bearer.h" #include <net/sock.h> #include <linux/module.h> @@ -608,6 +609,7 @@ static void tipc_topsrv_kern_evt(struct net *net, struct tipc_event *evt) memcpy(msg_data(buf_msg(skb)), evt, 
sizeof(*evt)); skb_queue_head_init(&evtq); __skb_queue_tail(&evtq, skb); + tipc_loopback_trace(net, &evtq); tipc_sk_rcv(net, &evtq); } -- 2.11.0 |
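The tipc_loopback_trace() wrapper in this patch only clones packets when dev_nit_active() reports a listening tap, so the normal data path pays just a single branch. A tiny user-space model of that gating pattern (illustrative names, not the kernel API):

```c
#include <assert.h>
#include <stddef.h>

struct tap { int packets_seen; };

static struct tap *active_tap;		/* NULL: no tap, tracing off */

/* Hot-path hook: clone (here: count) packets only when someone is
 * actually listening, like dev_nit_active() gating the clone step. */
static int trace_pkts(int npkts)
{
	if (!active_tap)		/* common case: one cheap branch */
		return 0;
	active_tap->packets_seen += npkts;
	return npkts;
}
```

The design choice is the same as in the patch: observability costs nothing measurable unless a tool such as tcpdump has attached to the loopback device.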
From: Jon M. <jon...@er...> - 2019-07-19 02:30:55
|
Hi Tung,
See below.

> -----Original Message-----
> From: tung quang nguyen <tun...@de...>
> Sent: 18-Jul-19 05:45
> To: Jon Maloy <jon...@er...>; tipc-
> dis...@li...; ma...@do...;
> yin...@wi...
> Subject: RE: [tipc-discussion][PATCH v1 1/1] tipc: fix starvation of sending
> sockets when handling wakeup messages
>
> Hi Jon,
>
> "Wakeup messages are not time critical, so this is not a good reason to bypass
> the tipc_sk_rcv() function.
> It is an important principle to only have one single entry point in the socket for
> arriving messages, and we should stick to that."
> [Tung]: There are good reasons to let wakeup messages bypass tipc_sk_rcv():
> - It is not necessary to grab/release a lock before/after dequeuing a wakeup
> message. This creates overhead.

Yes, it does create overhead. The question is whether that matters, i.e., whether it is worth spending any time optimizing it. To understand that, you cannot look at the function in isolation; you have to put it in perspective. Is it executed often enough, and is the improvement significant enough, to make any difference?

I added some temporary statistics to the code to find out, running the benchmark with 100 connections and one interface.
I counted the number of messages (not packets) sent per connection, and the number and percentage of congestions/wakeups during the same period:

[16598.348829] 64 bytes messages sent: 6400001, congestions: 12016 (0%), wakeups: 12016
[16601.895640] 256 bytes messages sent: 3200001, congestions: 59451 (1%), wakeups: 59451
[16608.732859] 1024 bytes messages sent: 1600001, congestions: 789841 (49%), wakeups: 789841
[16616.890077] 4096 bytes messages sent: 800001, congestions: 572129 (71%), wakeups: 572129
[16632.246666] 16384 bytes messages sent: 400001, congestions: 353986 (88%), wakeups: 353986
[16661.592722] 65536 bytes messages sent: 199999, congestions: 188856 (94%), wakeups: 188855

As you can see, the number of congestions/wakeups as a percentage of the total sent messages is, for small messages, so small that it cannot possibly have any impact on overall throughput. So your 23% difference has no merit here.

For larger messages the result is less clear. With very large messages there is a congestion/wakeup for almost every sent message, but on the other hand the cost of each sent message is also much higher, so it is unlikely that a few extra instructions in the reception of wakeup messages make any difference. Also remember, a congestion means that each process must be suspended and woken up again, which are expensive operations.

So, in brief, I don't think some extra overhead in prepare_wakeup() and in tipc_sk_rcv() for such messages makes any difference in the big picture.

> - A large number of wakeup messages and data messages creates more
> overhead for the destination port filter function. By consuming wakeup messages
> without grabbing the queue lock, the overhead for consuming data & wakeup
> messages is reduced.
>
> I agree with you that we should have a single entry point for arriving
> messages. We can use the same function tipc_sk_rcv() and it will call
> tipc_sk_rcv_inputq() and tipc_sk_rcv_wakeupq().

That wouldn't make any difference.
We would still have two different ways of handling incoming messages. It is the extra wakeup queue, and the distinct way of handling it, that I strongly object to.

> "Adding a new queue on the stack and then pass it along everywhere is very
> intrusive, introduces a lot of extra code and complexity, and adds
> performance overhead to the critical data path. This is not necessary to
> achieve what you want."
> [Tung]: It introduces extra code but I do not think it is more complex. And
> there is no more performance overhead (I will show it in another mail).

This change by itself might not give a measurable difference, but it is indisputable that you add list initializations on the critical data path, and that you pass an extra parameter through a lot of functions on that path. Such changes must as a matter of principle be avoided whenever possible, because they tend to accumulate into extra overhead and poorer performance.

> Actually, to achieve that (fix the starvation issue), I believe we have to do the 2 things
> I did, and the test result was good.

I made my own measurements before I sent you the patch yesterday, and saw no difference in performance. I don't know how many times you ran your measurements, but what you present in your mail is well within the limits of the stochastic variations you can expect when you make only one or a few runs. After having fixed a bug in my algorithm (backlog[imp].len may be > backlog[imp].limit) and made some other changes, I re-ran my tests, many times, and I still see no significant difference. If anything, my algorithm might be slightly faster than yours. I will send you this in a separate mail.

BR
///jon

>
> Best regards,
> Tung Nguyen
>
> -----Original Message-----
> From: Jon Maloy <jon...@er...>
> Sent: Wednesday, July 17, 2019 10:47 PM
> To: Tung Quang Nguyen <tun...@de...>; tipc-
> dis...@li...; ma...@do...;
> yin...@wi...
> Subject: RE: [tipc-discussion][PATCH v1 1/1] tipc: fix starvation of sending
> sockets when handling wakeup messages
>
> Hi Tung,
> Your analysis seems correct, but your patch is too intrusive, and can be made
> much simpler.
> See below.
>
> > -----Original Message-----
> > From: Tung Nguyen <tun...@de...>
> > Sent: 9-Jul-19 22:39
> > To: tip...@li...; Jon Maloy
> > <jon...@er...>; ma...@do...; yin...@wi...
> > Subject: [tipc-discussion][PATCH v1 1/1] tipc: fix starvation of sending sockets
> > when handling wakeup messages
> >
> > Commit 365ad35 ("tipc: reduce risk of user starvation during link congestion")
> > aimed to allow senders to add their lists of socket buffers to the link backlog
> > queues under link congestion. However, when dequeuing the link wakeup
> > queue by link_prepare_wakeup(), there might be a worst case scenario:
> > - More than 10 wakeup messages near the wakeup queue head are not
> > dequeued because the backlog queue length is still greater than the limit.
> > - Many wakeup messages near the wakeup queue tail are not dequeued
> > though their importance is different from the one of 10 wakeup
> > messages near the queue head and the backlog queue length is less than the limit.
> >
> > Furthermore, function tipc_sk_rcv() consumes both normal data messages
> > and wakeup messages from the link input queue. By allowing
> > oversubscriptions, the number of messages passed through tipc_sk_rcv()
> > would be higher.
> > Applying the destination port filter to wakeup messages via tipc_skb_peek_port()
> > is not necessary and even causes more overhead.
> > > > > As a result, some non-blocking socket senders are not able to send > > data because epoll() takes many seconds to return EPOLLOUT. > > > > This commit fixes these issues by: > > - Allowing to dequeue as many wakeup messages as possible. > > That is good. Just removing the 10-skb limit would probably be sufficient to > resolve the problem. > > > - Separating wakeup messages from the link input queue. All wakeup > > messages are now put in a local queue and consumed in a simple way. > > There is no reason to do this. Instead, you should just take the messages in the > wakeup queue, sort them by importance order into a temporary queue inside > link_prepare_wakeup(), and add that queue to the head of the input queue, > instead of to the tail. That is all you need to do, in addition to removing the > 10-buffer limit. > > Adding a new queue on the stack and then pass it along everywhere is very > intrusive, introduces a lot of extra code and complexity, and adds > performance overhead to the critical data path. This is not necessary to > achieve what you want. 
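[Editorial note: Jon's suggested alternative above — drain the wakeup queue into a temporary queue ordered by importance inside link_prepare_wakeup(), then splice that queue to the head of the input queue — can be sketched with plain arrays standing in for sk_buff queues. This is only a toy model of the queue manipulation, not kernel code; the four importance levels and the highest-importance-first ordering are assumptions.]

```c
#include <assert.h>
#include <string.h>

#define MAXQ 16
#define IMP_LEVELS 4 /* TIPC's four importance levels (assumed here) */

/* Each "message" is reduced to its importance level; queues are arrays. */
struct queue {
	int buf[MAXQ];
	int len;
};

/* Sketch of the proposal: drain wakeupq into a temporary queue ordered
 * by importance (stable within each level), then splice that queue to
 * the HEAD of inputq so wakeups are consumed before queued data. */
static void prepare_wakeup(struct queue *wakeupq, struct queue *inputq)
{
	struct queue tmp = { .len = 0 };
	int imp, i;

	/* Stable bucket pass, highest importance first (assumed order). */
	for (imp = IMP_LEVELS - 1; imp >= 0; imp--)
		for (i = 0; i < wakeupq->len; i++)
			if (wakeupq->buf[i] == imp)
				tmp.buf[tmp.len++] = imp;
	wakeupq->len = 0;

	/* Splice tmp onto the head of inputq: shift existing entries back,
	 * then copy the sorted wakeups in front of them. */
	memmove(inputq->buf + tmp.len, inputq->buf,
		inputq->len * sizeof(int));
	memcpy(inputq->buf, tmp.buf, tmp.len * sizeof(int));
	inputq->len += tmp.len;
}
```

With wakeups of importance {0, 2, 1, 2} and a data queue {9, 9}, the input queue afterwards reads {2, 2, 1, 0, 9, 9}: all wakeups sit ahead of the data, ordered by importance, which is the whole point of splicing to the head instead of the tail.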
> > BR > ///jon > > > > > Fixes: 365ad35 ("tipc: reduce risk of user starvation during link > congestion") > > Signed-off-by: Tung Nguyen <tun...@de...> > > --- > > net/tipc/bcast.c | 42 +++++++++++----------- > > net/tipc/link.c | 67 +++++++++++++++++----------------- > > net/tipc/link.h | 12 ++++--- > > net/tipc/node.c | 28 +++++++++++---- > > net/tipc/socket.c | 91 > +++++++++++++++++++++++++++++++++++++++++-- > > ---- > > net/tipc/socket.h | 1 + > > 6 files changed, 166 insertions(+), 75 deletions(-) > > > > diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index > > 6c997d4a6218..4de0f9780ef5 100644 > > --- a/net/tipc/bcast.c > > +++ b/net/tipc/bcast.c > > @@ -421,11 +421,11 @@ int tipc_mcast_xmit(struct net *net, struct > > sk_buff_head *pkts, int tipc_bcast_rcv(struct net *net, struct > > tipc_link > *l, > > struct sk_buff *skb) { > > struct tipc_msg *hdr = buf_msg(skb); > > - struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; > > - struct sk_buff_head xmitq; > > + struct sk_buff_head xmitq, wakeupq; > > int rc; > > > > __skb_queue_head_init(&xmitq); > > + __skb_queue_head_init(&wakeupq); > > > > if (msg_mc_netid(hdr) != tipc_netid(net) || !tipc_link_is_up(l)) { > > kfree_skb(skb); > > @@ -434,16 +434,16 @@ int tipc_bcast_rcv(struct net *net, struct > > tipc_link *l, struct sk_buff *skb) > > > > tipc_bcast_lock(net); > > if (msg_user(hdr) == BCAST_PROTOCOL) > > - rc = tipc_link_bc_nack_rcv(l, skb, &xmitq); > > + rc = tipc_link_bc_nack_rcv(l, skb, &xmitq, &wakeupq); > > else > > - rc = tipc_link_rcv(l, skb, NULL); > > + rc = tipc_link_rcv(l, skb, NULL, &wakeupq); > > tipc_bcast_unlock(net); > > > > tipc_bcbase_xmit(net, &xmitq); > > > > /* Any socket wakeup messages ? 
*/ > > - if (!skb_queue_empty(inputq)) > > - tipc_sk_rcv(net, inputq); > > + if (!skb_queue_empty(&wakeupq)) > > + tipc_sk_rcv_wakeup(net, &wakeupq); > > > > return rc; > > } > > @@ -455,25 +455,25 @@ int tipc_bcast_rcv(struct net *net, struct > > tipc_link *l, struct sk_buff *skb) void tipc_bcast_ack_rcv(struct net > > *net, struct tipc_link *l, > > struct tipc_msg *hdr) > > { > > - struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; > > u16 acked = msg_bcast_ack(hdr); > > - struct sk_buff_head xmitq; > > + struct sk_buff_head xmitq, wakeupq; > > > > /* Ignore bc acks sent by peer before bcast synch point was received > */ > > if (msg_bc_ack_invalid(hdr)) > > return; > > > > __skb_queue_head_init(&xmitq); > > + __skb_queue_head_init(&wakeupq); > > > > tipc_bcast_lock(net); > > - tipc_link_bc_ack_rcv(l, acked, &xmitq); > > + tipc_link_bc_ack_rcv(l, acked, &xmitq, &wakeupq); > > tipc_bcast_unlock(net); > > > > tipc_bcbase_xmit(net, &xmitq); > > > > /* Any socket wakeup messages ? */ > > - if (!skb_queue_empty(inputq)) > > - tipc_sk_rcv(net, inputq); > > + if (!skb_queue_empty(&wakeupq)) > > + tipc_sk_rcv_wakeup(net, &wakeupq); > > } > > > > /* tipc_bcast_synch_rcv - check and update rcv link with peer's send > state > > @@ -483,17 +483,17 @@ void tipc_bcast_ack_rcv(struct net *net, struct > > tipc_link *l, int tipc_bcast_sync_rcv(struct net *net, struct > > tipc_link > *l, > > struct tipc_msg *hdr) > > { > > - struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; > > - struct sk_buff_head xmitq; > > + struct sk_buff_head xmitq, wakeupq; > > int rc = 0; > > > > __skb_queue_head_init(&xmitq); > > + __skb_queue_head_init(&wakeupq); > > > > tipc_bcast_lock(net); > > if (msg_type(hdr) != STATE_MSG) { > > tipc_link_bc_init_rcv(l, hdr); > > } else if (!msg_bc_ack_invalid(hdr)) { > > - tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq); > > + tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq, > > &wakeupq); > > rc = tipc_link_bc_sync_rcv(l, hdr, &xmitq); > > } > 
> tipc_bcast_unlock(net); > > @@ -501,8 +501,8 @@ int tipc_bcast_sync_rcv(struct net *net, struct > > tipc_link *l, > > tipc_bcbase_xmit(net, &xmitq); > > > > /* Any socket wakeup messages ? */ > > - if (!skb_queue_empty(inputq)) > > - tipc_sk_rcv(net, inputq); > > + if (!skb_queue_empty(&wakeupq)) > > + tipc_sk_rcv_wakeup(net, &wakeupq); > > return rc; > > } > > > > @@ -529,13 +529,13 @@ void tipc_bcast_add_peer(struct net *net, struct > > tipc_link *uc_l, void tipc_bcast_remove_peer(struct net *net, struct > tipc_link > > *rcv_l) { > > struct tipc_link *snd_l = tipc_bc_sndlink(net); > > - struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; > > - struct sk_buff_head xmitq; > > + struct sk_buff_head xmitq, wakeupq; > > > > __skb_queue_head_init(&xmitq); > > + __skb_queue_head_init(&wakeupq); > > > > tipc_bcast_lock(net); > > - tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq); > > + tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq, &wakeupq); > > tipc_bcbase_select_primary(net); > > tipc_bcbase_calc_bc_threshold(net); > > tipc_bcast_unlock(net); > > @@ -543,8 +543,8 @@ void tipc_bcast_remove_peer(struct net *net, > > struct tipc_link *rcv_l) > > tipc_bcbase_xmit(net, &xmitq); > > > > /* Any socket wakeup messages ? 
*/ > > - if (!skb_queue_empty(inputq)) > > - tipc_sk_rcv(net, inputq); > > + if (!skb_queue_empty(&wakeupq)) > > + tipc_sk_rcv_wakeup(net, &wakeupq); > > } > > > > int tipc_bclink_reset_stats(struct net *net) diff --git > > a/net/tipc/link.c b/net/tipc/link.c index 2050fd386642..e67d7e6a793b > > 100644 > > --- a/net/tipc/link.c > > +++ b/net/tipc/link.c > > @@ -237,7 +237,8 @@ static int link_is_up(struct tipc_link *l) } > > > > static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, > > - struct sk_buff_head *xmitq); > > + struct sk_buff_head *xmitq, > > + struct sk_buff_head *wakeupq); > > static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, > > bool probe, > > bool probe_reply, u16 rcvgap, > > int tolerance, int priority, @@ -331,6 +332,11 > @@ struct > > sk_buff_head *tipc_link_inputq(struct tipc_link *l) > > return l->inputq; > > } > > > > +struct sk_buff_head *tipc_link_wakeupq(struct tipc_link *l) { > > + return &l->wakeupq; > > +} > > + > > char tipc_link_plane(struct tipc_link *l) { > > return l->net_plane; > > @@ -355,19 +361,21 @@ void tipc_link_add_bc_peer(struct tipc_link > > *snd_l, > > > > void tipc_link_remove_bc_peer(struct tipc_link *snd_l, > > struct tipc_link *rcv_l, > > - struct sk_buff_head *xmitq) > > + struct sk_buff_head *xmitq, > > + struct sk_buff_head *wakeupq) > > { > > u16 ack = snd_l->snd_nxt - 1; > > > > snd_l->ackers--; > > rcv_l->bc_peer_is_up = true; > > rcv_l->state = LINK_ESTABLISHED; > > - tipc_link_bc_ack_rcv(rcv_l, ack, xmitq); > > + tipc_link_bc_ack_rcv(rcv_l, ack, xmitq, wakeupq); > > trace_tipc_link_reset(rcv_l, TIPC_DUMP_ALL, "bclink removed!"); > > tipc_link_reset(rcv_l); > > rcv_l->state = LINK_RESET; > > if (!snd_l->ackers) { > > trace_tipc_link_reset(snd_l, TIPC_DUMP_ALL, "zero ackers!"); > > + skb_queue_splice_tail_init(&snd_l->wakeupq, wakeupq); > > tipc_link_reset(snd_l); > > snd_l->state = LINK_RESET; > > __skb_queue_purge(xmitq); > > @@ -500,7 +508,7 @@ bool 
tipc_link_create(struct net *net, char > > *if_name, int bearer_id, > > __skb_queue_head_init(&l->backlogq); > > __skb_queue_head_init(&l->deferdq); > > __skb_queue_head_init(&l->failover_deferdq); > > - skb_queue_head_init(&l->wakeupq); > > + __skb_queue_head_init(&l->wakeupq); > > skb_queue_head_init(l->inputq); > > return true; > > } > > @@ -839,9 +847,8 @@ static int link_schedule_user(struct tipc_link *l, > struct > > tipc_msg *hdr) > > dnode, l->addr, dport, 0, 0); > > if (!skb) > > return -ENOBUFS; > > - msg_set_dest_droppable(buf_msg(skb), true); > > TIPC_SKB_CB(skb)->chain_imp = msg_importance(hdr); > > - skb_queue_tail(&l->wakeupq, skb); > > + __skb_queue_tail(&l->wakeupq, skb); > > l->stats.link_congs++; > > trace_tipc_link_conges(l, TIPC_DUMP_ALL, "wakeup scheduled!"); > > return -ELINKCONG; > > @@ -853,46 +860,34 @@ static int link_schedule_user(struct tipc_link > > *l, struct tipc_msg *hdr) > > * Wake up a number of waiting users, as permitted by available space > > * in the send queue > > */ > > -static void link_prepare_wakeup(struct tipc_link *l) > > +static void link_prepare_wakeup(struct tipc_link *l, > > + struct sk_buff_head *wakeupq) > > { > > struct sk_buff *skb, *tmp; > > - int imp, i = 0; > > + int imp; > > > > skb_queue_walk_safe(&l->wakeupq, skb, tmp) { > > imp = TIPC_SKB_CB(skb)->chain_imp; > > if (l->backlog[imp].len < l->backlog[imp].limit) { > > - skb_unlink(skb, &l->wakeupq); > > - skb_queue_tail(l->inputq, skb); > > - } else if (i++ > 10) { > > - break; > > + __skb_unlink(skb, &l->wakeupq); > > + __skb_queue_tail(wakeupq, skb); > > } > > } > > } > > > > void tipc_link_reset(struct tipc_link *l) { > > - struct sk_buff_head list; > > - > > - __skb_queue_head_init(&list); > > - > > l->in_session = false; > > /* Force re-synch of peer session number before establishing */ > > l->peer_session--; > > l->session++; > > l->mtu = l->advertised_mtu; > > > > - spin_lock_bh(&l->wakeupq.lock); > > - skb_queue_splice_init(&l->wakeupq, &list); > > 
- spin_unlock_bh(&l->wakeupq.lock); > > - > > - spin_lock_bh(&l->inputq->lock); > > - skb_queue_splice_init(&list, l->inputq); > > - spin_unlock_bh(&l->inputq->lock); > > - > > __skb_queue_purge(&l->transmq); > > __skb_queue_purge(&l->deferdq); > > __skb_queue_purge(&l->backlogq); > > __skb_queue_purge(&l->failover_deferdq); > > + __skb_queue_purge(&l->wakeupq); > > l->backlog[TIPC_LOW_IMPORTANCE].len = 0; > > l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0; > > l->backlog[TIPC_HIGH_IMPORTANCE].len = 0; @@ -1445,9 +1440,11 > @@ > > static int tipc_link_build_nack_msg(struct tipc_link *l, > > * @l: the link that should handle the message > > * @skb: TIPC packet > > * @xmitq: queue to place packets to be sent after this call > > + * @wakeupq: list of wakeup messages to be consumed after this call > > */ > > int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, > > - struct sk_buff_head *xmitq) > > + struct sk_buff_head *xmitq, > > + struct sk_buff_head *wakeupq) > > { > > struct sk_buff_head *defq = &l->deferdq; > > struct tipc_msg *hdr = buf_msg(skb); @@ -1456,7 +1453,7 @@ int > > tipc_link_rcv(struct tipc_link *l, struct > sk_buff > > *skb, > > > > /* Verify and update link state */ > > if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) > > - return tipc_link_proto_rcv(l, skb, xmitq); > > + return tipc_link_proto_rcv(l, skb, xmitq, wakeupq); > > > > /* Don't send probe at next timeout expiration */ > > l->silent_intv_cnt = 0; > > @@ -1484,7 +1481,7 @@ int tipc_link_rcv(struct tipc_link *l, struct > sk_buff > > *skb, > > l->stale_cnt = 0; > > tipc_link_advance_backlog(l, xmitq); > > if (unlikely(!skb_queue_empty(&l->wakeupq))) > > - link_prepare_wakeup(l); > > + link_prepare_wakeup(l, wakeupq); > > } > > > > /* Defer delivery if sequence gap */ @@ -1518,6 +1515,7 @@ > static > > void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool > > probe, > > bool probe_reply, u16 rcvgap, > > int tolerance, int priority, > > struct sk_buff_head *xmitq) > > + > > { > > 
struct tipc_link *bcl = l->bc_rcvlink; > > struct sk_buff *skb; > > @@ -1786,7 +1784,8 @@ bool tipc_link_validate_msg(struct tipc_link *l, > > struct tipc_msg *hdr) > > * network plane > > */ > > static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, > > - struct sk_buff_head *xmitq) > > + struct sk_buff_head *xmitq, > > + struct sk_buff_head *wakeupq) > > { > > struct tipc_msg *hdr = buf_msg(skb); > > struct tipc_gap_ack_blks *ga = NULL; @@ -1926,7 +1925,7 @@ static > > int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, > > > > tipc_link_advance_backlog(l, xmitq); > > if (unlikely(!skb_queue_empty(&l->wakeupq))) > > - link_prepare_wakeup(l); > > + link_prepare_wakeup(l, wakeupq); > > } > > exit: > > kfree_skb(skb); > > @@ -2072,7 +2071,8 @@ int tipc_link_bc_sync_rcv(struct tipc_link *l, > struct > > tipc_msg *hdr, } > > > > void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, > > - struct sk_buff_head *xmitq) > > + struct sk_buff_head *xmitq, > > + struct sk_buff_head *wakeupq) > > { > > struct sk_buff *skb, *tmp; > > struct tipc_link *snd_l = l->bc_sndlink; @@ -2102,7 +2102,7 @@ void > > tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, > > l->acked = acked; > > tipc_link_advance_backlog(snd_l, xmitq); > > if (unlikely(!skb_queue_empty(&snd_l->wakeupq))) > > - link_prepare_wakeup(snd_l); > > + link_prepare_wakeup(snd_l, wakeupq); > > } > > > > /* tipc_link_bc_nack_rcv(): receive broadcast nack message @@ -2110,7 > > +2110,8 @@ void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, > > * no BCAST_PROTOCOL/STATE messages occur from TIPC v2.5. 
> > */ > > int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb, > > - struct sk_buff_head *xmitq) > > + struct sk_buff_head *xmitq, > > + struct sk_buff_head *wakeupq) > > { > > struct tipc_msg *hdr = buf_msg(skb); > > u32 dnode = msg_destnode(hdr); > > @@ -2130,7 +2131,7 @@ int tipc_link_bc_nack_rcv(struct tipc_link *l, > struct > > sk_buff *skb, > > return 0; > > > > if (dnode == tipc_own_addr(l->net)) { > > - tipc_link_bc_ack_rcv(l, acked, xmitq); > > + tipc_link_bc_ack_rcv(l, acked, xmitq, wakeupq); > > rc = tipc_link_retrans(l->bc_sndlink, l, from, to, xmitq); > > l->stats.recv_nacks++; > > return rc; > > diff --git a/net/tipc/link.h b/net/tipc/link.h index > > adcad65e761c..9568bd965945 100644 > > --- a/net/tipc/link.h > > +++ b/net/tipc/link.h > > @@ -107,6 +107,7 @@ void tipc_link_reset_stats(struct tipc_link *l); > > int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list, > > struct sk_buff_head *xmitq); > > struct sk_buff_head *tipc_link_inputq(struct tipc_link *l); > > +struct sk_buff_head *tipc_link_wakeupq(struct tipc_link *l); > > u16 tipc_link_rcv_nxt(struct tipc_link *l); > > u16 tipc_link_acked(struct tipc_link *l); > > u32 tipc_link_id(struct tipc_link *l); @@ -130,25 +131,28 @@ int > > __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg, int > > tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]); > > int tipc_link_timeout(struct tipc_link *l, struct > sk_buff_head > > *xmitq); int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, > > - struct sk_buff_head *xmitq); > > + struct sk_buff_head *xmitq, struct sk_buff_head *wakeupq); > > int tipc_link_build_state_msg(struct tipc_link *l, struct > > sk_buff_head > *xmitq); > > void tipc_link_add_bc_peer(struct tipc_link *snd_l, > > struct tipc_link *uc_l, > > struct sk_buff_head *xmitq); > > void tipc_link_remove_bc_peer(struct tipc_link *snd_l, > > struct tipc_link *rcv_l, > > - struct sk_buff_head *xmitq); > > + struct sk_buff_head *xmitq, > 
> + struct sk_buff_head *wakeupq); > > int tipc_link_bc_peers(struct tipc_link *l); void > tipc_link_set_mtu(struct > > tipc_link *l, int mtu); int tipc_link_mtu(struct tipc_link *l); void > > tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, > > - struct sk_buff_head *xmitq); > > + struct sk_buff_head *xmitq, > > + struct sk_buff_head *wakeupq); > > void tipc_link_build_bc_sync_msg(struct tipc_link *l, > > struct sk_buff_head *xmitq); > > void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr); > > int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr, > > struct sk_buff_head *xmitq); > > int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb, > > - struct sk_buff_head *xmitq); > > + struct sk_buff_head *xmitq, > > + struct sk_buff_head *wakeupq); > > bool tipc_link_too_silent(struct tipc_link *l); #endif diff --git > > a/net/tipc/node.c b/net/tipc/node.c index 550581d47d51..d0ec29081b11 > > 100644 > > --- a/net/tipc/node.c > > +++ b/net/tipc/node.c > > @@ -154,6 +154,7 @@ enum { > > > > static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, > > struct sk_buff_head *xmitq, > > + struct sk_buff_head *wakeupq, > > struct tipc_media_addr **maddr); static void > > tipc_node_link_down(struct tipc_node *n, int bearer_id, > > bool delete); > > @@ -803,6 +804,7 @@ static void tipc_node_link_failover(struct > > tipc_node *n, struct tipc_link *l, > > */ > > static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, > > struct sk_buff_head *xmitq, > > + struct sk_buff_head *wakeupq, > > struct tipc_media_addr **maddr) { > > struct tipc_link_entry *le = &n->links[*bearer_id]; @@ -851,6 +853,7 > > @@ static void __tipc_node_link_down(struct tipc_node *n, int > *bearer_id, > > tipc_node_fsm_evt(n, SELF_LOST_CONTACT_EVT); > > trace_tipc_link_reset(l, TIPC_DUMP_ALL, "link down!"); > > tipc_link_fsm_evt(l, LINK_RESET_EVT); > > + skb_queue_splice_tail_init(tipc_link_wakeupq(l), wakeupq); > > tipc_link_reset(l); 
> > tipc_link_build_reset_msg(l, xmitq); > > *maddr = &n->links[*bearer_id].maddr; @@ -868,6 +871,7 @@ > static > > void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, > > n->sync_point = tipc_link_rcv_nxt(tnl) + (U16_MAX / 2 - 1); > > tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq); > > trace_tipc_link_reset(l, TIPC_DUMP_ALL, "link down -> failover!"); > > + skb_queue_splice_tail_init(tipc_link_wakeupq(l), wakeupq); > > tipc_link_reset(l); > > tipc_link_fsm_evt(l, LINK_RESET_EVT); > > tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT); @@ -881,18 > > +885,20 @@ static void tipc_node_link_down(struct tipc_node *n, int > > bearer_id, bool delete) > > struct tipc_media_addr *maddr = NULL; > > struct tipc_link *l = le->link; > > int old_bearer_id = bearer_id; > > - struct sk_buff_head xmitq; > > + struct sk_buff_head xmitq, wakeupq; > > > > if (!l) > > return; > > > > __skb_queue_head_init(&xmitq); > > + __skb_queue_head_init(&wakeupq); > > > > tipc_node_write_lock(n); > > if (!tipc_link_is_establishing(l)) { > > - __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr); > > + __tipc_node_link_down(n, &bearer_id, &xmitq, &wakeupq, > > &maddr); > > } else { > > /* Defuse pending tipc_node_link_up() */ > > + skb_queue_splice_tail_init(tipc_link_wakeupq(l), &wakeupq); > > tipc_link_reset(l); > > tipc_link_fsm_evt(l, LINK_RESET_EVT); > > } > > @@ -908,6 +914,7 @@ static void tipc_node_link_down(struct tipc_node > > *n, int bearer_id, bool delete) > > if (!skb_queue_empty(&xmitq)) > > tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr); > > tipc_sk_rcv(n->net, &le->inputq); > > + tipc_sk_rcv_wakeup(n->net, &wakeupq); > > } > > > > static bool node_is_up(struct tipc_node *n) @@ -1652,6 +1659,7 @@ > > static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, > > u16 iseqno = msg_seqno(msg_get_wrapped(hdr)); > > u16 exp_pkts = msg_msgcnt(hdr); > > u16 rcv_nxt, syncpt, dlv_nxt, inputq_len; > > + struct sk_buff_head wakeupq; > > int state = n->state; > > 
struct tipc_link *l, *tnl, *pl = NULL; > > struct tipc_media_addr *maddr; > > @@ -1711,9 +1719,12 @@ static bool tipc_node_check_state(struct > > tipc_node *n, struct sk_buff *skb, > > if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) { > > syncpt = oseqno + exp_pkts - 1; > > if (pl && !tipc_link_is_reset(pl)) { > > - __tipc_node_link_down(n, &pb_id, xmitq, &maddr); > > + __skb_queue_head_init(&wakeupq); > > + __tipc_node_link_down(n, &pb_id, xmitq, > > + &wakeupq, &maddr); > > trace_tipc_node_link_down(n, true, > > "node link down <- > failover!"); > > + skb_queue_splice_tail(&wakeupq, > tipc_link_wakeupq(l)); > > > tipc_skb_queue_splice_tail_init(tipc_link_inputq(pl), > > > tipc_link_inputq(l)); > > } > > @@ -1795,7 +1806,7 @@ static bool tipc_node_check_state(struct > > tipc_node *n, struct sk_buff *skb, > > */ > > void tipc_rcv(struct net *net, struct sk_buff *skb, struct > > tipc_bearer > *b) { > > - struct sk_buff_head xmitq; > > + struct sk_buff_head xmitq, wakeupq; > > struct tipc_node *n; > > struct tipc_msg *hdr; > > int bearer_id = b->identity; > > @@ -1805,6 +1816,7 @@ void tipc_rcv(struct net *net, struct sk_buff > > *skb, struct tipc_bearer *b) > > u16 bc_ack; > > > > __skb_queue_head_init(&xmitq); > > + __skb_queue_head_init(&wakeupq); > > > > /* Ensure message is well-formed before touching the header */ > > if (unlikely(!tipc_msg_validate(&skb))) > > @@ -1842,7 +1854,7 @@ void tipc_rcv(struct net *net, struct sk_buff > > *skb, struct tipc_bearer *b) > > if (likely((n->state == SELF_UP_PEER_UP) && (usr != > > TUNNEL_PROTOCOL))) { > > spin_lock_bh(&le->lock); > > if (le->link) { > > - rc = tipc_link_rcv(le->link, skb, &xmitq); > > + rc = tipc_link_rcv(le->link, skb, &xmitq, &wakeupq); > > skb = NULL; > > } > > spin_unlock_bh(&le->lock); > > @@ -1856,7 +1868,8 @@ void tipc_rcv(struct net *net, struct sk_buff > > *skb, struct tipc_bearer *b) > > tipc_node_write_lock(n); > > if (tipc_node_check_state(n, skb, bearer_id, &xmitq)) { > > if (le->link) 
{ > > - rc = tipc_link_rcv(le->link, skb, &xmitq); > > + rc = tipc_link_rcv(le->link, skb, > > + &xmitq, &wakeupq); > > skb = NULL; > > } > > } > > @@ -1878,6 +1891,9 @@ void tipc_rcv(struct net *net, struct sk_buff > > *skb, struct tipc_bearer *b) > > if (!skb_queue_empty(&le->inputq)) > > tipc_sk_rcv(net, &le->inputq); > > > > + if (!skb_queue_empty(&wakeupq)) > > + tipc_sk_rcv_wakeup(net, &wakeupq); > > + > > if (!skb_queue_empty(&xmitq)) > > tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); > > > > diff --git a/net/tipc/socket.c b/net/tipc/socket.c index > > dd8537f988c4..fa48fb14add7 100644 > > --- a/net/tipc/socket.c > > +++ b/net/tipc/socket.c > > @@ -2224,6 +2224,33 @@ static int tipc_sk_backlog_rcv(struct sock *sk, > > struct sk_buff *skb) > > return 0; > > } > > > > +/** > > + * tipc_sk_add_backlog - add the skb to the socket backlog queue > > + * @sk: socket where the skb should be enqueued > > + * @skb: the skb being enqueued > > + * > > + * Return true if the skb was added. 
Otherwise, return false */ > > +static bool tipc_sk_add_backlog(struct sock *sk, struct sk_buff *skb) { > > + unsigned int lim; > > + atomic_t *dcnt; > > + > > + /* Try backlog, compensating for double-counted bytes */ > > + dcnt = &tipc_sk(sk)->dupl_rcvcnt; > > + if (!sk->sk_backlog.len) > > + atomic_set(dcnt, 0); > > + > > + lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt); > > + if (likely(!sk_add_backlog(sk, skb, lim))) { > > + trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL, > > + "bklg & rcvq >90% allocated!"); > > + return true; > > + } > > + > > + return false; > > +} > > + > > /** > > * tipc_sk_enqueue - extract all buffers with destination 'dport' from > > * inputq and try adding them to socket or backlog > queue > > @@ -2238,8 +2265,6 @@ static void tipc_sk_enqueue(struct sk_buff_head > > *inputq, struct sock *sk, { > > unsigned long time_limit = jiffies + 2; > > struct sk_buff *skb; > > - unsigned int lim; > > - atomic_t *dcnt; > > u32 onode; > > > > while (skb_queue_len(inputq)) { > > @@ -2256,16 +2281,9 @@ static void tipc_sk_enqueue(struct > sk_buff_head > > *inputq, struct sock *sk, > > continue; > > } > > > > - /* Try backlog, compensating for double-counted bytes */ > > - dcnt = &tipc_sk(sk)->dupl_rcvcnt; > > - if (!sk->sk_backlog.len) > > - atomic_set(dcnt, 0); > > - lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt); > > - if (likely(!sk_add_backlog(sk, skb, lim))) { > > - trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL, > > - "bklg & rcvq >90% > allocated!"); > > + /* Add the skb to the socket backlog queue */ > > + if (tipc_sk_add_backlog(sk, skb)) > > continue; > > - } > > > > trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL, "err_overload!"); > > /* Overload => reject message back to sender */ @@ -2332,6 > > +2350,57 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head > > +*inputq) > > } > > } > > > > +/** > > + * tipc_sk_rcv_wakeup - handle a chain of wakeup messages > > + * @wakeupq: list of wakeup messages > > + * Consumes all messages in the 
list until it is empty */ void > > +tipc_sk_rcv_wakeup(struct net *net, struct sk_buff_head *wakeupq) { > > + u32 dport = 0; > > + struct tipc_sock *tsk; > > + struct sock *sk; > > + struct sk_buff *skb, *tmp; > > + > > +start: > > + if (!skb_queue_len(wakeupq)) > > + return; > > + > > + skb_queue_walk_safe(wakeupq, skb, tmp) { > > + dport = msg_destport(buf_msg(skb)); > > + tsk = tipc_sk_lookup(net, dport); > > + > > + if (!unlikely(tsk)) { > > + __skb_unlink(skb, wakeupq); > > + kfree_skb(skb); > > + continue; > > + } > > + > > + sk = &tsk->sk; > > + if (likely(spin_trylock_bh(&sk->sk_lock.slock))) { > > + __skb_unlink(skb, wakeupq); > > + if (!sock_owned_by_user(sk)) { > > + tipc_dest_del(&tsk->cong_links, > > + msg_orignode(buf_msg(skb)), > 0); > > + /* coupled with smp_rmb() in > > + * tipc_wait_for_cond() > > + */ > > + smp_wmb(); > > + tsk->cong_link_cnt--; > > + sk->sk_write_space(sk); > > + kfree_skb(skb); > > + } else if (unlikely(!tipc_sk_add_backlog(sk, skb))) > { > > + kfree_skb(skb); > > + pr_warn("Drop wakeup message for port %u\n", > > + tipc_sk(sk)->portid); > > + } > > + spin_unlock_bh(&sk->sk_lock.slock); > > + } > > + sock_put(sk); > > + } > > + goto start; > > +} > > + > > static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) { > > DEFINE_WAIT_FUNC(wait, woken_wake_function); diff --git > > a/net/tipc/socket.h b/net/tipc/socket.h index > > 235b9679acee..44eb9fb68d7e > > 100644 > > --- a/net/tipc/socket.h > > +++ b/net/tipc/socket.h > > @@ -54,6 +54,7 @@ struct tipc_sock; > > int tipc_socket_init(void); > > void tipc_socket_stop(void); > > void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq); > > +void tipc_sk_rcv_wakeup(struct net *net, struct sk_buff_head > > +*wakeupq); > > void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, > > struct sk_buff_head *inputq); void tipc_sk_reinit(struct > > net *net); > > -- > > 2.17.1 > |
From: tung q. n. <tun...@de...> - 2019-07-18 09:45:25
Hi Jon, "Wakeup messages are not time critical, so this is not a good reason to bypass the tipc_sk_rcv() function. It is an important principle to only have one single entry point in the socket for arriving messages, and we should stick to that." [Tung]: There are good reasons to let wakeup messages bypass tipc_sk_rcv(): - It is not necessary to grab/release the lock before/after dequeuing a wakeup message. This creates overhead. - A large number of wakeup messages and data messages creates more overhead for the destination port filter function. By consuming wakeup messages without grabbing the queue lock, the overhead for consuming data & wakeup messages is reduced. I agree with you that we should have a single entry point for arriving messages. We can use the same function tipc_sk_rcv() and it will call tipc_sk_rcv_inputq() and tipc_sk_rcv_wakeupq(). " Adding a new queue on the stack and then pass it along everywhere is very intrusive, introduces a lot of extra code and complexity, and adds performance overhead to the critical data path. This is not necessary to achieve what you want." [Tung]: It introduces extra code but I do not think it is more complex. And there is no more performance overhead (I will show it in another mail). Actually, to achieve that (fix starvation issue), I believe we have to do 2 things as I did and the test result was good. Best regards, Tung Nguyen -----Original Message----- From: Jon Maloy <jon...@er...> Sent: Wednesday, July 17, 2019 10:47 PM To: Tung Quang Nguyen <tun...@de...>; tip...@li...; ma...@do...; yin...@wi... Subject: RE: [tipc-discussion][PATCH v1 1/1] tipc: fix starvation of sending sockets when handling wakeup messages Hi Tung, Your analysis seems correct, but your patch is too intrusive, and can be made much simpler. See below. > -----Original Message----- > From: Tung Nguyen <tun...@de...> > Sent: 9-Jul-19 22:39 > To: tip...@li...; Jon Maloy > <jon...@er...>; ma...@do...; yin...@wi... 
> Subject: [tipc-discussion][PATCH v1 1/1] tipc: fix starvation of sending sockets > when handling wakeup messages > > Commit 365ad35 ("tipc: reduce risk of user starvation during link congestion") > aimed to allow senders to add their lists of socket buffers to the link backlog > queues under link congestion. However, when dequeuing the link wakeup > queue by link_prepare_wakeup(), there might be a worst case scenario: > - More than 10 wakeup messages near the wakeup queue head are not > dequeued because the backlog queue length is still greater than the limit. > - Many wakeup messages near the wakeup queue tail are not dequeued > though their importance is different from the one of 10 wakeup messages > near the queue head and the backlog queue length is less than the limit. > > Furthermore, function tipc_sk_rcv() consumes both normal data messages > and wakeup messages from the link input queue. By allowing > oversubscriptions, the number of messages passed through tipc_sk_rcv() > would be higher. > Applying destination port filter to wakeup messages via tipc_skb_peek_port() > is not necessary and even causes more overhead. Wakeup messages are not time critical, so this is not a good reason to bypass the tipc_sk_rcv() function. It is an important principle to only have one single entry point in the socket for arriving messages, and we should stick to that. > > As a result, some non-blocking socket senders are not able to send data > because epoll() takes many seconds to return EPOLLOUT. > > This commit fixes this issues by: > - Allowing to dequeue as many wakeup messages as possible. That is good. Just removing the 10-skb limit would probably be sufficient to resolve the problem. > - Separating wakeup messages from the link input queue. All wakeup > messages are now put in a local queue and consumed in a simple way. There is no reason to do this. 
Instead, you should just take the messages in the wakeup queue, sort them by importance order into a temporary queue inside link_prepare_wakeup(), and add that queue to the head of the input queue, instead of to the tail. That is all you need to do, in addition to removing the 10-buffer limit. Adding a new queue on the stack and then pass it along everywhere is very intrusive, introduces a lot of extra code and complexity, and adds performance overhead to the critical data path. This is not necessary to achieve what you want. BR ///jon > > Fixes: 365ad35 ("tipc: reduce risk of user starvation during link congestion") > Signed-off-by: Tung Nguyen <tun...@de...> > --- > net/tipc/bcast.c | 42 +++++++++++----------- > net/tipc/link.c | 67 +++++++++++++++++----------------- > net/tipc/link.h | 12 ++++--- > net/tipc/node.c | 28 +++++++++++---- > net/tipc/socket.c | 91 +++++++++++++++++++++++++++++++++++++++++-- > ---- > net/tipc/socket.h | 1 + > 6 files changed, 166 insertions(+), 75 deletions(-) > > diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index > 6c997d4a6218..4de0f9780ef5 100644 > --- a/net/tipc/bcast.c > +++ b/net/tipc/bcast.c > @@ -421,11 +421,11 @@ int tipc_mcast_xmit(struct net *net, struct > sk_buff_head *pkts, int tipc_bcast_rcv(struct net *net, struct tipc_link *l, > struct sk_buff *skb) { > struct tipc_msg *hdr = buf_msg(skb); > - struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; > - struct sk_buff_head xmitq; > + struct sk_buff_head xmitq, wakeupq; > int rc; > > __skb_queue_head_init(&xmitq); > + __skb_queue_head_init(&wakeupq); > > if (msg_mc_netid(hdr) != tipc_netid(net) || !tipc_link_is_up(l)) { > kfree_skb(skb); > @@ -434,16 +434,16 @@ int tipc_bcast_rcv(struct net *net, struct tipc_link > *l, struct sk_buff *skb) > > tipc_bcast_lock(net); > if (msg_user(hdr) == BCAST_PROTOCOL) > - rc = tipc_link_bc_nack_rcv(l, skb, &xmitq); > + rc = tipc_link_bc_nack_rcv(l, skb, &xmitq, &wakeupq); > else > - rc = tipc_link_rcv(l, skb, NULL); > + rc = 
tipc_link_rcv(l, skb, NULL, &wakeupq); > tipc_bcast_unlock(net); > > tipc_bcbase_xmit(net, &xmitq); > > /* Any socket wakeup messages ? */ > - if (!skb_queue_empty(inputq)) > - tipc_sk_rcv(net, inputq); > + if (!skb_queue_empty(&wakeupq)) > + tipc_sk_rcv_wakeup(net, &wakeupq); > > return rc; > } > @@ -455,25 +455,25 @@ int tipc_bcast_rcv(struct net *net, struct tipc_link > *l, struct sk_buff *skb) void tipc_bcast_ack_rcv(struct net *net, struct > tipc_link *l, > struct tipc_msg *hdr) > { > - struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; > u16 acked = msg_bcast_ack(hdr); > - struct sk_buff_head xmitq; > + struct sk_buff_head xmitq, wakeupq; > > /* Ignore bc acks sent by peer before bcast synch point was received */ > if (msg_bc_ack_invalid(hdr)) > return; > > __skb_queue_head_init(&xmitq); > + __skb_queue_head_init(&wakeupq); > > tipc_bcast_lock(net); > - tipc_link_bc_ack_rcv(l, acked, &xmitq); > + tipc_link_bc_ack_rcv(l, acked, &xmitq, &wakeupq); > tipc_bcast_unlock(net); > > tipc_bcbase_xmit(net, &xmitq); > > /* Any socket wakeup messages ? 
*/ > - if (!skb_queue_empty(inputq)) > - tipc_sk_rcv(net, inputq); > + if (!skb_queue_empty(&wakeupq)) > + tipc_sk_rcv_wakeup(net, &wakeupq); > } > > /* tipc_bcast_synch_rcv - check and update rcv link with peer's send state > @@ -483,17 +483,17 @@ void tipc_bcast_ack_rcv(struct net *net, struct > tipc_link *l, int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l, > struct tipc_msg *hdr) > { > - struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; > - struct sk_buff_head xmitq; > + struct sk_buff_head xmitq, wakeupq; > int rc = 0; > > __skb_queue_head_init(&xmitq); > + __skb_queue_head_init(&wakeupq); > > tipc_bcast_lock(net); > if (msg_type(hdr) != STATE_MSG) { > tipc_link_bc_init_rcv(l, hdr); > } else if (!msg_bc_ack_invalid(hdr)) { > - tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq); > + tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq, > &wakeupq); > rc = tipc_link_bc_sync_rcv(l, hdr, &xmitq); > } > tipc_bcast_unlock(net); > @@ -501,8 +501,8 @@ int tipc_bcast_sync_rcv(struct net *net, struct > tipc_link *l, > tipc_bcbase_xmit(net, &xmitq); > > /* Any socket wakeup messages ? 
*/ > - if (!skb_queue_empty(inputq)) > - tipc_sk_rcv(net, inputq); > + if (!skb_queue_empty(&wakeupq)) > + tipc_sk_rcv_wakeup(net, &wakeupq); > return rc; > } > > @@ -529,13 +529,13 @@ void tipc_bcast_add_peer(struct net *net, struct > tipc_link *uc_l, void tipc_bcast_remove_peer(struct net *net, struct tipc_link > *rcv_l) { > struct tipc_link *snd_l = tipc_bc_sndlink(net); > - struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; > - struct sk_buff_head xmitq; > + struct sk_buff_head xmitq, wakeupq; > > __skb_queue_head_init(&xmitq); > + __skb_queue_head_init(&wakeupq); > > tipc_bcast_lock(net); > - tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq); > + tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq, &wakeupq); > tipc_bcbase_select_primary(net); > tipc_bcbase_calc_bc_threshold(net); > tipc_bcast_unlock(net); > @@ -543,8 +543,8 @@ void tipc_bcast_remove_peer(struct net *net, struct > tipc_link *rcv_l) > tipc_bcbase_xmit(net, &xmitq); > > /* Any socket wakeup messages ? */ > - if (!skb_queue_empty(inputq)) > - tipc_sk_rcv(net, inputq); > + if (!skb_queue_empty(&wakeupq)) > + tipc_sk_rcv_wakeup(net, &wakeupq); > } > > int tipc_bclink_reset_stats(struct net *net) diff --git a/net/tipc/link.c > b/net/tipc/link.c index 2050fd386642..e67d7e6a793b 100644 > --- a/net/tipc/link.c > +++ b/net/tipc/link.c > @@ -237,7 +237,8 @@ static int link_is_up(struct tipc_link *l) } > > static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, > - struct sk_buff_head *xmitq); > + struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq); > static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool > probe, > bool probe_reply, u16 rcvgap, > int tolerance, int priority, > @@ -331,6 +332,11 @@ struct sk_buff_head *tipc_link_inputq(struct > tipc_link *l) > return l->inputq; > } > > +struct sk_buff_head *tipc_link_wakeupq(struct tipc_link *l) { > + return &l->wakeupq; > +} > + > char tipc_link_plane(struct tipc_link *l) { > return l->net_plane; > @@ -355,19 
+361,21 @@ void tipc_link_add_bc_peer(struct tipc_link > *snd_l, > > void tipc_link_remove_bc_peer(struct tipc_link *snd_l, > struct tipc_link *rcv_l, > - struct sk_buff_head *xmitq) > + struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq) > { > u16 ack = snd_l->snd_nxt - 1; > > snd_l->ackers--; > rcv_l->bc_peer_is_up = true; > rcv_l->state = LINK_ESTABLISHED; > - tipc_link_bc_ack_rcv(rcv_l, ack, xmitq); > + tipc_link_bc_ack_rcv(rcv_l, ack, xmitq, wakeupq); > trace_tipc_link_reset(rcv_l, TIPC_DUMP_ALL, "bclink removed!"); > tipc_link_reset(rcv_l); > rcv_l->state = LINK_RESET; > if (!snd_l->ackers) { > trace_tipc_link_reset(snd_l, TIPC_DUMP_ALL, "zero ackers!"); > + skb_queue_splice_tail_init(&snd_l->wakeupq, wakeupq); > tipc_link_reset(snd_l); > snd_l->state = LINK_RESET; > __skb_queue_purge(xmitq); > @@ -500,7 +508,7 @@ bool tipc_link_create(struct net *net, char *if_name, > int bearer_id, > __skb_queue_head_init(&l->backlogq); > __skb_queue_head_init(&l->deferdq); > __skb_queue_head_init(&l->failover_deferdq); > - skb_queue_head_init(&l->wakeupq); > + __skb_queue_head_init(&l->wakeupq); > skb_queue_head_init(l->inputq); > return true; > } > @@ -839,9 +847,8 @@ static int link_schedule_user(struct tipc_link *l, struct > tipc_msg *hdr) > dnode, l->addr, dport, 0, 0); > if (!skb) > return -ENOBUFS; > - msg_set_dest_droppable(buf_msg(skb), true); > TIPC_SKB_CB(skb)->chain_imp = msg_importance(hdr); > - skb_queue_tail(&l->wakeupq, skb); > + __skb_queue_tail(&l->wakeupq, skb); > l->stats.link_congs++; > trace_tipc_link_conges(l, TIPC_DUMP_ALL, "wakeup scheduled!"); > return -ELINKCONG; > @@ -853,46 +860,34 @@ static int link_schedule_user(struct tipc_link *l, > struct tipc_msg *hdr) > * Wake up a number of waiting users, as permitted by available space > * in the send queue > */ > -static void link_prepare_wakeup(struct tipc_link *l) > +static void link_prepare_wakeup(struct tipc_link *l, > + struct sk_buff_head *wakeupq) > { > struct sk_buff *skb, *tmp; > - 
int imp, i = 0; > + int imp; > > skb_queue_walk_safe(&l->wakeupq, skb, tmp) { > imp = TIPC_SKB_CB(skb)->chain_imp; > if (l->backlog[imp].len < l->backlog[imp].limit) { > - skb_unlink(skb, &l->wakeupq); > - skb_queue_tail(l->inputq, skb); > - } else if (i++ > 10) { > - break; > + __skb_unlink(skb, &l->wakeupq); > + __skb_queue_tail(wakeupq, skb); > } > } > } > > void tipc_link_reset(struct tipc_link *l) { > - struct sk_buff_head list; > - > - __skb_queue_head_init(&list); > - > l->in_session = false; > /* Force re-synch of peer session number before establishing */ > l->peer_session--; > l->session++; > l->mtu = l->advertised_mtu; > > - spin_lock_bh(&l->wakeupq.lock); > - skb_queue_splice_init(&l->wakeupq, &list); > - spin_unlock_bh(&l->wakeupq.lock); > - > - spin_lock_bh(&l->inputq->lock); > - skb_queue_splice_init(&list, l->inputq); > - spin_unlock_bh(&l->inputq->lock); > - > __skb_queue_purge(&l->transmq); > __skb_queue_purge(&l->deferdq); > __skb_queue_purge(&l->backlogq); > __skb_queue_purge(&l->failover_deferdq); > + __skb_queue_purge(&l->wakeupq); > l->backlog[TIPC_LOW_IMPORTANCE].len = 0; > l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0; > l->backlog[TIPC_HIGH_IMPORTANCE].len = 0; @@ -1445,9 +1440,11 > @@ static int tipc_link_build_nack_msg(struct tipc_link *l, > * @l: the link that should handle the message > * @skb: TIPC packet > * @xmitq: queue to place packets to be sent after this call > + * @wakeupq: list of wakeup messages to be consumed after this call > */ > int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, > - struct sk_buff_head *xmitq) > + struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq) > { > struct sk_buff_head *defq = &l->deferdq; > struct tipc_msg *hdr = buf_msg(skb); > @@ -1456,7 +1453,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff > *skb, > > /* Verify and update link state */ > if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) > - return tipc_link_proto_rcv(l, skb, xmitq); > + return tipc_link_proto_rcv(l, skb, 
xmitq, wakeupq); > > /* Don't send probe at next timeout expiration */ > l->silent_intv_cnt = 0; > @@ -1484,7 +1481,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff > *skb, > l->stale_cnt = 0; > tipc_link_advance_backlog(l, xmitq); > if (unlikely(!skb_queue_empty(&l->wakeupq))) > - link_prepare_wakeup(l); > + link_prepare_wakeup(l, wakeupq); > } > > /* Defer delivery if sequence gap */ > @@ -1518,6 +1515,7 @@ static void tipc_link_build_proto_msg(struct > tipc_link *l, int mtyp, bool probe, > bool probe_reply, u16 rcvgap, > int tolerance, int priority, > struct sk_buff_head *xmitq) > + > { > struct tipc_link *bcl = l->bc_rcvlink; > struct sk_buff *skb; > @@ -1786,7 +1784,8 @@ bool tipc_link_validate_msg(struct tipc_link *l, > struct tipc_msg *hdr) > * network plane > */ > static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, > - struct sk_buff_head *xmitq) > + struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq) > { > struct tipc_msg *hdr = buf_msg(skb); > struct tipc_gap_ack_blks *ga = NULL; > @@ -1926,7 +1925,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, > struct sk_buff *skb, > > tipc_link_advance_backlog(l, xmitq); > if (unlikely(!skb_queue_empty(&l->wakeupq))) > - link_prepare_wakeup(l); > + link_prepare_wakeup(l, wakeupq); > } > exit: > kfree_skb(skb); > @@ -2072,7 +2071,8 @@ int tipc_link_bc_sync_rcv(struct tipc_link *l, struct > tipc_msg *hdr, } > > void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, > - struct sk_buff_head *xmitq) > + struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq) > { > struct sk_buff *skb, *tmp; > struct tipc_link *snd_l = l->bc_sndlink; @@ -2102,7 +2102,7 @@ void > tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, > l->acked = acked; > tipc_link_advance_backlog(snd_l, xmitq); > if (unlikely(!skb_queue_empty(&snd_l->wakeupq))) > - link_prepare_wakeup(snd_l); > + link_prepare_wakeup(snd_l, wakeupq); > } > > /* tipc_link_bc_nack_rcv(): receive broadcast nack message @@ 
-2110,7 > +2110,8 @@ void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, > * no BCAST_PROTOCOL/STATE messages occur from TIPC v2.5. > */ > int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb, > - struct sk_buff_head *xmitq) > + struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq) > { > struct tipc_msg *hdr = buf_msg(skb); > u32 dnode = msg_destnode(hdr); > @@ -2130,7 +2131,7 @@ int tipc_link_bc_nack_rcv(struct tipc_link *l, struct > sk_buff *skb, > return 0; > > if (dnode == tipc_own_addr(l->net)) { > - tipc_link_bc_ack_rcv(l, acked, xmitq); > + tipc_link_bc_ack_rcv(l, acked, xmitq, wakeupq); > rc = tipc_link_retrans(l->bc_sndlink, l, from, to, xmitq); > l->stats.recv_nacks++; > return rc; > diff --git a/net/tipc/link.h b/net/tipc/link.h index > adcad65e761c..9568bd965945 100644 > --- a/net/tipc/link.h > +++ b/net/tipc/link.h > @@ -107,6 +107,7 @@ void tipc_link_reset_stats(struct tipc_link *l); int > tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list, > struct sk_buff_head *xmitq); > struct sk_buff_head *tipc_link_inputq(struct tipc_link *l); > +struct sk_buff_head *tipc_link_wakeupq(struct tipc_link *l); > u16 tipc_link_rcv_nxt(struct tipc_link *l); > u16 tipc_link_acked(struct tipc_link *l); > u32 tipc_link_id(struct tipc_link *l); > @@ -130,25 +131,28 @@ int __tipc_nl_add_link(struct net *net, struct > tipc_nl_msg *msg, int tipc_nl_parse_link_prop(struct nlattr *prop, struct > nlattr *props[]); int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head > *xmitq); int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, > - struct sk_buff_head *xmitq); > + struct sk_buff_head *xmitq, struct sk_buff_head *wakeupq); > int tipc_link_build_state_msg(struct tipc_link *l, struct sk_buff_head *xmitq); > void tipc_link_add_bc_peer(struct tipc_link *snd_l, > struct tipc_link *uc_l, > struct sk_buff_head *xmitq); > void tipc_link_remove_bc_peer(struct tipc_link *snd_l, > struct tipc_link *rcv_l, > - struct sk_buff_head 
*xmitq); > + struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq); > int tipc_link_bc_peers(struct tipc_link *l); void tipc_link_set_mtu(struct > tipc_link *l, int mtu); int tipc_link_mtu(struct tipc_link *l); void > tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, > - struct sk_buff_head *xmitq); > + struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq); > void tipc_link_build_bc_sync_msg(struct tipc_link *l, > struct sk_buff_head *xmitq); > void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr); > int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr, > struct sk_buff_head *xmitq); > int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb, > - struct sk_buff_head *xmitq); > + struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq); > bool tipc_link_too_silent(struct tipc_link *l); #endif diff --git > a/net/tipc/node.c b/net/tipc/node.c index 550581d47d51..d0ec29081b11 > 100644 > --- a/net/tipc/node.c > +++ b/net/tipc/node.c > @@ -154,6 +154,7 @@ enum { > > static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, > struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq, > struct tipc_media_addr **maddr); > static void tipc_node_link_down(struct tipc_node *n, int bearer_id, > bool delete); > @@ -803,6 +804,7 @@ static void tipc_node_link_failover(struct tipc_node > *n, struct tipc_link *l, > */ > static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, > struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq, > struct tipc_media_addr **maddr) > { > struct tipc_link_entry *le = &n->links[*bearer_id]; @@ -851,6 +853,7 > @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, > tipc_node_fsm_evt(n, SELF_LOST_CONTACT_EVT); > trace_tipc_link_reset(l, TIPC_DUMP_ALL, "link down!"); > tipc_link_fsm_evt(l, LINK_RESET_EVT); > + skb_queue_splice_tail_init(tipc_link_wakeupq(l), wakeupq); > tipc_link_reset(l); > tipc_link_build_reset_msg(l, xmitq); > 
*maddr = &n->links[*bearer_id].maddr; @@ -868,6 +871,7 @@ > static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, > n->sync_point = tipc_link_rcv_nxt(tnl) + (U16_MAX / 2 - 1); > tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq); > trace_tipc_link_reset(l, TIPC_DUMP_ALL, "link down -> failover!"); > + skb_queue_splice_tail_init(tipc_link_wakeupq(l), wakeupq); > tipc_link_reset(l); > tipc_link_fsm_evt(l, LINK_RESET_EVT); > tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT); @@ -881,18 > +885,20 @@ static void tipc_node_link_down(struct tipc_node *n, int > bearer_id, bool delete) > struct tipc_media_addr *maddr = NULL; > struct tipc_link *l = le->link; > int old_bearer_id = bearer_id; > - struct sk_buff_head xmitq; > + struct sk_buff_head xmitq, wakeupq; > > if (!l) > return; > > __skb_queue_head_init(&xmitq); > + __skb_queue_head_init(&wakeupq); > > tipc_node_write_lock(n); > if (!tipc_link_is_establishing(l)) { > - __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr); > + __tipc_node_link_down(n, &bearer_id, &xmitq, &wakeupq, > &maddr); > } else { > /* Defuse pending tipc_node_link_up() */ > + skb_queue_splice_tail_init(tipc_link_wakeupq(l), &wakeupq); > tipc_link_reset(l); > tipc_link_fsm_evt(l, LINK_RESET_EVT); > } > @@ -908,6 +914,7 @@ static void tipc_node_link_down(struct tipc_node > *n, int bearer_id, bool delete) > if (!skb_queue_empty(&xmitq)) > tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr); > tipc_sk_rcv(n->net, &le->inputq); > + tipc_sk_rcv_wakeup(n->net, &wakeupq); > } > > static bool node_is_up(struct tipc_node *n) @@ -1652,6 +1659,7 @@ static > bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, > u16 iseqno = msg_seqno(msg_get_wrapped(hdr)); > u16 exp_pkts = msg_msgcnt(hdr); > u16 rcv_nxt, syncpt, dlv_nxt, inputq_len; > + struct sk_buff_head wakeupq; > int state = n->state; > struct tipc_link *l, *tnl, *pl = NULL; > struct tipc_media_addr *maddr; > @@ -1711,9 +1719,12 @@ static bool tipc_node_check_state(struct > 
tipc_node *n, struct sk_buff *skb, > if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) { > syncpt = oseqno + exp_pkts - 1; > if (pl && !tipc_link_is_reset(pl)) { > - __tipc_node_link_down(n, &pb_id, xmitq, &maddr); > + __skb_queue_head_init(&wakeupq); > + __tipc_node_link_down(n, &pb_id, xmitq, > + &wakeupq, &maddr); > trace_tipc_node_link_down(n, true, > "node link down <- failover!"); > + skb_queue_splice_tail(&wakeupq, tipc_link_wakeupq(l)); > tipc_skb_queue_splice_tail_init(tipc_link_inputq(pl), > tipc_link_inputq(l)); > } > @@ -1795,7 +1806,7 @@ static bool tipc_node_check_state(struct > tipc_node *n, struct sk_buff *skb, > */ > void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) { > - struct sk_buff_head xmitq; > + struct sk_buff_head xmitq, wakeupq; > struct tipc_node *n; > struct tipc_msg *hdr; > int bearer_id = b->identity; > @@ -1805,6 +1816,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, > struct tipc_bearer *b) > u16 bc_ack; > > __skb_queue_head_init(&xmitq); > + __skb_queue_head_init(&wakeupq); > > /* Ensure message is well-formed before touching the header */ > if (unlikely(!tipc_msg_validate(&skb))) > @@ -1842,7 +1854,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, > struct tipc_bearer *b) > if (likely((n->state == SELF_UP_PEER_UP) && (usr != > TUNNEL_PROTOCOL))) { > spin_lock_bh(&le->lock); > if (le->link) { > - rc = tipc_link_rcv(le->link, skb, &xmitq); > + rc = tipc_link_rcv(le->link, skb, &xmitq, &wakeupq); > skb = NULL; > } > spin_unlock_bh(&le->lock); > @@ -1856,7 +1868,8 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, > struct tipc_bearer *b) > tipc_node_write_lock(n); > if (tipc_node_check_state(n, skb, bearer_id, &xmitq)) { > if (le->link) { > - rc = tipc_link_rcv(le->link, skb, &xmitq); > + rc = tipc_link_rcv(le->link, skb, > + &xmitq, &wakeupq); > skb = NULL; > } > } > @@ -1878,6 +1891,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, > struct tipc_bearer *b) > if 
(!skb_queue_empty(&le->inputq)) > tipc_sk_rcv(net, &le->inputq); > > + if (!skb_queue_empty(&wakeupq)) > + tipc_sk_rcv_wakeup(net, &wakeupq); > + > if (!skb_queue_empty(&xmitq)) > tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); > > diff --git a/net/tipc/socket.c b/net/tipc/socket.c index > dd8537f988c4..fa48fb14add7 100644 > --- a/net/tipc/socket.c > +++ b/net/tipc/socket.c > @@ -2224,6 +2224,33 @@ static int tipc_sk_backlog_rcv(struct sock *sk, > struct sk_buff *skb) > return 0; > } > > +/** > + * tipc_sk_add_backlog - add the skb to the socket backlog queue > + * @sk: socket where the skb should be enqueued > + * @skb: the skb being enqueued > + * > + * Return true if the skb was added. Otherwise, return false */ static > +bool tipc_sk_add_backlog(struct sock *sk, struct sk_buff *skb) { > + unsigned int lim; > + atomic_t *dcnt; > + > + /* Try backlog, compensating for double-counted bytes */ > + dcnt = &tipc_sk(sk)->dupl_rcvcnt; > + if (!sk->sk_backlog.len) > + atomic_set(dcnt, 0); > + > + lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt); > + if (likely(!sk_add_backlog(sk, skb, lim))) { > + trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL, > + "bklg & rcvq >90% allocated!"); > + return true; > + } > + > + return false; > +} > + > /** > * tipc_sk_enqueue - extract all buffers with destination 'dport' from > * inputq and try adding them to socket or backlog queue > @@ -2238,8 +2265,6 @@ static void tipc_sk_enqueue(struct sk_buff_head > *inputq, struct sock *sk, { > unsigned long time_limit = jiffies + 2; > struct sk_buff *skb; > - unsigned int lim; > - atomic_t *dcnt; > u32 onode; > > while (skb_queue_len(inputq)) { > @@ -2256,16 +2281,9 @@ static void tipc_sk_enqueue(struct sk_buff_head > *inputq, struct sock *sk, > continue; > } > > - /* Try backlog, compensating for double-counted bytes */ > - dcnt = &tipc_sk(sk)->dupl_rcvcnt; > - if (!sk->sk_backlog.len) > - atomic_set(dcnt, 0); > - lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt); > - if 
(likely(!sk_add_backlog(sk, skb, lim))) { > - trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL, > - "bklg & rcvq >90% allocated!"); > + /* Add the skb to the socket backlog queue */ > + if (tipc_sk_add_backlog(sk, skb)) > continue; > - } > > trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL, "err_overload!"); > /* Overload => reject message back to sender */ @@ -2332,6 > +2350,57 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) > } > } > > +/** > + * tipc_sk_rcv_wakeup - handle a chain of wakeup messages > + * @wakeupq: list of wakeup messages > + * Consumes all messages in the list until it is empty */ void > +tipc_sk_rcv_wakeup(struct net *net, struct sk_buff_head *wakeupq) { > + u32 dport = 0; > + struct tipc_sock *tsk; > + struct sock *sk; > + struct sk_buff *skb, *tmp; > + > +start: > + if (!skb_queue_len(wakeupq)) > + return; > + > + skb_queue_walk_safe(wakeupq, skb, tmp) { > + dport = msg_destport(buf_msg(skb)); > + tsk = tipc_sk_lookup(net, dport); > + > + if (!unlikely(tsk)) { > + __skb_unlink(skb, wakeupq); > + kfree_skb(skb); > + continue; > + } > + > + sk = &tsk->sk; > + if (likely(spin_trylock_bh(&sk->sk_lock.slock))) { > + __skb_unlink(skb, wakeupq); > + if (!sock_owned_by_user(sk)) { > + tipc_dest_del(&tsk->cong_links, > + msg_orignode(buf_msg(skb)), 0); > + /* coupled with smp_rmb() in > + * tipc_wait_for_cond() > + */ > + smp_wmb(); > + tsk->cong_link_cnt--; > + sk->sk_write_space(sk); > + kfree_skb(skb); > + } else if (unlikely(!tipc_sk_add_backlog(sk, skb))) { > + kfree_skb(skb); > + pr_warn("Drop wakeup message for port %u\n", > + tipc_sk(sk)->portid); > + } > + spin_unlock_bh(&sk->sk_lock.slock); > + } > + sock_put(sk); > + } > + goto start; > +} > + > static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) { > DEFINE_WAIT_FUNC(wait, woken_wake_function); diff --git > a/net/tipc/socket.h b/net/tipc/socket.h index 235b9679acee..44eb9fb68d7e > 100644 > --- a/net/tipc/socket.h > +++ b/net/tipc/socket.h > @@ -54,6 +54,7 @@ 
struct tipc_sock; > int tipc_socket_init(void); > void tipc_socket_stop(void); > void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq); > +void tipc_sk_rcv_wakeup(struct net *net, struct sk_buff_head *wakeupq); > void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, > struct sk_buff_head *inputq); > void tipc_sk_reinit(struct net *net); > -- > 2.17.1 |
From: Mahesh K. <lm...@ya...> - 2019-07-17 23:47:59
Hi Jon,

Resending the attachment as inline email content, as I am not sure the attachment got posted to the community successfully.

>> Sure. Thanks for the insight on the reserved service type. I will change the service type to 100.

Sorry, it took a while to get the correct 'tipc' from the SDK team. The 4.4.180 kernel is the one we are planning to ship next month; however, 4.19.44 or later will be part of the next release. Please find the attached doc; it has the requested output from host and container.

>>>>> Host:
+++++++
[Switch_1_RP_0:/mnt/sd3/user]$ ./tipc link list
broadcast-link: up
[Switch_1_RP_0:/mnt/sd3/user]$ ip address show dev ieobc_br
14: ieobc_br: <BROADCAST,MULTICAST,PROMISC,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default qlen 1000
    link/ether fe:0e:00:0b:0c:00 brd ff:ff:ff:ff:ff:ff
[Switch_1_RP_0:/mnt/sd3/user]$ ./tipc bearer list
eth:ieobc_br
[Switch_1_RP_0:/mnt/sd3/user]$ ./tipc nametable show
Type    Lower     Upper     Port Identity        Publication  Scope
0       16781313  16781313  <1.1.1:0>            16781313     zone
1       1         1         <1.1.1:589961650>    589961651    node
1       88        88        <1.1.1:252975192>    252975193    zone   ##MK: I will change to 100 as I now understand that 0-64 is reserved. Thank you.
15003   5         5         <1.1.1:845517643>    845517644    zone
[Switch_1_RP_0:/mnt/sd3/user]$ uname -a
Linux Switch_1_RP_0 4.4.180 #1 SMP Tue Jun 25 15:36:10 PDT 2019 x86_64 x86_64 x86_64 GNU/Linux

Container
++++++++
/ # ip addr
64: ieobc@if65: <BROADCAST,MULTICAST> mtu 1500 qdisc noop state DOWN group default qlen 1000
    link/ether 52:54:dd:fe:dc:4c brd ff:ff:ff:ff:ff:ff link-netnsid 0
/ #
/ # tipc bearer list
eth:ieobc
/ #
/ # tipc link list
broadcast-link: up
/ #
/ # tipc nametable show
Type    Lower     Upper     Port Identity          Publication  Scope
0       16781412  16781412  <1.1.100:0>            16781412     zone
1       1         1         <1.1.100:3117258882>   3117258883   node
=====
<<<<<
The host actually used the tipc-config tool to set up TIPC. Apart from the requested output, I would like to highlight a few observations.
With the old kernel (4.4.155) on the host, tipc-config yields the following output:

[iox-cat9k-146_3_RP_0:/]$ tipc-config -b
Bearers:
eth:ieobc_br
[iox-cat9k-146_3_RP_0:/]$ tipc-config -addr
node address: <1.1.1>
[iox-cat9k-146_3_RP_0:/]$ tipc-config -netid
current network id: 4711

With the new kernel (4.4.180) on the host, we get:

[iox-cat9k-146_3_RP_0:/]$ tipc-config -s
TIPC version 2.0.0
[iox-cat9k-146_3_RP_0:/]$ tipc-config -b
Bearers:
operation not supported
[iox-cat9k-146_3_RP_0:/]$ tipc-config -addr
operation not supported
[iox-cat9k-146_3_RP_0:/]$ tipc-config -netid
operation not supported

Do we suspect any kernel patch that could cause this behavioral change? If so, could you please point me to it?

thanks & regards
Mahesh kumar.L

On Tuesday, 16 July, 2019, 05:24:53 am GMT-7, Jon Maloy <jon...@er...> wrote:

Hi Kumar,

First of all, I would recommend you not to use <1.88.88> as listener address, since the service types [0,64] are reserved for internal use by TIPC itself.

I am also a little confused about which kernel you are using. Are you now using 4.19.44, and not 4.4.180, or was this just a test?

BTW, since 4.17, the use of “zone” scope is deprecated, and translated internally to “cluster”. There was never any functional difference between them anyway.

Regarding your system, a good start would be to issue the following commands, both on the host and in the container:

$ ip addr
$ tipc bearer list
$ tipc link list
$ tipc nametable show

BR ///jon

From: Mahesh Kumar <lm...@ya...>
Sent: 15-Jul-19 16:21
To: Ying Xue <yin...@wi...>; Jon Maloy <jon...@er...>
Cc: tip...@li...
Subject: Re: [tipc-discussion] TIPC ; config trouble ; help request

Hi Jon,

Thanks a lot for checking this and providing feedback. A brief background of the system: in the host system, node address 1.1.1 is configured upon bootup. I added a listener on 1.88.88:
tams_srv_addr.family = AF_TIPC;
tams_srv_addr.addrtype = TIPC_ADDR_NAMESEQ;
tams_srv_addr.addr.nameseq.type = TIPC_TOP_SRV;
tams_srv_addr.addr.nameseq.lower = TAMS_TIPC_LISTEN_PORT;  << 88
tams_srv_addr.addr.nameseq.upper = TAMS_TIPC_LISTEN_PORT;
tams_srv_addr.scope = TIPC_ZONE_SCOPE;

Now, from a container, I am trying to access the host service (88) by setting the container's node address to 1.1.100 using "tipc node set addr 1.1.100". This used to work fine, but it bails out with the 4.4.180 kernel. Another change I noticed after moving my kernel to 4.19.44 is that all of the listeners are now in cluster scope instead of zone scope. I haven't had a chance to check the results from the container with the new kernel.

>> from host
>>> tipc-config -nt
Type    Lower     Upper     Port Identity         Publication  Scope
0       16781313  16781313  <1.1.1:0>             16781313     cluster
1       1         1         <1.1.1:2399481494>    2399481495   node
1       88        88        <1.1.1:2536304640>    2536304641   cluster
15003   5         5         <1.1.1:3973838764>    3973838765   cluster
[Switch_1_RP_0:/]$ uname -a
Linux Switch_1_RP_0 4.19.44 #1 SMP Wed May 22 13:50:02 PDT 2019 x86_64 x86_64 x86_64 GNU/Linux

Please let me know which outputs I should share from the 'host' and 'container' side.

thanks & regards
Mahesh kumar.L

On Monday, 15 July, 2019, 12:36:09 pm GMT-7, Jon Maloy <jon...@er...> wrote:

Hi Kumar,

Your binding table listing reveals that your node already has an address <1.1.1>, which explains why your address setting fails. You should check if you have any script that sets the address by default at module loading, or maybe you just set it manually and then forgot...

Furthermore, it seems you have published a service <1,88,88>, which means you are illegally using the reserved service type <1>. The latter isn't your fault, but due to a bug in TIPC that wrongly allows users to publish such service types, in the function tipc_bind(). I discovered this bug a couple of months ago, but haven't fixed it yet, and I am not quite sure how to do it without breaking any API.
This may cause you surprises, but I cannot see why it would cause the bearer enabling to fail. If this problem persists, you should post some more system info about your interfaces, which tipc links you have, etc.

BR ///jon

> -----Original Message-----
> From: Mahesh Kumar via tipc-discussion <tipc-dis...@li...>
> Sent: 15-Jul-19 11:49
> To: tip...@li...; Ying Xue <yin...@wi...>
> Subject: Re: [tipc-discussion] TIPC ; config trouble ; help request
>
> Hi Ying,
> Thank you very much for letting me know. Do we suspect any related ioctl()
> patches? Could you please point me to a link where we can review the TIPC
> patches that went into the kernel?
> Much appreciate the help.
> thanks & regards
> Mahesh kumar.L
> On Monday, 15 July, 2019, 02:56:32 am GMT-7, Ying Xue
> <yin...@wi...> wrote:
>
> On 7/13/19 11:58 AM, Mahesh Kumar via tipc-discussion wrote:
> > Tipc Team,
> >
> > Greetings!
> > I have been using TIPC for about a year without any issue. However the
> > TIPC tool is bailing out when I tried to set address, bearer:
> >
> > / # tipc node set addr 1.1.100
> > error: Operation not permitted
> > / # tipc bearer enable media eth dev ieobc
> > error: Invalid argument
> > / #
> >
> > I am using the new kernel now:
> > uname -a
> > Linux 2c3f0b001900_1_RP_0 4.4.180 #1 SMP Tue Jun 25 15:36:10
> > PDT 2019 x86_64 x86_64 x86_64 GNU/Linux
> > dmesg output: grep -i TIPC d.txt
> > [ 29.436599] tipc: Activated (version 2.0.0)
> > [ 29.436768] tipc: Started in single node mode
>
> Suspected some TIPC patches integrated through the 4.4.180 release introduced a
> regression. The simplest method to identify the issue is to revert some TIPC
> patches to identify which ones caused the regression.
> > [2c3f0b001900_1_RP_0:/]$ tipc-config -nt
> > Type    Lower     Upper     Port Identity         Publication  Scope
> > 0       16781313  16781313  <1.1.1:0>             16781313     zone
> > 1       1         1         <1.1.1:483390874>     483390875    node
> > 1       88        88        <1.1.1:2870943326>    2870943327   zone
> > 15003   5         5         <1.1.1:3556781096>    3556781097   zone
> > [2c3f0b001900_1_RP_0:/]$
> >
> > Please let me know if any issue.
> > thanks & regards
> > Mahesh kumar.L
>
> _______________________________________________
> tipc-discussion mailing list
> tip...@li...
> https://lists.sourceforge.net/lists/listinfo/tipc-discussion
From: Mahesh K. <lm...@ya...> - 2019-07-17 23:41:35
Hi Jon,

Sure. Thanks for the insight on the reserved service type. I will change the service type to 100.

Sorry, it took a while to get the correct 'tipc' from the SDK team. The 4.4.180 kernel is the one we are planning to ship next month; however, 4.19.44 or later will be part of the next release. Please find the attached doc; it has the requested output from host and container. The host actually used the tipc-config tool to set up TIPC. Apart from the requested output, I would like to highlight a few observations.

With the old kernel (4.4.155) on the host, tipc-config yields the following output:

[iox-cat9k-146_3_RP_0:/]$ tipc-config -b
Bearers:
eth:ieobc_br
[iox-cat9k-146_3_RP_0:/]$ tipc-config -addr
node address: <1.1.1>
[iox-cat9k-146_3_RP_0:/]$ tipc-config -netid
current network id: 4711

With the new kernel (4.4.180) on the host, we get:

[iox-cat9k-146_3_RP_0:/]$ tipc-config -s
TIPC version 2.0.0
[iox-cat9k-146_3_RP_0:/]$ tipc-config -b
Bearers:
operation not supported
[iox-cat9k-146_3_RP_0:/]$ tipc-config -addr
operation not supported
[iox-cat9k-146_3_RP_0:/]$ tipc-config -netid
operation not supported

Do we suspect any kernel patch that could cause this behavioral change? If so, could you please point me to it?

thanks & regards
Mahesh kumar.L

On Tuesday, 16 July, 2019, 05:24:53 am GMT-7, Jon Maloy <jon...@er...> wrote:

Hi Kumar,

First of all, I would recommend you not to use <1.88.88> as listener address, since the service types [0,64] are reserved for internal use by TIPC itself.

I am also a little confused about which kernel you are using. Are you now using 4.19.44, and not 4.4.180, or was this just a test?

BTW, since 4.17, the use of “zone” scope is deprecated, and translated internally to “cluster”. There was never any functional difference between them anyway.
Regarding your system, a good start would be to issue the following commands, both on the host and in the container:

$ ip addr
$ tipc bearer list
$ tipc link list
$ tipc nametable show

BR ///jon

From: Mahesh Kumar <lm...@ya...>
Sent: 15-Jul-19 16:21
To: Ying Xue <yin...@wi...>; Jon Maloy <jon...@er...>
Cc: tip...@li...
Subject: Re: [tipc-discussion] TIPC ; config trouble ; help request

Hi Jon,

Thanks a lot for checking this and providing feedback. A brief background of the system: on the host, node address 1.1.1 is configured upon bootup. I added a listener on <1,88,88>:

tams_srv_addr.family = AF_TIPC;
tams_srv_addr.addrtype = TIPC_ADDR_NAMESEQ;
tams_srv_addr.addr.nameseq.type = TIPC_TOP_SRV;
tams_srv_addr.addr.nameseq.lower = TAMS_TIPC_LISTEN_PORT;  /* << 88 */
tams_srv_addr.addr.nameseq.upper = TAMS_TIPC_LISTEN_PORT;
tams_srv_addr.scope = TIPC_ZONE_SCOPE;

Now, from a container, I am trying to access the host service (88) after setting the container's node address to 1.1.100 using "tipc node set addr 1.1.100". This used to work fine, but it bails out with the 4.4.180 kernel.

Another change I noticed after moving my kernel to 4.19.44 is that all of the listeners are now in cluster scope instead of zone scope. I didn't have a chance to check the results from the container with the new kernel.

>> from host:
>> tipc-config -nt
Type   Lower     Upper     Port Identity        Publication  Scope
0      16781313  16781313  <1.1.1:0>            16781313     cluster
1      1         1         <1.1.1:2399481494>   2399481495   node
1      88        88        <1.1.1:2536304640>   2536304641   cluster
15003  5         5         <1.1.1:3973838764>   3973838765   cluster

[Switch_1_RP_0:/]$ uname -a
Linux Switch_1_RP_0 4.19.44 #1 SMP Wed May 22 13:50:02 PDT 2019 x86_64 x86_64 x86_64 GNU/Linux

Please let me know which outputs I should share from the 'host' and the 'container' side.
thanks & regards, Mahesh kumar.L

On Monday, 15 July, 2019, 12:36:09 pm GMT-7, Jon Maloy <jon...@er...> wrote:

Hi Kumar,

Your binding table listing reveals that your node already has an address <1.1.1>, which explains why your address setting fails. You should check whether you have a script that sets the address by default at module loading, or maybe you just set it manually and then forgot...

Furthermore, it seems you have published a service <1,88,88>, which means you are illegally using the reserved service type <1>. The latter isn't your fault, but due to a bug in TIPC that wrongly allows users to publish such service types, in the function tipc_bind(). I discovered this bug a couple of months ago, but haven't fixed it yet, and I am not quite sure how to do it without breaking any API. This may cause you surprises, but I cannot see why it would cause the bearer enabling to fail.

If this problem persists, you should post some more system info about your interfaces, which tipc links you have, etc.

BR ///jon

> -----Original Message-----
> From: Mahesh Kumar via tipc-discussion <tipc-dis...@li...>
> Sent: 15-Jul-19 11:49
> To: tip...@li...; Ying Xue <yin...@wi...>
> Subject: Re: [tipc-discussion] TIPC ; config trouble ; help request
>
> Hi Ying,
> Thank you very much for letting me know. Do we suspect any related ioctl()
> patches? Could you please point me to a link where we can review the TIPC
> patches that went into the kernel?
> Much appreciate the help.
> thanks & regards, Mahesh kumar.L
>
> On Monday, 15 July, 2019, 02:56:32 am GMT-7, Ying Xue <yin...@wi...> wrote:
>
> On 7/13/19 11:58 AM, Mahesh Kumar via tipc-discussion wrote:
> > Tipc Team,
> >
> > Greetings!
> > I have been using TIPC for about a year without any issue. However, the
> > TIPC tool is bailing out when I try to set the address and enable the bearer:
> >
> > / # tipc node set addr 1.1.100
> > error: Operation not permitted
> >
> > / # tipc bearer enable media eth dev ieobc
> > error: Invalid argument
> >
> > / #
> >
> > I am using the new kernel now:
> > uname -a
> > Linux 2c3f0b001900_1_RP_0 4.4.180 #1 SMP Tue Jun 25 15:36:10 PDT 2019 x86_64 x86_64 x86_64 GNU/Linux
> >
> > dmesg output; grep -i TIPC d.txt
> > [ 29.436599] tipc: Activated (version 2.0.0)
> > [ 29.436768] tipc: Started in single node mode
>
> I suspect some TIPC patches integrated through the 4.4.180 release introduced a
> regression. The simplest way to identify the issue is to revert TIPC patches
> one by one to find which ones caused the regression.
>
> > [2c3f0b001900_1_RP_0:/]$ tipc-config -nt
> > Type   Lower     Upper     Port Identity        Publication  Scope
> > 0      16781313  16781313  <1.1.1:0>            16781313     zone
> > 1      1         1         <1.1.1:483390874>    483390875    node
> > 1      88        88        <1.1.1:2870943326>   2870943327   zone
> > 15003  5         5         <1.1.1:3556781096>   3556781097   zone
> > [2c3f0b001900_1_RP_0:/]$
> >
> > Please let me know if there is any issue.
> > thanks & regards, Mahesh kumar.L
> >
> > _______________________________________________
> > tipc-discussion mailing list
> > tip...@li...
> > https://lists.sourceforge.net/lists/listinfo/tipc-discussion
>
> _______________________________________________
> tipc-discussion mailing list
> tip...@li...
> https://lists.sourceforge.net/lists/listinfo/tipc-discussion |
From: David M. <da...@da...> - 2019-07-17 22:24:59
|
From: Jon Maloy <jon...@er...>
Date: Wed, 17 Jul 2019 23:43:44 +0200

> The tipc_msg_validate() function leaves a boolean flag 'validated' in
> the validated buffer's control block, to avoid performing this action
> more than once. However, at reception of new packets, the position of
> this field may already have been set by lower layer protocols, so
> that the packet is erroneously perceived as already validated by TIPC.
>
> We fix this by initializing the said field to 'false' before performing
> the initial validation.
>
> Signed-off-by: Jon Maloy <jon...@er...>

Applied. |
From: Jon M. <jon...@er...> - 2019-07-17 21:43:59
|
The tipc_msg_validate() function leaves a boolean flag 'validated' in the validated buffer's control block, to avoid performing this action more than once. However, at reception of new packets, the position of this field may already have been set by lower layer protocols, so that the packet is erroneously perceived as already validated by TIPC.

We fix this by initializing the said field to 'false' before performing the initial validation.

Signed-off-by: Jon Maloy <jon...@er...>
---
 net/tipc/node.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/net/tipc/node.c b/net/tipc/node.c
index 324a1f9..3a5be1d 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1807,6 +1807,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
 	__skb_queue_head_init(&xmitq);
 
 	/* Ensure message is well-formed before touching the header */
+	TIPC_SKB_CB(skb)->validated = false;
 	if (unlikely(!tipc_msg_validate(&skb)))
 		goto discard;
 	hdr = buf_msg(skb);
-- 
2.1.4 |
From: Jon M. <jon...@er...> - 2019-07-17 16:01:31
|
Hi Tung,

Your analysis seems correct, but your patch is too intrusive, and it can be made much simpler. See below.

> -----Original Message-----
> From: Tung Nguyen <tun...@de...>
> Sent: 9-Jul-19 22:39
> To: tip...@li...; Jon Maloy <jon...@er...>; ma...@do...; yin...@wi...
> Subject: [tipc-discussion][PATCH v1 1/1] tipc: fix starvation of sending sockets when handling wakeup messages
>
> Commit 365ad35 ("tipc: reduce risk of user starvation during link congestion")
> aimed to allow senders to add their lists of socket buffers to the link backlog
> queues under link congestion. However, when dequeuing the link wakeup
> queue in link_prepare_wakeup(), there might be a worst-case scenario:
> - More than 10 wakeup messages near the wakeup queue head are not
> dequeued because the backlog queue length is still greater than the limit.
> - Many wakeup messages near the wakeup queue tail are not dequeued,
> though their importance differs from that of the 10 wakeup messages
> near the queue head and the backlog queue length is less than the limit.
>
> Furthermore, function tipc_sk_rcv() consumes both normal data messages
> and wakeup messages from the link input queue. By allowing
> oversubscriptions, the number of messages passed through tipc_sk_rcv()
> would be higher.
> Applying the destination port filter to wakeup messages via tipc_skb_peek_port()
> is not necessary and even causes more overhead.

Wakeup messages are not time critical, so this is not a good reason to bypass the tipc_sk_rcv() function. It is an important principle to have only one single entry point into the socket for arriving messages, and we should stick to that.

> As a result, some non-blocking socket senders are not able to send data
> because epoll() takes many seconds to return EPOLLOUT.
>
> This commit fixes these issues by:
> - Allowing to dequeue as many wakeup messages as possible.

That is good. Just removing the 10-skb limit would probably be sufficient to resolve the problem.
> - Separating wakeup messages from the link input queue. All wakeup
> messages are now put in a local queue and consumed in a simple way.

There is no reason to do this. Instead, you should just take the messages in the wakeup queue, sort them by importance into a temporary queue inside link_prepare_wakeup(), and add that queue to the head of the input queue, instead of to the tail. That is all you need to do, in addition to removing the 10-buffer limit.

Adding a new queue on the stack and then passing it along everywhere is very intrusive, introduces a lot of extra code and complexity, and adds performance overhead to the critical data path. This is not necessary to achieve what you want.

BR ///jon

> > Fixes: 365ad35 ("tipc: reduce risk of user starvation during link congestion") > Signed-off-by: Tung Nguyen <tun...@de...> > --- > net/tipc/bcast.c | 42 +++++++++++----------- > net/tipc/link.c | 67 +++++++++++++++++----------------- > net/tipc/link.h | 12 ++++--- > net/tipc/node.c | 28 +++++++++++---- > net/tipc/socket.c | 91 +++++++++++++++++++++++++++++++++++++++++-- > ---- > net/tipc/socket.h | 1 + > 6 files changed, 166 insertions(+), 75 deletions(-) > > diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index > 6c997d4a6218..4de0f9780ef5 100644 > --- a/net/tipc/bcast.c > +++ b/net/tipc/bcast.c > @@ -421,11 +421,11 @@ int tipc_mcast_xmit(struct net *net, struct > sk_buff_head *pkts, int tipc_bcast_rcv(struct net *net, struct tipc_link *l, > struct sk_buff *skb) { > struct tipc_msg *hdr = buf_msg(skb); > - struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; > - struct sk_buff_head xmitq; > + struct sk_buff_head xmitq, wakeupq; > int rc; > > __skb_queue_head_init(&xmitq); > + __skb_queue_head_init(&wakeupq); > > if (msg_mc_netid(hdr) != tipc_netid(net) || !tipc_link_is_up(l)) { > kfree_skb(skb); > @@ -434,16 +434,16 @@ int tipc_bcast_rcv(struct net *net, struct tipc_link > *l, struct sk_buff *skb) > > tipc_bcast_lock(net); > if (msg_user(hdr) == 
BCAST_PROTOCOL) > - rc = tipc_link_bc_nack_rcv(l, skb, &xmitq); > + rc = tipc_link_bc_nack_rcv(l, skb, &xmitq, &wakeupq); > else > - rc = tipc_link_rcv(l, skb, NULL); > + rc = tipc_link_rcv(l, skb, NULL, &wakeupq); > tipc_bcast_unlock(net); > > tipc_bcbase_xmit(net, &xmitq); > > /* Any socket wakeup messages ? */ > - if (!skb_queue_empty(inputq)) > - tipc_sk_rcv(net, inputq); > + if (!skb_queue_empty(&wakeupq)) > + tipc_sk_rcv_wakeup(net, &wakeupq); > > return rc; > } > @@ -455,25 +455,25 @@ int tipc_bcast_rcv(struct net *net, struct tipc_link > *l, struct sk_buff *skb) void tipc_bcast_ack_rcv(struct net *net, struct > tipc_link *l, > struct tipc_msg *hdr) > { > - struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; > u16 acked = msg_bcast_ack(hdr); > - struct sk_buff_head xmitq; > + struct sk_buff_head xmitq, wakeupq; > > /* Ignore bc acks sent by peer before bcast synch point was received */ > if (msg_bc_ack_invalid(hdr)) > return; > > __skb_queue_head_init(&xmitq); > + __skb_queue_head_init(&wakeupq); > > tipc_bcast_lock(net); > - tipc_link_bc_ack_rcv(l, acked, &xmitq); > + tipc_link_bc_ack_rcv(l, acked, &xmitq, &wakeupq); > tipc_bcast_unlock(net); > > tipc_bcbase_xmit(net, &xmitq); > > /* Any socket wakeup messages ? 
*/ > - if (!skb_queue_empty(inputq)) > - tipc_sk_rcv(net, inputq); > + if (!skb_queue_empty(&wakeupq)) > + tipc_sk_rcv_wakeup(net, &wakeupq); > } > > /* tipc_bcast_synch_rcv - check and update rcv link with peer's send state > @@ -483,17 +483,17 @@ void tipc_bcast_ack_rcv(struct net *net, struct > tipc_link *l, int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l, > struct tipc_msg *hdr) > { > - struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; > - struct sk_buff_head xmitq; > + struct sk_buff_head xmitq, wakeupq; > int rc = 0; > > __skb_queue_head_init(&xmitq); > + __skb_queue_head_init(&wakeupq); > > tipc_bcast_lock(net); > if (msg_type(hdr) != STATE_MSG) { > tipc_link_bc_init_rcv(l, hdr); > } else if (!msg_bc_ack_invalid(hdr)) { > - tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq); > + tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq, > &wakeupq); > rc = tipc_link_bc_sync_rcv(l, hdr, &xmitq); > } > tipc_bcast_unlock(net); > @@ -501,8 +501,8 @@ int tipc_bcast_sync_rcv(struct net *net, struct > tipc_link *l, > tipc_bcbase_xmit(net, &xmitq); > > /* Any socket wakeup messages ? 
*/ > - if (!skb_queue_empty(inputq)) > - tipc_sk_rcv(net, inputq); > + if (!skb_queue_empty(&wakeupq)) > + tipc_sk_rcv_wakeup(net, &wakeupq); > return rc; > } > > @@ -529,13 +529,13 @@ void tipc_bcast_add_peer(struct net *net, struct > tipc_link *uc_l, void tipc_bcast_remove_peer(struct net *net, struct tipc_link > *rcv_l) { > struct tipc_link *snd_l = tipc_bc_sndlink(net); > - struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; > - struct sk_buff_head xmitq; > + struct sk_buff_head xmitq, wakeupq; > > __skb_queue_head_init(&xmitq); > + __skb_queue_head_init(&wakeupq); > > tipc_bcast_lock(net); > - tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq); > + tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq, &wakeupq); > tipc_bcbase_select_primary(net); > tipc_bcbase_calc_bc_threshold(net); > tipc_bcast_unlock(net); > @@ -543,8 +543,8 @@ void tipc_bcast_remove_peer(struct net *net, struct > tipc_link *rcv_l) > tipc_bcbase_xmit(net, &xmitq); > > /* Any socket wakeup messages ? */ > - if (!skb_queue_empty(inputq)) > - tipc_sk_rcv(net, inputq); > + if (!skb_queue_empty(&wakeupq)) > + tipc_sk_rcv_wakeup(net, &wakeupq); > } > > int tipc_bclink_reset_stats(struct net *net) diff --git a/net/tipc/link.c > b/net/tipc/link.c index 2050fd386642..e67d7e6a793b 100644 > --- a/net/tipc/link.c > +++ b/net/tipc/link.c > @@ -237,7 +237,8 @@ static int link_is_up(struct tipc_link *l) } > > static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, > - struct sk_buff_head *xmitq); > + struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq); > static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool > probe, > bool probe_reply, u16 rcvgap, > int tolerance, int priority, > @@ -331,6 +332,11 @@ struct sk_buff_head *tipc_link_inputq(struct > tipc_link *l) > return l->inputq; > } > > +struct sk_buff_head *tipc_link_wakeupq(struct tipc_link *l) { > + return &l->wakeupq; > +} > + > char tipc_link_plane(struct tipc_link *l) { > return l->net_plane; > @@ -355,19 
+361,21 @@ void tipc_link_add_bc_peer(struct tipc_link > *snd_l, > > void tipc_link_remove_bc_peer(struct tipc_link *snd_l, > struct tipc_link *rcv_l, > - struct sk_buff_head *xmitq) > + struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq) > { > u16 ack = snd_l->snd_nxt - 1; > > snd_l->ackers--; > rcv_l->bc_peer_is_up = true; > rcv_l->state = LINK_ESTABLISHED; > - tipc_link_bc_ack_rcv(rcv_l, ack, xmitq); > + tipc_link_bc_ack_rcv(rcv_l, ack, xmitq, wakeupq); > trace_tipc_link_reset(rcv_l, TIPC_DUMP_ALL, "bclink removed!"); > tipc_link_reset(rcv_l); > rcv_l->state = LINK_RESET; > if (!snd_l->ackers) { > trace_tipc_link_reset(snd_l, TIPC_DUMP_ALL, "zero ackers!"); > + skb_queue_splice_tail_init(&snd_l->wakeupq, wakeupq); > tipc_link_reset(snd_l); > snd_l->state = LINK_RESET; > __skb_queue_purge(xmitq); > @@ -500,7 +508,7 @@ bool tipc_link_create(struct net *net, char *if_name, > int bearer_id, > __skb_queue_head_init(&l->backlogq); > __skb_queue_head_init(&l->deferdq); > __skb_queue_head_init(&l->failover_deferdq); > - skb_queue_head_init(&l->wakeupq); > + __skb_queue_head_init(&l->wakeupq); > skb_queue_head_init(l->inputq); > return true; > } > @@ -839,9 +847,8 @@ static int link_schedule_user(struct tipc_link *l, struct > tipc_msg *hdr) > dnode, l->addr, dport, 0, 0); > if (!skb) > return -ENOBUFS; > - msg_set_dest_droppable(buf_msg(skb), true); > TIPC_SKB_CB(skb)->chain_imp = msg_importance(hdr); > - skb_queue_tail(&l->wakeupq, skb); > + __skb_queue_tail(&l->wakeupq, skb); > l->stats.link_congs++; > trace_tipc_link_conges(l, TIPC_DUMP_ALL, "wakeup scheduled!"); > return -ELINKCONG; > @@ -853,46 +860,34 @@ static int link_schedule_user(struct tipc_link *l, > struct tipc_msg *hdr) > * Wake up a number of waiting users, as permitted by available space > * in the send queue > */ > -static void link_prepare_wakeup(struct tipc_link *l) > +static void link_prepare_wakeup(struct tipc_link *l, > + struct sk_buff_head *wakeupq) > { > struct sk_buff *skb, *tmp; > - 
int imp, i = 0; > + int imp; > > skb_queue_walk_safe(&l->wakeupq, skb, tmp) { > imp = TIPC_SKB_CB(skb)->chain_imp; > if (l->backlog[imp].len < l->backlog[imp].limit) { > - skb_unlink(skb, &l->wakeupq); > - skb_queue_tail(l->inputq, skb); > - } else if (i++ > 10) { > - break; > + __skb_unlink(skb, &l->wakeupq); > + __skb_queue_tail(wakeupq, skb); > } > } > } > > void tipc_link_reset(struct tipc_link *l) { > - struct sk_buff_head list; > - > - __skb_queue_head_init(&list); > - > l->in_session = false; > /* Force re-synch of peer session number before establishing */ > l->peer_session--; > l->session++; > l->mtu = l->advertised_mtu; > > - spin_lock_bh(&l->wakeupq.lock); > - skb_queue_splice_init(&l->wakeupq, &list); > - spin_unlock_bh(&l->wakeupq.lock); > - > - spin_lock_bh(&l->inputq->lock); > - skb_queue_splice_init(&list, l->inputq); > - spin_unlock_bh(&l->inputq->lock); > - > __skb_queue_purge(&l->transmq); > __skb_queue_purge(&l->deferdq); > __skb_queue_purge(&l->backlogq); > __skb_queue_purge(&l->failover_deferdq); > + __skb_queue_purge(&l->wakeupq); > l->backlog[TIPC_LOW_IMPORTANCE].len = 0; > l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0; > l->backlog[TIPC_HIGH_IMPORTANCE].len = 0; @@ -1445,9 +1440,11 > @@ static int tipc_link_build_nack_msg(struct tipc_link *l, > * @l: the link that should handle the message > * @skb: TIPC packet > * @xmitq: queue to place packets to be sent after this call > + * @wakeupq: list of wakeup messages to be consumed after this call > */ > int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, > - struct sk_buff_head *xmitq) > + struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq) > { > struct sk_buff_head *defq = &l->deferdq; > struct tipc_msg *hdr = buf_msg(skb); > @@ -1456,7 +1453,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff > *skb, > > /* Verify and update link state */ > if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) > - return tipc_link_proto_rcv(l, skb, xmitq); > + return tipc_link_proto_rcv(l, skb, 
xmitq, wakeupq); > > /* Don't send probe at next timeout expiration */ > l->silent_intv_cnt = 0; > @@ -1484,7 +1481,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff > *skb, > l->stale_cnt = 0; > tipc_link_advance_backlog(l, xmitq); > if (unlikely(!skb_queue_empty(&l->wakeupq))) > - link_prepare_wakeup(l); > + link_prepare_wakeup(l, wakeupq); > } > > /* Defer delivery if sequence gap */ > @@ -1518,6 +1515,7 @@ static void tipc_link_build_proto_msg(struct > tipc_link *l, int mtyp, bool probe, > bool probe_reply, u16 rcvgap, > int tolerance, int priority, > struct sk_buff_head *xmitq) > + > { > struct tipc_link *bcl = l->bc_rcvlink; > struct sk_buff *skb; > @@ -1786,7 +1784,8 @@ bool tipc_link_validate_msg(struct tipc_link *l, > struct tipc_msg *hdr) > * network plane > */ > static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, > - struct sk_buff_head *xmitq) > + struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq) > { > struct tipc_msg *hdr = buf_msg(skb); > struct tipc_gap_ack_blks *ga = NULL; > @@ -1926,7 +1925,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, > struct sk_buff *skb, > > tipc_link_advance_backlog(l, xmitq); > if (unlikely(!skb_queue_empty(&l->wakeupq))) > - link_prepare_wakeup(l); > + link_prepare_wakeup(l, wakeupq); > } > exit: > kfree_skb(skb); > @@ -2072,7 +2071,8 @@ int tipc_link_bc_sync_rcv(struct tipc_link *l, struct > tipc_msg *hdr, } > > void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, > - struct sk_buff_head *xmitq) > + struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq) > { > struct sk_buff *skb, *tmp; > struct tipc_link *snd_l = l->bc_sndlink; @@ -2102,7 +2102,7 @@ void > tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, > l->acked = acked; > tipc_link_advance_backlog(snd_l, xmitq); > if (unlikely(!skb_queue_empty(&snd_l->wakeupq))) > - link_prepare_wakeup(snd_l); > + link_prepare_wakeup(snd_l, wakeupq); > } > > /* tipc_link_bc_nack_rcv(): receive broadcast nack message @@ 
-2110,7 > +2110,8 @@ void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, > * no BCAST_PROTOCOL/STATE messages occur from TIPC v2.5. > */ > int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb, > - struct sk_buff_head *xmitq) > + struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq) > { > struct tipc_msg *hdr = buf_msg(skb); > u32 dnode = msg_destnode(hdr); > @@ -2130,7 +2131,7 @@ int tipc_link_bc_nack_rcv(struct tipc_link *l, struct > sk_buff *skb, > return 0; > > if (dnode == tipc_own_addr(l->net)) { > - tipc_link_bc_ack_rcv(l, acked, xmitq); > + tipc_link_bc_ack_rcv(l, acked, xmitq, wakeupq); > rc = tipc_link_retrans(l->bc_sndlink, l, from, to, xmitq); > l->stats.recv_nacks++; > return rc; > diff --git a/net/tipc/link.h b/net/tipc/link.h index > adcad65e761c..9568bd965945 100644 > --- a/net/tipc/link.h > +++ b/net/tipc/link.h > @@ -107,6 +107,7 @@ void tipc_link_reset_stats(struct tipc_link *l); int > tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list, > struct sk_buff_head *xmitq); > struct sk_buff_head *tipc_link_inputq(struct tipc_link *l); > +struct sk_buff_head *tipc_link_wakeupq(struct tipc_link *l); > u16 tipc_link_rcv_nxt(struct tipc_link *l); > u16 tipc_link_acked(struct tipc_link *l); > u32 tipc_link_id(struct tipc_link *l); > @@ -130,25 +131,28 @@ int __tipc_nl_add_link(struct net *net, struct > tipc_nl_msg *msg, int tipc_nl_parse_link_prop(struct nlattr *prop, struct > nlattr *props[]); int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head > *xmitq); int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, > - struct sk_buff_head *xmitq); > + struct sk_buff_head *xmitq, struct sk_buff_head *wakeupq); > int tipc_link_build_state_msg(struct tipc_link *l, struct sk_buff_head *xmitq); > void tipc_link_add_bc_peer(struct tipc_link *snd_l, > struct tipc_link *uc_l, > struct sk_buff_head *xmitq); > void tipc_link_remove_bc_peer(struct tipc_link *snd_l, > struct tipc_link *rcv_l, > - struct sk_buff_head 
*xmitq); > + struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq); > int tipc_link_bc_peers(struct tipc_link *l); void tipc_link_set_mtu(struct > tipc_link *l, int mtu); int tipc_link_mtu(struct tipc_link *l); void > tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, > - struct sk_buff_head *xmitq); > + struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq); > void tipc_link_build_bc_sync_msg(struct tipc_link *l, > struct sk_buff_head *xmitq); > void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr); > int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr, > struct sk_buff_head *xmitq); > int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb, > - struct sk_buff_head *xmitq); > + struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq); > bool tipc_link_too_silent(struct tipc_link *l); #endif diff --git > a/net/tipc/node.c b/net/tipc/node.c index 550581d47d51..d0ec29081b11 > 100644 > --- a/net/tipc/node.c > +++ b/net/tipc/node.c > @@ -154,6 +154,7 @@ enum { > > static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, > struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq, > struct tipc_media_addr **maddr); > static void tipc_node_link_down(struct tipc_node *n, int bearer_id, > bool delete); > @@ -803,6 +804,7 @@ static void tipc_node_link_failover(struct tipc_node > *n, struct tipc_link *l, > */ > static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, > struct sk_buff_head *xmitq, > + struct sk_buff_head *wakeupq, > struct tipc_media_addr **maddr) > { > struct tipc_link_entry *le = &n->links[*bearer_id]; @@ -851,6 +853,7 > @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, > tipc_node_fsm_evt(n, SELF_LOST_CONTACT_EVT); > trace_tipc_link_reset(l, TIPC_DUMP_ALL, "link down!"); > tipc_link_fsm_evt(l, LINK_RESET_EVT); > + skb_queue_splice_tail_init(tipc_link_wakeupq(l), wakeupq); > tipc_link_reset(l); > tipc_link_build_reset_msg(l, xmitq); > 
*maddr = &n->links[*bearer_id].maddr; @@ -868,6 +871,7 @@ > static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, > n->sync_point = tipc_link_rcv_nxt(tnl) + (U16_MAX / 2 - 1); > tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq); > trace_tipc_link_reset(l, TIPC_DUMP_ALL, "link down -> failover!"); > + skb_queue_splice_tail_init(tipc_link_wakeupq(l), wakeupq); > tipc_link_reset(l); > tipc_link_fsm_evt(l, LINK_RESET_EVT); > tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT); @@ -881,18 > +885,20 @@ static void tipc_node_link_down(struct tipc_node *n, int > bearer_id, bool delete) > struct tipc_media_addr *maddr = NULL; > struct tipc_link *l = le->link; > int old_bearer_id = bearer_id; > - struct sk_buff_head xmitq; > + struct sk_buff_head xmitq, wakeupq; > > if (!l) > return; > > __skb_queue_head_init(&xmitq); > + __skb_queue_head_init(&wakeupq); > > tipc_node_write_lock(n); > if (!tipc_link_is_establishing(l)) { > - __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr); > + __tipc_node_link_down(n, &bearer_id, &xmitq, &wakeupq, > &maddr); > } else { > /* Defuse pending tipc_node_link_up() */ > + skb_queue_splice_tail_init(tipc_link_wakeupq(l), &wakeupq); > tipc_link_reset(l); > tipc_link_fsm_evt(l, LINK_RESET_EVT); > } > @@ -908,6 +914,7 @@ static void tipc_node_link_down(struct tipc_node > *n, int bearer_id, bool delete) > if (!skb_queue_empty(&xmitq)) > tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr); > tipc_sk_rcv(n->net, &le->inputq); > + tipc_sk_rcv_wakeup(n->net, &wakeupq); > } > > static bool node_is_up(struct tipc_node *n) @@ -1652,6 +1659,7 @@ static > bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, > u16 iseqno = msg_seqno(msg_get_wrapped(hdr)); > u16 exp_pkts = msg_msgcnt(hdr); > u16 rcv_nxt, syncpt, dlv_nxt, inputq_len; > + struct sk_buff_head wakeupq; > int state = n->state; > struct tipc_link *l, *tnl, *pl = NULL; > struct tipc_media_addr *maddr; > @@ -1711,9 +1719,12 @@ static bool tipc_node_check_state(struct > 
tipc_node *n, struct sk_buff *skb, > if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) { > syncpt = oseqno + exp_pkts - 1; > if (pl && !tipc_link_is_reset(pl)) { > - __tipc_node_link_down(n, &pb_id, xmitq, &maddr); > + __skb_queue_head_init(&wakeupq); > + __tipc_node_link_down(n, &pb_id, xmitq, > + &wakeupq, &maddr); > trace_tipc_node_link_down(n, true, > "node link down <- failover!"); > + skb_queue_splice_tail(&wakeupq, tipc_link_wakeupq(l)); > tipc_skb_queue_splice_tail_init(tipc_link_inputq(pl), > tipc_link_inputq(l)); > } > @@ -1795,7 +1806,7 @@ static bool tipc_node_check_state(struct > tipc_node *n, struct sk_buff *skb, > */ > void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) { > - struct sk_buff_head xmitq; > + struct sk_buff_head xmitq, wakeupq; > struct tipc_node *n; > struct tipc_msg *hdr; > int bearer_id = b->identity; > @@ -1805,6 +1816,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, > struct tipc_bearer *b) > u16 bc_ack; > > __skb_queue_head_init(&xmitq); > + __skb_queue_head_init(&wakeupq); > > /* Ensure message is well-formed before touching the header */ > if (unlikely(!tipc_msg_validate(&skb))) > @@ -1842,7 +1854,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, > struct tipc_bearer *b) > if (likely((n->state == SELF_UP_PEER_UP) && (usr != > TUNNEL_PROTOCOL))) { > spin_lock_bh(&le->lock); > if (le->link) { > - rc = tipc_link_rcv(le->link, skb, &xmitq); > + rc = tipc_link_rcv(le->link, skb, &xmitq, &wakeupq); > skb = NULL; > } > spin_unlock_bh(&le->lock); > @@ -1856,7 +1868,8 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, > struct tipc_bearer *b) > tipc_node_write_lock(n); > if (tipc_node_check_state(n, skb, bearer_id, &xmitq)) { > if (le->link) { > - rc = tipc_link_rcv(le->link, skb, &xmitq); > + rc = tipc_link_rcv(le->link, skb, > + &xmitq, &wakeupq); > skb = NULL; > } > } > @@ -1878,6 +1891,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, > struct tipc_bearer *b) > if 
(!skb_queue_empty(&le->inputq))
> 		tipc_sk_rcv(net, &le->inputq);
>
> +	if (!skb_queue_empty(&wakeupq))
> +		tipc_sk_rcv_wakeup(net, &wakeupq);
> +
> 	if (!skb_queue_empty(&xmitq))
> 		tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
>
> diff --git a/net/tipc/socket.c b/net/tipc/socket.c
> index dd8537f988c4..fa48fb14add7 100644
> --- a/net/tipc/socket.c
> +++ b/net/tipc/socket.c
> @@ -2224,6 +2224,33 @@ static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb)
> 	return 0;
> }
>
> +/**
> + * tipc_sk_add_backlog - add the skb to the socket backlog queue
> + * @sk: socket where the skb should be enqueued
> + * @skb: the skb being enqueued
> + *
> + * Return true if the skb was added. Otherwise, return false
> + */
> +static bool tipc_sk_add_backlog(struct sock *sk, struct sk_buff *skb)
> +{
> +	unsigned int lim;
> +	atomic_t *dcnt;
> +
> +	/* Try backlog, compensating for double-counted bytes */
> +	dcnt = &tipc_sk(sk)->dupl_rcvcnt;
> +	if (!sk->sk_backlog.len)
> +		atomic_set(dcnt, 0);
> +
> +	lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
> +	if (likely(!sk_add_backlog(sk, skb, lim))) {
> +		trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL,
> +					 "bklg & rcvq >90% allocated!");
> +		return true;
> +	}
> +
> +	return false;
> +}
> +
> /**
>  * tipc_sk_enqueue - extract all buffers with destination 'dport' from
>  * inputq and try adding them to socket or backlog queue
> @@ -2238,8 +2265,6 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
> {
> 	unsigned long time_limit = jiffies + 2;
> 	struct sk_buff *skb;
> -	unsigned int lim;
> -	atomic_t *dcnt;
> 	u32 onode;
>
> 	while (skb_queue_len(inputq)) {
> @@ -2256,16 +2281,9 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
> 			continue;
> 		}
>
> -		/* Try backlog, compensating for double-counted bytes */
> -		dcnt = &tipc_sk(sk)->dupl_rcvcnt;
> -		if (!sk->sk_backlog.len)
> -			atomic_set(dcnt, 0);
> -		lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
> -		if (likely(!sk_add_backlog(sk, skb, lim))) {
> -			trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL,
> -					"bklg & rcvq >90% allocated!");
> +		/* Add the skb to the socket backlog queue */
> +		if (tipc_sk_add_backlog(sk, skb))
> 			continue;
> -		}
>
> 		trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL, "err_overload!");
> 		/* Overload => reject message back to sender */
> @@ -2332,6 +2350,57 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
> 	}
> }
>
> +/**
> + * tipc_sk_rcv_wakeup - handle a chain of wakeup messages
> + * @wakeupq: list of wakeup messages
> + * Consumes all messages in the list until it is empty
> + */
> +void tipc_sk_rcv_wakeup(struct net *net, struct sk_buff_head *wakeupq)
> +{
> +	u32 dport = 0;
> +	struct tipc_sock *tsk;
> +	struct sock *sk;
> +	struct sk_buff *skb, *tmp;
> +
> +start:
> +	if (!skb_queue_len(wakeupq))
> +		return;
> +
> +	skb_queue_walk_safe(wakeupq, skb, tmp) {
> +		dport = msg_destport(buf_msg(skb));
> +		tsk = tipc_sk_lookup(net, dport);
> +
> +		if (!unlikely(tsk)) {
> +			__skb_unlink(skb, wakeupq);
> +			kfree_skb(skb);
> +			continue;
> +		}
> +
> +		sk = &tsk->sk;
> +		if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
> +			__skb_unlink(skb, wakeupq);
> +			if (!sock_owned_by_user(sk)) {
> +				tipc_dest_del(&tsk->cong_links,
> +					      msg_orignode(buf_msg(skb)), 0);
> +				/* coupled with smp_rmb() in
> +				 * tipc_wait_for_cond()
> +				 */
> +				smp_wmb();
> +				tsk->cong_link_cnt--;
> +				sk->sk_write_space(sk);
> +				kfree_skb(skb);
> +			} else if (unlikely(!tipc_sk_add_backlog(sk, skb))) {
> +				kfree_skb(skb);
> +				pr_warn("Drop wakeup message for port %u\n",
> +					tipc_sk(sk)->portid);
> +			}
> +			spin_unlock_bh(&sk->sk_lock.slock);
> +		}
> +		sock_put(sk);
> +	}
> +	goto start;
> +}
> +
> static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
> {
> 	DEFINE_WAIT_FUNC(wait, woken_wake_function);
> diff --git a/net/tipc/socket.h b/net/tipc/socket.h
> index 235b9679acee..44eb9fb68d7e 100644
> --- a/net/tipc/socket.h
> +++ b/net/tipc/socket.h
> @@ -54,6 +54,7 @@ struct tipc_sock;
> int tipc_socket_init(void);
> void tipc_socket_stop(void);
> void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
> +void tipc_sk_rcv_wakeup(struct net *net, struct sk_buff_head *wakeupq);
> void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
> 		       struct sk_buff_head *inputq);
> void tipc_sk_reinit(struct net *net);
> --
> 2.17.1
|
From: Jon M. <jon...@er...> - 2019-07-16 20:27:34
|
Acked-by: Jon Maloy <jon...@er...>

Remember to mark this one as 'net' and not 'net-next' when you post it.

> -----Original Message-----
> From: Hoang Le <hoa...@de...>
> Sent: 16-Jul-19 03:26
> To: Jon Maloy <jon...@er...>; ma...@do...; yin...@wi...; tip...@li...
> Subject: [net-next] tipc: fix retransmission failure when link re-established
>
> Currently a link is declared stale and reset if the stale limit time is longer
> than the link tolerance time. But this stale limit is not initialized correctly
> when the link is resetting. This leads to the link being declared failed,
> because the reset criteria always pass even though no packet has been
> retransmitted while the link is re-establishing.
>
> To fix this, we set the stale limit time far into the future in two places:
> when a link is reset and when an ack is received from the peer.
>
> Fixes: 77cf8edbc0e7 ("tipc: simplify stale link failure criteria")
> Signed-off-by: Hoang Le <hoa...@de...>
> ---
>  net/tipc/link.c | 2 ++
>  1 file changed, 2 insertions(+)
>
> diff --git a/net/tipc/link.c b/net/tipc/link.c
> index 66d3a07bc571..2ba79d451f08 100644
> --- a/net/tipc/link.c
> +++ b/net/tipc/link.c
> @@ -909,6 +909,7 @@ void tipc_link_reset(struct tipc_link *l)
> 	l->silent_intv_cnt = 0;
> 	l->rst_cnt = 0;
> 	l->bc_peer_is_up = false;
> +	l->stale_limit = msecs_to_jiffies(~0);
> 	memset(&l->mon_state, 0, sizeof(l->mon_state));
> 	tipc_link_reset_stats(l);
> }
> @@ -1510,6 +1511,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
>
> 	/* Forward queues and wake up waiting users */
> 	if (likely(tipc_link_release_pkts(l, msg_ack(hdr)))) {
> +		l->stale_limit = msecs_to_jiffies(~0);
> 		tipc_link_advance_backlog(l, xmitq);
> 		if (unlikely(!skb_queue_empty(&l->wakeupq)))
> 			link_prepare_wakeup(l);
> --
> 2.17.1
|
From: Jon M. <jon...@er...> - 2019-07-16 12:25:00
|
Hi Kumar,

First of all, I would recommend you not to use <1.88.88> as listener address, since the service types [0,64] are reserved for internal use by TIPC itself. I am also a little confused about which kernel you are using. Are you now using 4.19.44, and not 4.4.180, or was this just a test? BTW, since 4.17, the use of "zone" scope is deprecated, and translated internally to "cluster". There was never any functional difference between them anyway.

Regarding your system, a good start would be to issue the following commands, both on the host and in the container:

$ ip addr
$ tipc bearer list
$ tipc link list
$ tipc nametable show

BR ///jon

From: Mahesh Kumar <lm...@ya...>
Sent: 15-Jul-19 16:21
To: Ying Xue <yin...@wi...>; Jon Maloy <jon...@er...>
Cc: tip...@li...
Subject: Re: [tipc-discussion] TIPC ; config trouble ; help request

Hi Jon,

Thanks a lot for checking this and providing feedback. A brief background of the system: in the host system, node address 1.1.1 is configured upon bootup. I added a listener in 1.88.88.
tams_srv_addr.family = AF_TIPC;
tams_srv_addr.addrtype = TIPC_ADDR_NAMESEQ;
tams_srv_addr.addr.nameseq.type = TIPC_TOP_SRV;
tams_srv_addr.addr.nameseq.lower = TAMS_TIPC_LISTEN_PORT;   <<88
tams_srv_addr.addr.nameseq.upper = TAMS_TIPC_LISTEN_PORT;
tams_srv_addr.scope = TIPC_ZONE_SCOPE;

Now from a container, I am trying to access the host service (88) by setting the container's node address to 1.1.100 using "tipc node set addr 1.1.100". This used to work fine, but it bails out with the 4.4.180 kernel. Another change I noticed after moving my kernel to 4.19.44 is that all of the listeners are now in cluster scope instead of zone scope. I didn't have a chance to check the results from the container with the new kernel.

From the host:

tipc-config -nt
Type       Lower      Upper      Port Identity        Publication Scope
0          16781313   16781313   <1.1.1:0>            16781313    cluster
1          1          1          <1.1.1:2399481494>   2399481495  node
1          88         88         <1.1.1:2536304640>   2536304641  cluster
15003      5          5          <1.1.1:3973838764>   3973838765  cluster

[Switch_1_RP_0:/]$ uname -a
Linux Switch_1_RP_0 4.19.44 #1 SMP Wed May 22 13:50:02 PDT 2019 x86_64 x86_64 x86_64 GNU/Linux

Please let me know which outputs I should share from the 'host' and 'container' side.

thanks & regards
Mahesh kumar.L

On Monday, 15 July, 2019, 12:36:09 pm GMT-7, Jon Maloy <jon...@er...> wrote:

Hi Kumar,

Your binding table listing reveals that your node already has an address <1.1.1>, which explains why your address setting fails. You should check if you have any script that sets the address by default at module loading, or maybe you just set it manually and then forgot... Furthermore, it seems you have published a service <1,88,88>, which means you are illegally using the reserved service type <1>. The latter isn't your fault, but due to a bug in TIPC that wrongly allows users to publish such service types, in the function tipc_bind().
I discovered this bug a couple of months ago, but haven't fixed it yet, and I am not quite sure how to do it without breaking any API. This may cause you surprises, but I cannot see why it would cause the bearer enabling to fail. If this problem persists, you should post some more system info about your interfaces, which tipc links you have, etc.

BR ///jon

> -----Original Message-----
> From: Mahesh Kumar via tipc-discussion <tipc-dis...@li...>
> Sent: 15-Jul-19 11:49
> To: tip...@li...; Ying Xue <yin...@wi...>
> Subject: Re: [tipc-discussion] TIPC ; config trouble ; help request
>
> Hi Ying,
> Thank you very much for letting me know. Do we suspect any related ioctl()
> patches? Could you please point me to a link where we can review the TIPC
> patches that went into the kernel?
> Much appreciate the help.
> thanks & regards
> Mahesh kumar.L
>
> On Monday, 15 July, 2019, 02:56:32 am GMT-7, Ying Xue
> <yin...@wi...> wrote:
>
> On 7/13/19 11:58 AM, Mahesh Kumar via tipc-discussion wrote:
> > Tipc Team,
> >
> > Greetings!
> > I have been using TIPC for about a year without any issue. However the
> > TIPC tool is bailing out when I tried to set address, bearer
> >
> > / # tipc node set addr 1.1.100
> > error: Operation not permitted
> >
> > / # tipc bearer enable media eth dev ieobc
> > error: Invalid argument
> >
> > / #
> >
> > I am using the new kernel now;
> > uname -a
> > Linux 2c3f0b001900_1_RP_0 4.4.180 #1 SMP Tue Jun 25 15:36:10
> > PDT 2019 x86_64 x86_64 x86_64 GNU/Linux
> > dmesg output; grep -i TIPC d.txt
> > [ 29.436599] tipc: Activated (version 2.0.0)
> > [ 29.436768] tipc: Started in single node mode
>
> Suspected some TIPC patches integrated through the 4.4.180 release introduced
> a regression. The simplest method to identify the issue is to revert some
> TIPC patches to identify which ones caused the regression.
> > [2c3f0b001900_1_RP_0:/]$ tipc-config -nt
> > Type       Lower      Upper      Port Identity        Publication Scope
> > 0          16781313   16781313   <1.1.1:0>            16781313    zone
> > 1          1          1          <1.1.1:483390874>    483390875   node
> > 1          88         88         <1.1.1:2870943326>   2870943327  zone
> > 15003      5          5          <1.1.1:3556781096>   3556781097  zone
> > [2c3f0b001900_1_RP_0:/]$
> >
> > Please let me know if any issue.
> > thanks & regards
> > Mahesh kumar.L
> >
> > _______________________________________________
> > tipc-discussion mailing list
> > tip...@li...
> > https://lists.sourceforge.net/lists/listinfo/tipc-discussion
>
> _______________________________________________
> tipc-discussion mailing list
> tip...@li...
> https://lists.sourceforge.net/lists/listinfo/tipc-discussion
|
From: Hoang Le <hoa...@de...> - 2019-07-16 07:26:52
|
Currently a link is declared stale and reset if the stale limit time is longer than the link tolerance time. But this stale limit is not initialized correctly when the link is resetting. This leads to the link being declared failed, because the reset criteria always pass even though no packet has been retransmitted while the link is re-establishing.

To fix this, we set the stale limit time far into the future in two places: when a link is reset and when an ack is received from the peer.

Fixes: 77cf8edbc0e7 ("tipc: simplify stale link failure criteria")
Signed-off-by: Hoang Le <hoa...@de...>
---
 net/tipc/link.c | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index 66d3a07bc571..2ba79d451f08 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -909,6 +909,7 @@ void tipc_link_reset(struct tipc_link *l)
 	l->silent_intv_cnt = 0;
 	l->rst_cnt = 0;
 	l->bc_peer_is_up = false;
+	l->stale_limit = msecs_to_jiffies(~0);
 	memset(&l->mon_state, 0, sizeof(l->mon_state));
 	tipc_link_reset_stats(l);
 }
@@ -1510,6 +1511,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,

 	/* Forward queues and wake up waiting users */
 	if (likely(tipc_link_release_pkts(l, msg_ack(hdr)))) {
+		l->stale_limit = msecs_to_jiffies(~0);
 		tipc_link_advance_backlog(l, xmitq);
 		if (unlikely(!skb_queue_empty(&l->wakeupq)))
 			link_prepare_wakeup(l);
--
2.17.1
|
From: Mahesh K. <lm...@ya...> - 2019-07-15 20:20:52
|
Hi Jon,

Thanks a lot for checking this and providing feedback. A brief background of the system: in the host system, node address 1.1.1 is configured upon bootup. I added a listener in 1.88.88.

tams_srv_addr.family = AF_TIPC;
tams_srv_addr.addrtype = TIPC_ADDR_NAMESEQ;
tams_srv_addr.addr.nameseq.type = TIPC_TOP_SRV;
tams_srv_addr.addr.nameseq.lower = TAMS_TIPC_LISTEN_PORT;   <<88
tams_srv_addr.addr.nameseq.upper = TAMS_TIPC_LISTEN_PORT;
tams_srv_addr.scope = TIPC_ZONE_SCOPE;

Now from a container, I am trying to access the host service (88) by setting the container's node address to 1.1.100 using "tipc node set addr 1.1.100". This used to work fine, but it bails out with the 4.4.180 kernel. Another change I noticed after moving my kernel to 4.19.44 is that all of the listeners are now in cluster scope instead of zone scope. I didn't have a chance to check the results from the container with the new kernel.

From the host:

tipc-config -nt
Type       Lower      Upper      Port Identity        Publication Scope
0          16781313   16781313   <1.1.1:0>            16781313    cluster
1          1          1          <1.1.1:2399481494>   2399481495  node
1          88         88         <1.1.1:2536304640>   2536304641  cluster
15003      5          5          <1.1.1:3973838764>   3973838765  cluster

[Switch_1_RP_0:/]$ uname -a
Linux Switch_1_RP_0 4.19.44 #1 SMP Wed May 22 13:50:02 PDT 2019 x86_64 x86_64 x86_64 GNU/Linux

Please let me know which outputs I should share from the 'host' and 'container' side.

thanks & regards
Mahesh kumar.L

On Monday, 15 July, 2019, 12:36:09 pm GMT-7, Jon Maloy <jon...@er...> wrote:

Hi Kumar,

Your binding table listing reveals that your node already has an address <1.1.1>, which explains why your address setting fails. You should check if you have any script that sets the address by default at module loading, or maybe you just set it manually and then forgot... Furthermore, it seems you have published a service <1,88,88>, which means you are illegally using the reserved service type <1>.
The latter isn't your fault, but due to a bug in TIPC that wrongly allows users to publish such service types, in the function tipc_bind(). I discovered this bug a couple of months ago, but haven't fixed it yet, and I am not quite sure how to do it without breaking any API. This may cause you surprises, but I cannot see why it would cause the bearer enabling to fail. If this problem persists, you should post some more system info about your interfaces, which tipc links you have, etc.

BR ///jon

> -----Original Message-----
> From: Mahesh Kumar via tipc-discussion <tipc-dis...@li...>
> Sent: 15-Jul-19 11:49
> To: tip...@li...; Ying Xue <yin...@wi...>
> Subject: Re: [tipc-discussion] TIPC ; config trouble ; help request
>
> Hi Ying,
> Thank you very much for letting me know. Do we suspect any related ioctl()
> patches? Could you please point me to a link where we can review the TIPC
> patches that went into the kernel?
> Much appreciate the help.
> thanks & regards
> Mahesh kumar.L
>
> On Monday, 15 July, 2019, 02:56:32 am GMT-7, Ying Xue
> <yin...@wi...> wrote:
>
> On 7/13/19 11:58 AM, Mahesh Kumar via tipc-discussion wrote:
> > Tipc Team,
> >
> > Greetings!
> > I have been using TIPC for about a year without any issue. However the
> > TIPC tool is bailing out when I tried to set address, bearer
> >
> > / # tipc node set addr 1.1.100
> > error: Operation not permitted
> >
> > / # tipc bearer enable media eth dev ieobc
> > error: Invalid argument
> >
> > / #
> >
> > I am using the new kernel now;
> > uname -a
> > Linux 2c3f0b001900_1_RP_0 4.4.180 #1 SMP Tue Jun 25 15:36:10
> > PDT 2019 x86_64 x86_64 x86_64 GNU/Linux
> > dmesg output; grep -i TIPC d.txt
> > [ 29.436599] tipc: Activated (version 2.0.0)
> > [ 29.436768] tipc: Started in single node mode
>
> Suspected some TIPC patches integrated through the 4.4.180 release introduced
> a regression. The simplest method to identify the issue is to revert some
> TIPC patches to identify which ones caused the regression.
> > [2c3f0b001900_1_RP_0:/]$ tipc-config -nt
> > Type       Lower      Upper      Port Identity        Publication Scope
> > 0          16781313   16781313   <1.1.1:0>            16781313    zone
> > 1          1          1          <1.1.1:483390874>    483390875   node
> > 1          88         88         <1.1.1:2870943326>   2870943327  zone
> > 15003      5          5          <1.1.1:3556781096>   3556781097  zone
> > [2c3f0b001900_1_RP_0:/]$
> >
> > Please let me know if any issue.
> > thanks & regards
> > Mahesh kumar.L
> >
> > _______________________________________________
> > tipc-discussion mailing list
> > tip...@li...
> > https://lists.sourceforge.net/lists/listinfo/tipc-discussion
>
> _______________________________________________
> tipc-discussion mailing list
> tip...@li...
> https://lists.sourceforge.net/lists/listinfo/tipc-discussion
|
From: Jon M. <jon...@er...> - 2019-07-15 20:09:12
|
Hi Kumar,

Your binding table listing reveals that your node already has an address <1.1.1>, which explains why your address setting fails. You should check if you have any script that sets the address by default at module loading, or maybe you just set it manually and then forgot... Furthermore, it seems you have published a service <1,88,88>, which means you are illegally using the reserved service type <1>. The latter isn't your fault, but due to a bug in TIPC that wrongly allows users to publish such service types, in the function tipc_bind(). I discovered this bug a couple of months ago, but haven't fixed it yet, and I am not quite sure how to do it without breaking any API.

This may cause you surprises, but I cannot see why it would cause the bearer enabling to fail. If this problem persists, you should post some more system info about your interfaces, which tipc links you have, etc.

BR ///jon

> -----Original Message-----
> From: Mahesh Kumar via tipc-discussion <tipc-dis...@li...>
> Sent: 15-Jul-19 11:49
> To: tip...@li...; Ying Xue <yin...@wi...>
> Subject: Re: [tipc-discussion] TIPC ; config trouble ; help request
>
> Hi Ying,
> Thank you very much for letting me know. Do we suspect any related ioctl()
> patches? Could you please point me to a link where we can review the TIPC
> patches that went into the kernel?
> Much appreciate the help.
> thanks & regards
> Mahesh kumar.L
>
> On Monday, 15 July, 2019, 02:56:32 am GMT-7, Ying Xue
> <yin...@wi...> wrote:
>
> On 7/13/19 11:58 AM, Mahesh Kumar via tipc-discussion wrote:
> > Tipc Team,
> >
> > Greetings!
> > I have been using TIPC for about a year without any issue. However the
> > TIPC tool is bailing out when I tried to set address, bearer
> >
> > / # tipc node set addr 1.1.100
> > error: Operation not permitted
> >
> > / # tipc bearer enable media eth dev ieobc
> > error: Invalid argument
> >
> > / #
> >
> > I am using the new kernel now;
> > uname -a
> > Linux 2c3f0b001900_1_RP_0 4.4.180 #1 SMP Tue Jun 25 15:36:10
> > PDT 2019 x86_64 x86_64 x86_64 GNU/Linux
> > dmesg output; grep -i TIPC d.txt
> > [ 29.436599] tipc: Activated (version 2.0.0)
> > [ 29.436768] tipc: Started in single node mode
>
> Suspected some TIPC patches integrated through the 4.4.180 release introduced
> a regression. The simplest method to identify the issue is to revert some
> TIPC patches to identify which ones caused the regression.
>
> > [2c3f0b001900_1_RP_0:/]$ tipc-config -nt
> > Type       Lower      Upper      Port Identity        Publication Scope
> > 0          16781313   16781313   <1.1.1:0>            16781313    zone
> > 1          1          1          <1.1.1:483390874>    483390875   node
> > 1          88         88         <1.1.1:2870943326>   2870943327  zone
> > 15003      5          5          <1.1.1:3556781096>   3556781097  zone
> > [2c3f0b001900_1_RP_0:/]$
> >
> > Please let me know if any issue.
> > thanks & regards
> > Mahesh kumar.L
> >
> > _______________________________________________
> > tipc-discussion mailing list
> > tip...@li...
> > https://lists.sourceforge.net/lists/listinfo/tipc-discussion
>
> _______________________________________________
> tipc-discussion mailing list
> tip...@li...
> https://lists.sourceforge.net/lists/listinfo/tipc-discussion
|
From: Mahesh K. <lm...@ya...> - 2019-07-15 15:49:17
|
Hi Ying,

Thank you very much for letting me know. Do we suspect any related ioctl() patches? Could you please point me to a link where we can review the TIPC patches that went into the kernel? Much appreciate the help.

thanks & regards
Mahesh kumar.L

On Monday, 15 July, 2019, 02:56:32 am GMT-7, Ying Xue <yin...@wi...> wrote:

On 7/13/19 11:58 AM, Mahesh Kumar via tipc-discussion wrote:
> Tipc Team,
>
> Greetings!
> I have been using TIPC for about a year without any issue. However the TIPC tool is bailing out when I tried to set address, bearer
>
> / # tipc node set addr 1.1.100
> error: Operation not permitted
>
> / # tipc bearer enable media eth dev ieobc
> error: Invalid argument
>
> / #
>
> I am using the new kernel now;
> uname -a
> Linux 2c3f0b001900_1_RP_0 4.4.180 #1 SMP Tue Jun 25 15:36:10 PDT 2019 x86_64 x86_64 x86_64 GNU/Linux
> dmesg output; grep -i TIPC d.txt
> [ 29.436599] tipc: Activated (version 2.0.0)
> [ 29.436768] tipc: Started in single node mode

Suspected some TIPC patches integrated through the 4.4.180 release introduced a regression. The simplest method to identify the issue is to revert some TIPC patches to identify which ones caused the regression.

> [2c3f0b001900_1_RP_0:/]$ tipc-config -nt
> Type       Lower      Upper      Port Identity        Publication Scope
> 0          16781313   16781313   <1.1.1:0>            16781313    zone
> 1          1          1          <1.1.1:483390874>    483390875   node
> 1          88         88         <1.1.1:2870943326>   2870943327  zone
> 15003      5          5          <1.1.1:3556781096>   3556781097  zone
> [2c3f0b001900_1_RP_0:/]$
>
> Please let me know if any issue.
> thanks & regards
> Mahesh kumar.L
>
> _______________________________________________
> tipc-discussion mailing list
> tip...@li...
> https://lists.sourceforge.net/lists/listinfo/tipc-discussion
|
From: Ying X. <yin...@wi...> - 2019-07-15 09:56:24
|
On 7/13/19 11:58 AM, Mahesh Kumar via tipc-discussion wrote:
> Tipc Team,
>
> Greetings!
> I have been using TIPC for about a year without any issue. However the TIPC tool is bailing out when I tried to set address, bearer
>
> / # tipc node set addr 1.1.100
> error: Operation not permitted
>
> / # tipc bearer enable media eth dev ieobc
> error: Invalid argument
>
> / #
>
> I am using the new kernel now;
> uname -a
> Linux 2c3f0b001900_1_RP_0 4.4.180 #1 SMP Tue Jun 25 15:36:10 PDT 2019 x86_64 x86_64 x86_64 GNU/Linux
> dmesg output; grep -i TIPC d.txt
> [ 29.436599] tipc: Activated (version 2.0.0)
> [ 29.436768] tipc: Started in single node mode

Suspected some TIPC patches integrated through the 4.4.180 release introduced a regression. The simplest method to identify the issue is to revert some TIPC patches to identify which ones caused the regression.

> [2c3f0b001900_1_RP_0:/]$ tipc-config -nt
> Type       Lower      Upper      Port Identity        Publication Scope
> 0          16781313   16781313   <1.1.1:0>            16781313    zone
> 1          1          1          <1.1.1:483390874>    483390875   node
> 1          88         88         <1.1.1:2870943326>   2870943327  zone
> 15003      5          5          <1.1.1:3556781096>   3556781097  zone
> [2c3f0b001900_1_RP_0:/]$
>
> Please let me know if any issue.
> thanks & regards
> Mahesh kumar.L
>
> _______________________________________________
> tipc-discussion mailing list
> tip...@li...
> https://lists.sourceforge.net/lists/listinfo/tipc-discussion
|
From: Mahesh K. <lm...@ya...> - 2019-07-13 03:59:07
|
Tipc Team,

Greetings!

I have been using TIPC for about a year without any issue. However, the TIPC tool is bailing out when I try to set the address and bearer:

/ # tipc node set addr 1.1.100
error: Operation not permitted

/ # tipc bearer enable media eth dev ieobc
error: Invalid argument

/ #

I am using the new kernel now:

uname -a
Linux 2c3f0b001900_1_RP_0 4.4.180 #1 SMP Tue Jun 25 15:36:10 PDT 2019 x86_64 x86_64 x86_64 GNU/Linux

dmesg output; grep -i TIPC d.txt
[ 29.436599] tipc: Activated (version 2.0.0)
[ 29.436768] tipc: Started in single node mode

[2c3f0b001900_1_RP_0:/]$ tipc-config -nt
Type       Lower      Upper      Port Identity        Publication Scope
0          16781313   16781313   <1.1.1:0>            16781313    zone
1          1          1          <1.1.1:483390874>    483390875   node
1          88         88         <1.1.1:2870943326>   2870943327  zone
15003      5          5          <1.1.1:3556781096>   3556781097  zone
[2c3f0b001900_1_RP_0:/]$

Please let me know if any issue.

thanks & regards
Mahesh kumar.L
|
From: David M. <da...@da...> - 2019-07-12 22:35:01
|
From: Chris Packham <chr...@al...>
Date: Fri, 12 Jul 2019 10:41:15 +1200

> tipc_named_node_up() creates a skb list. It passes the list to
> tipc_node_xmit() which has some code paths that can call
> skb_queue_purge() which relies on the list->lock being initialised.
>
> The spin_lock is only needed if the messages end up on the receive path
> but when the list is created in tipc_named_node_up() we don't
> necessarily know if it is going to end up there.
>
> Once all the skb list users are updated in tipc it will then be possible
> to update them to use the unlocked variants of the skb list functions
> and initialise the lock when we know the message will follow the receive
> path.
>
> Signed-off-by: Chris Packham <chr...@al...>

Applied.
|
From: David M. <da...@da...> - 2019-07-12 17:45:21
|
net-next is closed right now, resubmit this when the tree opens back up. |
From: Jon M. <jon...@er...> - 2019-07-12 13:35:38
|
Acked-by: Jon Maloy <jon...@er...>

Actually, this was not what I meant, but it solves our problem in a simple and safe way, for now. Later, when net-next opens, I will revert this and do it the right way, which is to change from skb_queue_purge() to __skb_queue_purge(), as Eric correctly noted. That change has to be done at four locations, at least, and is too intrusive to post to 'net' now. I'll send a cleanup patch when net-next re-opens.

BR ///jon

> -----Original Message-----
> From: Chris Packham <chr...@al...>
> Sent: 11-Jul-19 18:41
> To: Jon Maloy <jon...@er...>; eri...@gm...; yin...@wi...; da...@da...
> Cc: lin...@vg...; ne...@vg...; tipc-dis...@li...; Chris Packham <chr...@al...>
> Subject: [PATCH v2] tipc: ensure head->lock is initialised
>
> tipc_named_node_up() creates a skb list. It passes the list to
> tipc_node_xmit() which has some code paths that can call
> skb_queue_purge() which relies on the list->lock being initialised.
>
> The spin_lock is only needed if the messages end up on the receive path but
> when the list is created in tipc_named_node_up() we don't necessarily know if
> it is going to end up there.
>
> Once all the skb list users are updated in tipc it will then be possible to update
> them to use the unlocked variants of the skb list functions and initialise the
> lock when we know the message will follow the receive path.
>
> Signed-off-by: Chris Packham <chr...@al...>
> ---
>
> I'm updating our products to use the latest kernel. One change that we have
> that doesn't appear to have been upstreamed is related to the following soft
> lockup.
>
> NMI watchdog: BUG: soft lockup - CPU#3 stuck for 23s! [swapper/3:0]
> Modules linked in: tipc jitterentropy_rng echainiv drbg platform_driver(O) ipifwd(PO)
> CPU: 3 PID: 0 Comm: swapper/3 Tainted: P O 4.4.6-at1 #1
> task: a3054e00 ti: ac6b4000 task.ti: a307a000
> NIP: 806891c4 LR: 804f5060 CTR: 804f50d0
> REGS: ac6b59b0 TRAP: 0901 Tainted: P O (4.4.6-at1)
> MSR: 00029002 <CE,EE,ME> CR: 84002088 XER: 20000000
>
> GPR00: 804f50fc ac6b5a60 a3054e00 00029002 00000101 01001011 00000000 00000001
> GPR08: 00021002 c1502d1c ac6b5ae4 00000000 804f50d0
> NIP [806891c4] _raw_spin_lock_irqsave+0x44/0x80
> LR [804f5060] skb_dequeue+0x20/0x90
> Call Trace:
> [ac6b5a80] [804f50fc] skb_queue_purge+0x2c/0x50
> [ac6b5a90] [c1511058] tipc_node_xmit+0x138/0x170 [tipc]
> [ac6b5ad0] [c1509e58] tipc_named_node_up+0x88/0xa0 [tipc]
> [ac6b5b00] [c150fc1c] tipc_netlink_compat_stop+0x9bc/0xf50 [tipc]
> [ac6b5b20] [c1511638] tipc_rcv+0x418/0x9b0 [tipc]
> [ac6b5bc0] [c150218c] tipc_bcast_stop+0xfc/0x7b0 [tipc]
> [ac6b5bd0] [80504e38] __netif_receive_skb_core+0x468/0xa10
> [ac6b5c70] [805082fc] netif_receive_skb_internal+0x3c/0xe0
> [ac6b5ca0] [80642a48] br_handle_frame_finish+0x1d8/0x4d0
> [ac6b5d10] [80642f30] br_handle_frame+0x1f0/0x330
> [ac6b5d60] [80504ec8] __netif_receive_skb_core+0x4f8/0xa10
> [ac6b5e00] [805082fc] netif_receive_skb_internal+0x3c/0xe0
> [ac6b5e30] [8044c868] _dpa_rx+0x148/0x5c0
> [ac6b5ea0] [8044b0c8] priv_rx_default_dqrr+0x98/0x170
> [ac6b5ed0] [804d1338] qman_p_poll_dqrr+0x1b8/0x240
> [ac6b5f00] [8044b1c0] dpaa_eth_poll+0x20/0x60
> [ac6b5f20] [805087cc] net_rx_action+0x15c/0x320
> [ac6b5f80] [8002594c] __do_softirq+0x13c/0x250
> [ac6b5fe0] [80025c34] irq_exit+0xb4/0xf0
> [ac6b5ff0] [8000d81c] call_do_irq+0x24/0x3c
> [a307be60] [80004acc] do_IRQ+0x8c/0x120
> [a307be80] [8000f450] ret_from_except+0x0/0x18
> --- interrupt: 501 at arch_cpu_idle+0x24/0x70
>
> Eyeballing the code I think it can still happen since tipc_named_node_up
> allocates struct sk_buff_head head on the stack so it could have arbitrary
> content.
>
> Changes in v2:
> - fixup commit subject
> - add more information to commit message from mailing list discussion
>
>  net/tipc/name_distr.c | 2 +-
>  1 file changed, 1 insertion(+), 1 deletion(-)
>
> diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
> index 61219f0b9677..44abc8e9c990 100644
> --- a/net/tipc/name_distr.c
> +++ b/net/tipc/name_distr.c
> @@ -190,7 +190,7 @@ void tipc_named_node_up(struct net *net, u32 dnode)
> 	struct name_table *nt = tipc_name_table(net);
> 	struct sk_buff_head head;
>
> -	__skb_queue_head_init(&head);
> +	skb_queue_head_init(&head);
>
> 	read_lock_bh(&nt->cluster_scope_lock);
> 	named_distribute(net, &head, dnode, &nt->cluster_scope);
> --
> 2.22.0
|