You can subscribe to this list here.
2003 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
(6) |
---|---|---|---|---|---|---|---|---|---|---|---|---|
2004 |
Jan
(9) |
Feb
(11) |
Mar
(22) |
Apr
(73) |
May
(78) |
Jun
(146) |
Jul
(80) |
Aug
(27) |
Sep
(5) |
Oct
(14) |
Nov
(18) |
Dec
(27) |
2005 |
Jan
(20) |
Feb
(30) |
Mar
(19) |
Apr
(28) |
May
(50) |
Jun
(31) |
Jul
(32) |
Aug
(14) |
Sep
(36) |
Oct
(43) |
Nov
(74) |
Dec
(63) |
2006 |
Jan
(34) |
Feb
(32) |
Mar
(21) |
Apr
(76) |
May
(106) |
Jun
(72) |
Jul
(70) |
Aug
(175) |
Sep
(130) |
Oct
(39) |
Nov
(81) |
Dec
(43) |
2007 |
Jan
(81) |
Feb
(36) |
Mar
(20) |
Apr
(43) |
May
(54) |
Jun
(34) |
Jul
(44) |
Aug
(55) |
Sep
(44) |
Oct
(54) |
Nov
(43) |
Dec
(41) |
2008 |
Jan
(42) |
Feb
(84) |
Mar
(73) |
Apr
(30) |
May
(119) |
Jun
(54) |
Jul
(54) |
Aug
(93) |
Sep
(173) |
Oct
(130) |
Nov
(145) |
Dec
(153) |
2009 |
Jan
(59) |
Feb
(12) |
Mar
(28) |
Apr
(18) |
May
(56) |
Jun
(9) |
Jul
(28) |
Aug
(62) |
Sep
(16) |
Oct
(19) |
Nov
(15) |
Dec
(17) |
2010 |
Jan
(14) |
Feb
(36) |
Mar
(37) |
Apr
(30) |
May
(33) |
Jun
(53) |
Jul
(42) |
Aug
(50) |
Sep
(67) |
Oct
(66) |
Nov
(69) |
Dec
(36) |
2011 |
Jan
(52) |
Feb
(45) |
Mar
(49) |
Apr
(21) |
May
(34) |
Jun
(13) |
Jul
(19) |
Aug
(37) |
Sep
(43) |
Oct
(10) |
Nov
(23) |
Dec
(30) |
2012 |
Jan
(42) |
Feb
(36) |
Mar
(46) |
Apr
(25) |
May
(96) |
Jun
(146) |
Jul
(40) |
Aug
(28) |
Sep
(61) |
Oct
(45) |
Nov
(100) |
Dec
(53) |
2013 |
Jan
(79) |
Feb
(24) |
Mar
(134) |
Apr
(156) |
May
(118) |
Jun
(75) |
Jul
(278) |
Aug
(145) |
Sep
(136) |
Oct
(168) |
Nov
(137) |
Dec
(439) |
2014 |
Jan
(284) |
Feb
(158) |
Mar
(231) |
Apr
(275) |
May
(259) |
Jun
(91) |
Jul
(222) |
Aug
(215) |
Sep
(165) |
Oct
(166) |
Nov
(211) |
Dec
(150) |
2015 |
Jan
(164) |
Feb
(324) |
Mar
(299) |
Apr
(214) |
May
(111) |
Jun
(109) |
Jul
(105) |
Aug
(36) |
Sep
(58) |
Oct
(131) |
Nov
(68) |
Dec
(30) |
2016 |
Jan
(46) |
Feb
(87) |
Mar
(135) |
Apr
(174) |
May
(132) |
Jun
(135) |
Jul
(149) |
Aug
(125) |
Sep
(79) |
Oct
(49) |
Nov
(95) |
Dec
(102) |
2017 |
Jan
(104) |
Feb
(75) |
Mar
(72) |
Apr
(53) |
May
(18) |
Jun
(5) |
Jul
(14) |
Aug
(19) |
Sep
(2) |
Oct
(13) |
Nov
(21) |
Dec
(67) |
2018 |
Jan
(56) |
Feb
(50) |
Mar
(148) |
Apr
(41) |
May
(37) |
Jun
(34) |
Jul
(34) |
Aug
(11) |
Sep
(52) |
Oct
(48) |
Nov
(28) |
Dec
(46) |
2019 |
Jan
(29) |
Feb
(63) |
Mar
(95) |
Apr
(54) |
May
(14) |
Jun
(71) |
Jul
(60) |
Aug
(49) |
Sep
(3) |
Oct
(64) |
Nov
(115) |
Dec
(57) |
2020 |
Jan
(15) |
Feb
(9) |
Mar
(38) |
Apr
(27) |
May
(60) |
Jun
(53) |
Jul
(35) |
Aug
(46) |
Sep
(37) |
Oct
(64) |
Nov
(20) |
Dec
(25) |
2021 |
Jan
(20) |
Feb
(31) |
Mar
(27) |
Apr
(23) |
May
(21) |
Jun
(30) |
Jul
(30) |
Aug
(7) |
Sep
(18) |
Oct
|
Nov
(15) |
Dec
(4) |
2022 |
Jan
(3) |
Feb
(1) |
Mar
(10) |
Apr
|
May
(2) |
Jun
(26) |
Jul
(5) |
Aug
|
Sep
(1) |
Oct
(2) |
Nov
(9) |
Dec
(2) |
2023 |
Jan
(4) |
Feb
(4) |
Mar
(5) |
Apr
(10) |
May
(29) |
Jun
(17) |
Jul
|
Aug
|
Sep
(1) |
Oct
(1) |
Nov
(2) |
Dec
|
2024 |
Jan
|
Feb
(6) |
Mar
|
Apr
(1) |
May
(6) |
Jun
|
Jul
(5) |
Aug
|
Sep
(3) |
Oct
|
Nov
|
Dec
|
2025 |
Jan
|
Feb
(3) |
Mar
|
Apr
|
May
|
Jun
|
Jul
(6) |
Aug
|
Sep
|
Oct
|
Nov
|
Dec
|
From: Tung N. <tun...@de...> - 2019-02-14 06:42:15
|
Fix issue of hung sendto() at user space. Tung Nguyen (1): tipc: fix race condition causing hung sendto net/tipc/socket.c | 4 ++++ 1 file changed, 4 insertions(+) -- 2.17.1 |
From: Tung N. <tun...@de...> - 2019-02-13 08:18:47
|
Commit 844cf763fba654436d3a4279b6a672c196cf1901 (tipc: make macro tipc_wait_for_cond() smp safe) replaced finish_wait() with remove_wait_queue() but still used prepare_to_wait(). This causes unnecessary conditional checking before adding to wait queue in prepare_to_wait(). This commit replaces prepare_to_wait() with add_wait_queue() as the pair function with remove_wait_queue(). Signed-off-by: Tung Nguyen <tun...@de...> --- net/tipc/socket.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 1217c90a363b..81b87916a0eb 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -388,7 +388,7 @@ static int tipc_sk_sock_err(struct socket *sock, long *timeout) rc_ = tipc_sk_sock_err((sock_), timeo_); \ if (rc_) \ break; \ - prepare_to_wait(sk_sleep(sk_), &wait_, TASK_INTERRUPTIBLE); \ + add_wait_queue(sk_sleep(sk_), &wait_); \ release_sock(sk_); \ *(timeo_) = wait_woken(&wait_, TASK_INTERRUPTIBLE, *(timeo_)); \ sched_annotate_sleep(); \ -- 2.16.2 |
From: Tung N. <tun...@de...> - 2019-02-13 08:18:47
|
This commit replaces schedule_timeout() with wait_woken() in function tipc_wait_for_rcvmsg(). wait_woken() uses memory barriers in its implementation to avoid potential race condition when putting a process into sleeping state and then waking it up. Signed-off-by: Tung Nguyen <tun...@de...> --- net/tipc/socket.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 81b87916a0eb..684f2125fc6b 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -1677,7 +1677,7 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk) static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) { struct sock *sk = sock->sk; - DEFINE_WAIT(wait); + DEFINE_WAIT_FUNC(wait, woken_wake_function); long timeo = *timeop; int err = sock_error(sk); @@ -1685,15 +1685,17 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) return err; for (;;) { - prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE); if (timeo && skb_queue_empty(&sk->sk_receive_queue)) { if (sk->sk_shutdown & RCV_SHUTDOWN) { err = -ENOTCONN; break; } + add_wait_queue(sk_sleep(sk), &wait); release_sock(sk); - timeo = schedule_timeout(timeo); + timeo = wait_woken(&wait, TASK_INTERRUPTIBLE, timeo); + sched_annotate_sleep(); lock_sock(sk); + remove_wait_queue(sk_sleep(sk), &wait); } err = 0; if (!skb_queue_empty(&sk->sk_receive_queue)) @@ -1709,7 +1711,6 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) if (err) break; } - finish_wait(sk_sleep(sk), &wait); *timeop = timeo; return err; } -- 2.16.2 |
From: Tung N. <tun...@de...> - 2019-02-13 08:18:47
|
Some improvements for tipc_wait_for_xyz(). Tung Nguyen (2): tipc: improve function tipc_wait_for_cond() tipc: improve function tipc_wait_for_rcvmsg() net/tipc/socket.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) -- 2.16.2 |
From: David M. <da...@da...> - 2019-02-12 05:27:08
|
From: Tuong Lien <tuo...@de...> Date: Mon, 11 Feb 2019 13:29:43 +0700 > When a link endpoint is re-created (e.g. after a node reboot or > interface reset), the link session number is varied by random, the peer > endpoint will be synced with this new session number before the link is > re-established. > > However, there is a shortcoming in this mechanism that can lead to the > link never re-established or faced with a failure then. It happens when > the peer endpoint is ready in ESTABLISHING state, the 'peer_session' as > well as the 'in_session' flag have been set, but suddenly this link > endpoint leaves. When it comes back with a random session number, there > are two situations possible: > > 1/ If the random session number is larger than (or equal to) the > previous one, the peer endpoint will be updated with this new session > upon receipt of a RESET_MSG from this endpoint, and the link can be re- > established as normal. Otherwise, all the RESET_MSGs from this endpoint > will be rejected by the peer. In turn, when this link endpoint receives > one ACTIVATE_MSG from the peer, it will move to ESTABLISHED and start > to send STATE_MSGs, but again these messages will be dropped by the > peer due to wrong session. > The peer link endpoint can still become ESTABLISHED after receiving a > traffic message from this endpoint (e.g. a BCAST_PROTOCOL or > NAME_DISTRIBUTOR), but since all the STATE_MSGs are invalid, the link > will be forced down sooner or later! > > Even in case the random session number is larger than the previous one, > it can be that the ACTIVATE_MSG from the peer arrives first, and this > link endpoint moves quickly to ESTABLISHED without sending out any > RESET_MSG yet. Consequently, the peer link will not be updated with the > new session number, and the same link failure scenario as above will > happen. 
> > 2/ Another situation can be that, the peer link endpoint was reset due > to any reasons in the meantime, its link state was set to RESET from > ESTABLISHING but still in session, i.e. the 'in_session' flag is not > reset... > Now, if the random session number from this endpoint is less than the > previous one, all the RESET_MSGs from this endpoint will be rejected by > the peer. In the other direction, when this link endpoint receives a > RESET_MSG from the peer, it moves to ESTABLISHING and starts to send > ACTIVATE_MSGs, but all these messages will be rejected by the peer too. > As a result, the link cannot be re-established but gets stuck with this > link endpoint in state ESTABLISHING and the peer in RESET! > > Solution: > =========== > > This link endpoint should not go directly to ESTABLISHED when getting > ACTIVATE_MSG from the peer which may belong to the old session if the > link was re-created. To ensure the session to be correct before the > link is re-established, the peer endpoint in ESTABLISHING state will > send back the last session number in ACTIVATE_MSG for a verification at > this endpoint. Then, if needed, a new and more appropriate session > number will be regenerated to force a re-synch first. > > In addition, when a link in ESTABLISHING state is reset, its state will > move to RESET according to the link FSM, along with resetting the > 'in_session' flag (and the other data) as a normal link reset, it will > also be deleted if requested. > > The solution is backward compatible. > > Acked-by: Jon Maloy <jon...@er...> > Acked-by: Ying Xue <yin...@wi...> > Signed-off-by: Tuong Lien <tuo...@de...> Applied. |
From: David M. <da...@da...> - 2019-02-12 02:36:38
|
From: Hoang Le <hoa...@de...> Date: Mon, 11 Feb 2019 09:18:28 +0700 > When we free skb at tipc_data_input, we return a 'false' boolean. > Then, skb passed to subcalling tipc_link_input in tipc_link_rcv, > > <snip> > 1303 int tipc_link_rcv: > ... > 1354 if (!tipc_data_input(l, skb, l->inputq)) > 1355 rc |= tipc_link_input(l, skb, l->inputq); > </snip> > > Fix it by simple changing to a 'true' boolean when skb is being free-ed. > Then, tipc_link_rcv will bypassed to subcalling tipc_link_input as above > condition. > > Acked-by: Ying Xue <yin...@wi...> > Acked-by: Jon Maloy <ma...@do...> > Signed-off-by: Hoang Le <hoa...@de...> Applied, thanks. |
From: Amit J. <ami...@te...> - 2019-02-11 19:27:51
|
Thanks Jon, Regards, Amit On Fri, 8 Feb 2019, 17:20 Jon Maloy <jon...@er... wrote: > Hi Amit, > Your solution seems ok to me. > > BR > ///jon > > > > -----Original Message----- > > From: Amit Jain <ami...@te...> > > Sent: 8-Feb-19 00:04 > > To: tip...@li... > > Subject: [tipc-discussion] Race condition in link_start and > tipc_link_delete > > function - RHEL 7.3 > > > > Hi , > > > > We are using tipc (version 3.10.0-862 for RHEL 7.3). > > > > Scenario : > > kernel crash in 2 node cluster , no traffic (bearers enabled/disabled) > We are > > able to consistently reproduce the crash on a 2 node setup and > > enabling/disabling bearers in a loop. (On 1 node enable/disable bearers > > within 0.5 seconds , on 2nd node with 4 seconds interval) > > > > Root Cause Analysis of the issue : > > > > Disabling the bearer invokes tipc_link_delete which frees the struct > tipc_link > > *l_ptr. > > This is done under tipc_net_lock, bearer locks > > > > Enabling the bearer, sends discovery messages and on reception from peer > > would invoke tipc_link_create which creates a new l_ptr and associates > with > > the bearer. All this is done with correct locks. However it schedules > link_start > > as a tasklet. > > > > Inside link_start, bearer lock is taken and work is performed (send proto > > msg). So if this tasklet is scheduled before tipc_link_delete , > everything is > > fine. However if the tipc_link_delete happens first then the tasklet is > > invoked with dangling pointer. > > > > Proposed Solution: > > Cancel the tasklet when deleting the link. But since its a work queue > item > > within the tasklet, its not possible to cancel a particular work item. > So within > > link_start, we do sanity check whether l_ptr is valid o r not. if l_ptr > is invalid > > (does not match with l_ptr inside any n_ptr in node list) we simply > return. 
> > > > Following is the diff which seems to solve the issue, > > > > # git diff > > diff --git a/tipc/link.c b/tipc/link.c > > index 349dbfd..f52dca9 100644 > > --- a/tipc/link.c > > +++ b/tipc/link.c > > @@ -393,9 +393,24 @@ void tipc_link_delete(struct tipc_link *l_ptr) > > > > static void link_start(struct tipc_link *l_ptr) { > > - tipc_node_lock(l_ptr->owner); > > + struct tipc_node *n_ptr; > > + read_lock_bh(&tipc_net_lock); > > + list_for_each_entry(n_ptr, &tipc_node_list, list) { > > + u32 i; > > + tipc_node_lock(n_ptr); > > + for (i = 0; i < MAX_BEARERS; i++) { > > + if (n_ptr->links[i] == l_ptr) { > > + goto LINK_START; > > + } > > + } > > + tipc_node_unlock(n_ptr); > > + } > > + read_unlock_bh(&tipc_net_lock); > > + return; > > +LINK_START: > > link_state_event(l_ptr, STARTING_EVT); > > tipc_node_unlock(l_ptr->owner); > > + read_unlock_bh(&tipc_net_lock); > > } > > > > Would like to know if there are any obvious issues due to above change? > > Appreciate any feedback. > > > > Regards, > > Amit > > > > _______________________________________________ > > tipc-discussion mailing list > > tip...@li... > > https://lists.sourceforge.net/lists/listinfo/tipc-discussion > |
From: Amit J. <ami...@te...> - 2019-02-11 19:10:01
|
Thanks a lot Tuong, for your detailed analysis, feedback and solution. Regards, Amit On Mon, 11 Feb 2019, 12:00 Tuong Lien <tuo...@de... wrote: > When a link endpoint is re-created (e.g. after a node reboot or > interface reset), the link session number is varied by random, the peer > endpoint will be synced with this new session number before the link is > re-established. > > However, there is a shortcoming in this mechanism that can lead to the > link never re-established or faced with a failure then. It happens when > the peer endpoint is ready in ESTABLISHING state, the 'peer_session' as > well as the 'in_session' flag have been set, but suddenly this link > endpoint leaves. When it comes back with a random session number, there > are two situations possible: > > 1/ If the random session number is larger than (or equal to) the > previous one, the peer endpoint will be updated with this new session > upon receipt of a RESET_MSG from this endpoint, and the link can be re- > established as normal. Otherwise, all the RESET_MSGs from this endpoint > will be rejected by the peer. In turn, when this link endpoint receives > one ACTIVATE_MSG from the peer, it will move to ESTABLISHED and start > to send STATE_MSGs, but again these messages will be dropped by the > peer due to wrong session. > The peer link endpoint can still become ESTABLISHED after receiving a > traffic message from this endpoint (e.g. a BCAST_PROTOCOL or > NAME_DISTRIBUTOR), but since all the STATE_MSGs are invalid, the link > will be forced down sooner or later! > > Even in case the random session number is larger than the previous one, > it can be that the ACTIVATE_MSG from the peer arrives first, and this > link endpoint moves quickly to ESTABLISHED without sending out any > RESET_MSG yet. Consequently, the peer link will not be updated with the > new session number, and the same link failure scenario as above will > happen. 
> > 2/ Another situation can be that, the peer link endpoint was reset due > to any reasons in the meantime, its link state was set to RESET from > ESTABLISHING but still in session, i.e. the 'in_session' flag is not > reset... > Now, if the random session number from this endpoint is less than the > previous one, all the RESET_MSGs from this endpoint will be rejected by > the peer. In the other direction, when this link endpoint receives a > RESET_MSG from the peer, it moves to ESTABLISHING and starts to send > ACTIVATE_MSGs, but all these messages will be rejected by the peer too. > As a result, the link cannot be re-established but gets stuck with this > link endpoint in state ESTABLISHING and the peer in RESET! > > Solution: > =========== > > This link endpoint should not go directly to ESTABLISHED when getting > ACTIVATE_MSG from the peer which may belong to the old session if the > link was re-created. To ensure the session to be correct before the > link is re-established, the peer endpoint in ESTABLISHING state will > send back the last session number in ACTIVATE_MSG for a verification at > this endpoint. Then, if needed, a new and more appropriate session > number will be regenerated to force a re-synch first. > > In addition, when a link in ESTABLISHING state is reset, its state will > move to RESET according to the link FSM, along with resetting the > 'in_session' flag (and the other data) as a normal link reset, it will > also be deleted if requested. > > The solution is backward compatible. 
> > Acked-by: Jon Maloy <jon...@er...> > Acked-by: Ying Xue <yin...@wi...> > Signed-off-by: Tuong Lien <tuo...@de...> > --- > net/tipc/link.c | 15 +++++++++++++++ > net/tipc/msg.h | 22 ++++++++++++++++++++++ > net/tipc/node.c | 11 ++++++----- > 3 files changed, 43 insertions(+), 5 deletions(-) > > diff --git a/net/tipc/link.c b/net/tipc/link.c > index ac306d17f8ad..631e21cd4256 100644 > --- a/net/tipc/link.c > +++ b/net/tipc/link.c > @@ -1425,6 +1425,10 @@ static void tipc_link_build_proto_msg(struct > tipc_link *l, int mtyp, bool probe, > l->rcv_unacked = 0; > } else { > /* RESET_MSG or ACTIVATE_MSG */ > + if (mtyp == ACTIVATE_MSG) { > + msg_set_dest_session_valid(hdr, 1); > + msg_set_dest_session(hdr, l->peer_session); > + } > msg_set_max_pkt(hdr, l->advertised_mtu); > strcpy(data, l->if_name); > msg_set_size(hdr, INT_H_SIZE + TIPC_MAX_IF_NAME); > @@ -1642,6 +1646,17 @@ static int tipc_link_proto_rcv(struct tipc_link *l, > struct sk_buff *skb, > rc = tipc_link_fsm_evt(l, LINK_FAILURE_EVT); > break; > } > + > + /* If this endpoint was re-created while peer was > ESTABLISHING > + * it doesn't know current session number. Force re-synch. 
> + */ > + if (mtyp == ACTIVATE_MSG && msg_dest_session_valid(hdr) && > + l->session != msg_dest_session(hdr)) { > + if (less(l->session, msg_dest_session(hdr))) > + l->session = msg_dest_session(hdr) + 1; > + break; > + } > + > /* ACTIVATE_MSG serves as PEER_RESET if link is already > down */ > if (mtyp == RESET_MSG || !link_is_up(l)) > rc = tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT); > diff --git a/net/tipc/msg.h b/net/tipc/msg.h > index a0924956bb61..d7e4b8b93f9d 100644 > --- a/net/tipc/msg.h > +++ b/net/tipc/msg.h > @@ -360,6 +360,28 @@ static inline void msg_set_bcast_ack(struct tipc_msg > *m, u16 n) > msg_set_bits(m, 1, 0, 0xffff, n); > } > > +/* Note: reusing bits in word 1 for ACTIVATE_MSG only, to re-synch > + * link peer session number > + */ > +static inline bool msg_dest_session_valid(struct tipc_msg *m) > +{ > + return msg_bits(m, 1, 16, 0x1); > +} > + > +static inline void msg_set_dest_session_valid(struct tipc_msg *m, bool > valid) > +{ > + msg_set_bits(m, 1, 16, 0x1, valid); > +} > + > +static inline u16 msg_dest_session(struct tipc_msg *m) > +{ > + return msg_bits(m, 1, 0, 0xffff); > +} > + > +static inline void msg_set_dest_session(struct tipc_msg *m, u16 n) > +{ > + msg_set_bits(m, 1, 0, 0xffff, n); > +} > > /* > * Word 2 > diff --git a/net/tipc/node.c b/net/tipc/node.c > index db2a6c3e0be9..2dc4919ab23c 100644 > --- a/net/tipc/node.c > +++ b/net/tipc/node.c > @@ -830,15 +830,16 @@ static void tipc_node_link_down(struct tipc_node *n, > int bearer_id, bool delete) > tipc_node_write_lock(n); > if (!tipc_link_is_establishing(l)) { > __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr); > - if (delete) { > - kfree(l); > - le->link = NULL; > - n->link_cnt--; > - } > } else { > /* Defuse pending tipc_node_link_up() */ > + tipc_link_reset(l); > tipc_link_fsm_evt(l, LINK_RESET_EVT); > } > + if (delete) { > + kfree(l); > + le->link = NULL; > + n->link_cnt--; > + } > trace_tipc_node_link_down(n, true, "node link down or deleted!"); > 
tipc_node_write_unlock(n); > if (delete) > -- > 2.13.7 > > > > _______________________________________________ > tipc-discussion mailing list > tip...@li... > https://lists.sourceforge.net/lists/listinfo/tipc-discussion > |
From: Tuong L. <tuo...@de...> - 2019-02-11 06:30:22
|
When a link endpoint is re-created (e.g. after a node reboot or interface reset), the link session number is varied by random, the peer endpoint will be synced with this new session number before the link is re-established. However, there is a shortcoming in this mechanism that can lead to the link never re-established or faced with a failure then. It happens when the peer endpoint is ready in ESTABLISHING state, the 'peer_session' as well as the 'in_session' flag have been set, but suddenly this link endpoint leaves. When it comes back with a random session number, there are two situations possible: 1/ If the random session number is larger than (or equal to) the previous one, the peer endpoint will be updated with this new session upon receipt of a RESET_MSG from this endpoint, and the link can be re- established as normal. Otherwise, all the RESET_MSGs from this endpoint will be rejected by the peer. In turn, when this link endpoint receives one ACTIVATE_MSG from the peer, it will move to ESTABLISHED and start to send STATE_MSGs, but again these messages will be dropped by the peer due to wrong session. The peer link endpoint can still become ESTABLISHED after receiving a traffic message from this endpoint (e.g. a BCAST_PROTOCOL or NAME_DISTRIBUTOR), but since all the STATE_MSGs are invalid, the link will be forced down sooner or later! Even in case the random session number is larger than the previous one, it can be that the ACTIVATE_MSG from the peer arrives first, and this link endpoint moves quickly to ESTABLISHED without sending out any RESET_MSG yet. Consequently, the peer link will not be updated with the new session number, and the same link failure scenario as above will happen. 2/ Another situation can be that, the peer link endpoint was reset due to any reasons in the meantime, its link state was set to RESET from ESTABLISHING but still in session, i.e. the 'in_session' flag is not reset... 
Now, if the random session number from this endpoint is less than the previous one, all the RESET_MSGs from this endpoint will be rejected by the peer. In the other direction, when this link endpoint receives a RESET_MSG from the peer, it moves to ESTABLISHING and starts to send ACTIVATE_MSGs, but all these messages will be rejected by the peer too. As a result, the link cannot be re-established but gets stuck with this link endpoint in state ESTABLISHING and the peer in RESET! Solution: =========== This link endpoint should not go directly to ESTABLISHED when getting ACTIVATE_MSG from the peer which may belong to the old session if the link was re-created. To ensure the session to be correct before the link is re-established, the peer endpoint in ESTABLISHING state will send back the last session number in ACTIVATE_MSG for a verification at this endpoint. Then, if needed, a new and more appropriate session number will be regenerated to force a re-synch first. In addition, when a link in ESTABLISHING state is reset, its state will move to RESET according to the link FSM, along with resetting the 'in_session' flag (and the other data) as a normal link reset, it will also be deleted if requested. The solution is backward compatible. 
Acked-by: Jon Maloy <jon...@er...> Acked-by: Ying Xue <yin...@wi...> Signed-off-by: Tuong Lien <tuo...@de...> --- net/tipc/link.c | 15 +++++++++++++++ net/tipc/msg.h | 22 ++++++++++++++++++++++ net/tipc/node.c | 11 ++++++----- 3 files changed, 43 insertions(+), 5 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index ac306d17f8ad..631e21cd4256 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1425,6 +1425,10 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, l->rcv_unacked = 0; } else { /* RESET_MSG or ACTIVATE_MSG */ + if (mtyp == ACTIVATE_MSG) { + msg_set_dest_session_valid(hdr, 1); + msg_set_dest_session(hdr, l->peer_session); + } msg_set_max_pkt(hdr, l->advertised_mtu); strcpy(data, l->if_name); msg_set_size(hdr, INT_H_SIZE + TIPC_MAX_IF_NAME); @@ -1642,6 +1646,17 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, rc = tipc_link_fsm_evt(l, LINK_FAILURE_EVT); break; } + + /* If this endpoint was re-created while peer was ESTABLISHING + * it doesn't know current session number. Force re-synch. 
+ */ + if (mtyp == ACTIVATE_MSG && msg_dest_session_valid(hdr) && + l->session != msg_dest_session(hdr)) { + if (less(l->session, msg_dest_session(hdr))) + l->session = msg_dest_session(hdr) + 1; + break; + } + /* ACTIVATE_MSG serves as PEER_RESET if link is already down */ if (mtyp == RESET_MSG || !link_is_up(l)) rc = tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index a0924956bb61..d7e4b8b93f9d 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -360,6 +360,28 @@ static inline void msg_set_bcast_ack(struct tipc_msg *m, u16 n) msg_set_bits(m, 1, 0, 0xffff, n); } +/* Note: reusing bits in word 1 for ACTIVATE_MSG only, to re-synch + * link peer session number + */ +static inline bool msg_dest_session_valid(struct tipc_msg *m) +{ + return msg_bits(m, 1, 16, 0x1); +} + +static inline void msg_set_dest_session_valid(struct tipc_msg *m, bool valid) +{ + msg_set_bits(m, 1, 16, 0x1, valid); +} + +static inline u16 msg_dest_session(struct tipc_msg *m) +{ + return msg_bits(m, 1, 0, 0xffff); +} + +static inline void msg_set_dest_session(struct tipc_msg *m, u16 n) +{ + msg_set_bits(m, 1, 0, 0xffff, n); +} /* * Word 2 diff --git a/net/tipc/node.c b/net/tipc/node.c index db2a6c3e0be9..2dc4919ab23c 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -830,15 +830,16 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete) tipc_node_write_lock(n); if (!tipc_link_is_establishing(l)) { __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr); - if (delete) { - kfree(l); - le->link = NULL; - n->link_cnt--; - } } else { /* Defuse pending tipc_node_link_up() */ + tipc_link_reset(l); tipc_link_fsm_evt(l, LINK_RESET_EVT); } + if (delete) { + kfree(l); + le->link = NULL; + n->link_cnt--; + } trace_tipc_node_link_down(n, true, "node link down or deleted!"); tipc_node_write_unlock(n); if (delete) -- 2.13.7 |
From: Hoang Le <hoa...@de...> - 2019-02-11 02:18:48
|
When we free skb at tipc_data_input, we return a 'false' boolean. Then, skb passed to subcalling tipc_link_input in tipc_link_rcv, <snip> 1303 int tipc_link_rcv: ... 1354 if (!tipc_data_input(l, skb, l->inputq)) 1355 rc |= tipc_link_input(l, skb, l->inputq); </snip> Fix it by simple changing to a 'true' boolean when skb is being free-ed. Then, tipc_link_rcv will bypassed to subcalling tipc_link_input as above condition. Acked-by: Ying Xue <yin...@wi...> Acked-by: Jon Maloy <ma...@do...> Signed-off-by: Hoang Le <hoa...@de...> --- net/tipc/link.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index ac306d17f8ad..d31f83a9a2c5 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1145,7 +1145,7 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, default: pr_warn("Dropping received illegal msg type\n"); kfree_skb(skb); - return false; + return true; }; } -- 2.17.1 |
From: Jon M. <jon...@er...> - 2019-02-08 11:50:58
|
Hi Amit, Your solution seems ok to me. BR ///jon > -----Original Message----- > From: Amit Jain <ami...@te...> > Sent: 8-Feb-19 00:04 > To: tip...@li... > Subject: [tipc-discussion] Race condition in link_start and tipc_link_delete > function - RHEL 7.3 > > Hi , > > We are using tipc (version 3.10.0-862 for RHEL 7.3). > > Scenario : > kernel crash in 2 node cluster , no traffic (bearers enabled/disabled) We are > able to consistently reproduce the crash on a 2 node setup and > enabling/disabling bearers in a loop. (On 1 node enable/disable bearers > within 0.5 seconds , on 2nd node with 4 seconds interval) > > Root Cause Analysis of the issue : > > Disabling the bearer invokes tipc_link_delete which frees the struct tipc_link > *l_ptr. > This is done under tipc_net_lock, bearer locks > > Enabling the bearer, sends discovery messages and on reception from peer > would invoke tipc_link_create which creates a new l_ptr and associates with > the bearer. All this is done with correct locks. However it schedules link_start > as a tasklet. > > Inside link_start, bearer lock is taken and work is performed (send proto > msg). So if this tasklet is scheduled before tipc_link_delete , everything is > fine. However if the tipc_link_delete happens first then the tasklet is > invoked with dangling pointer. > > Proposed Solution: > Cancel the tasklet when deleting the link. But since its a work queue item > within the tasklet, its not possible to cancel a particular work item. So within > link_start, we do sanity check whether l_ptr is valid o r not. if l_ptr is invalid > (does not match with l_ptr inside any n_ptr in node list) we simply return. 
> > Following is the diff which seems to solve the issue, > > # git diff > diff --git a/tipc/link.c b/tipc/link.c > index 349dbfd..f52dca9 100644 > --- a/tipc/link.c > +++ b/tipc/link.c > @@ -393,9 +393,24 @@ void tipc_link_delete(struct tipc_link *l_ptr) > > static void link_start(struct tipc_link *l_ptr) { > - tipc_node_lock(l_ptr->owner); > + struct tipc_node *n_ptr; > + read_lock_bh(&tipc_net_lock); > + list_for_each_entry(n_ptr, &tipc_node_list, list) { > + u32 i; > + tipc_node_lock(n_ptr); > + for (i = 0; i < MAX_BEARERS; i++) { > + if (n_ptr->links[i] == l_ptr) { > + goto LINK_START; > + } > + } > + tipc_node_unlock(n_ptr); > + } > + read_unlock_bh(&tipc_net_lock); > + return; > +LINK_START: > link_state_event(l_ptr, STARTING_EVT); > tipc_node_unlock(l_ptr->owner); > + read_unlock_bh(&tipc_net_lock); > } > > Would like to know if there are any obvious issues due to above change? > Appreciate any feedback. > > Regards, > Amit > > _______________________________________________ > tipc-discussion mailing list > tip...@li... > https://lists.sourceforge.net/lists/listinfo/tipc-discussion |
From: Amit J. <ami...@te...> - 2019-02-08 05:29:38
|
Hi , We are using tipc (version 3.10.0-862 for RHEL 7.3). Scenario : kernel crash in 2 node cluster , no traffic (bearers enabled/disabled) We are able to consistently reproduce the crash on a 2 node setup and enabling/disabling bearers in a loop. (On 1 node enable/disable bearers within 0.5 seconds , on 2nd node with 4 seconds interval) Root Cause Analysis of the issue : Disabling the bearer invokes tipc_link_delete which frees the struct tipc_link *l_ptr. This is done under tipc_net_lock, bearer locks Enabling the bearer, sends discovery messages and on reception from peer would invoke tipc_link_create which creates a new l_ptr and associates with the bearer. All this is done with correct locks. However it schedules link_start as a tasklet. Inside link_start, bearer lock is taken and work is performed (send proto msg). So if this tasklet is scheduled before tipc_link_delete , everything is fine. However if the tipc_link_delete happens first then the tasklet is invoked with dangling pointer. Proposed Solution: Cancel the tasklet when deleting the link. But since it's a work queue item within the tasklet, it's not possible to cancel a particular work item. So within link_start, we do sanity check whether l_ptr is valid or not. If l_ptr is invalid (does not match with l_ptr inside any n_ptr in node list) we simply return. 
Following is the diff which seems to solve the issue, # git diff diff --git a/tipc/link.c b/tipc/link.c index 349dbfd..f52dca9 100644 --- a/tipc/link.c +++ b/tipc/link.c @@ -393,9 +393,24 @@ void tipc_link_delete(struct tipc_link *l_ptr) static void link_start(struct tipc_link *l_ptr) { - tipc_node_lock(l_ptr->owner); + struct tipc_node *n_ptr; + read_lock_bh(&tipc_net_lock); + list_for_each_entry(n_ptr, &tipc_node_list, list) { + u32 i; + tipc_node_lock(n_ptr); + for (i = 0; i < MAX_BEARERS; i++) { + if (n_ptr->links[i] == l_ptr) { + goto LINK_START; + } + } + tipc_node_unlock(n_ptr); + } + read_unlock_bh(&tipc_net_lock); + return; +LINK_START: link_state_event(l_ptr, STARTING_EVT); tipc_node_unlock(l_ptr->owner); + read_unlock_bh(&tipc_net_lock); } Would like to know if there are any obvious issues due to above change? Appreciate any feedback. Regards, Amit |
From: Jon M. <jon...@er...> - 2019-02-01 12:39:01
|
Hi Hoang, I realized one more thing. Since this mechanism only is intended for MCAST messages, we must filter out those at the send side (in tipc_mcast_send_sync()) so that SYN messages are sent only for those. Group messaging, which has its own mechanism, does not need this new method, and cannot discover message duplicates, -it would just deliver them. This must be avoided. BR ///jon > -----Original Message----- > From: Jon Maloy > Sent: 31-Jan-19 16:05 > To: 'Hoang Le' <hoa...@de...>; ma...@do...; > yin...@wi...; tip...@li... > Subject: RE: [net-next] tipc: smooth change between replicast and broadcast > > Hi Hoang, > Nice job, but still a few things to improve. See below. > > > -----Original Message----- > > From: Hoang Le <hoa...@de...> > > Sent: 29-Jan-19 04:22 > > To: Jon Maloy <jon...@er...>; ma...@do...; > > yin...@wi...; tip...@li... > > Subject: [net-next] tipc: smooth change between replicast and > > broadcast > > > > Currently, a multicast stream may start out using replicast, because > > there are few destinations, and then it should ideally switch to > > L2/broadcast IGMP/multicast when the number of destinations grows > > beyond a certain limit. The opposite should happen when the number > > decreases below the limit. > > > > To eliminate the risk of message reordering caused by method change, a > > sending socket must stick to a previously selected method until it > > enters an idle period of 5 seconds. Means there is a 5 seconds pause > > in the traffic from the sender socket. > > > > In this fix, we allow such a switch between replicast and broadcast > > without a > > 5 seconds pause in the traffic. > > > > Solution is to send a dummy message with only the header, also with > > the SYN bit set, via broadcast/replicast. For the data message, the > > SYN bit set and sending via replicast/broadcast. 
> > > > Then, at receiving side any messages follow first SYN bit message > > (data or dummy), they will be hold in deferred queue until another > > pair (dummy or > > data) arrived. > > > > For compatibility reasons we have to introduce a new capability flag > > TIPC_MCAST_RBCTL to handle this new feature. Because of there is a > > dummy message sent out, then poll return empty at old machines. > > > > Signed-off-by: Jon Maloy <jon...@er...> > > Signed-off-by: Hoang Le <hoa...@de...> > > --- > > net/tipc/bcast.c | 116 > > +++++++++++++++++++++++++++++++++++++++++++++- > > net/tipc/bcast.h | 5 ++ > > net/tipc/core.c | 2 + > > net/tipc/core.h | 3 ++ > > net/tipc/msg.h | 10 ++++ > > net/tipc/node.c | 10 ++++ > > net/tipc/node.h | 6 ++- > > net/tipc/socket.c | 10 ++++ > > 8 files changed, 159 insertions(+), 3 deletions(-) > > > > diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index > > d8026543bf4c..e3a85227d4aa 100644 > > --- a/net/tipc/bcast.c > > +++ b/net/tipc/bcast.c > > @@ -295,11 +295,15 @@ int tipc_mcast_xmit(struct net *net, struct > > sk_buff_head *pkts, > > struct tipc_mc_method *method, struct tipc_nlist *dests, > > u16 *cong_link_cnt) > > { > > - struct sk_buff_head inputq, localq; > > + struct sk_buff_head inputq, localq, tmpq; > > + bool rcast = method->rcast; > > + struct sk_buff *skb, *_skb; > > + struct tipc_msg *hdr, *_hdr; > > int rc = 0; > > > > skb_queue_head_init(&inputq); > > skb_queue_head_init(&localq); > > + skb_queue_head_init(&tmpq); > > > > /* Clone packets before they are consumed by next call */ > > if (dests->local && !tipc_msg_reassemble(pkts, &localq)) { @@ - > > 309,6 +313,53 @@ int tipc_mcast_xmit(struct net *net, struct > > sk_buff_head *pkts, > > /* Send according to determined transmit method */ > > if (dests->remote) { > > tipc_bcast_select_xmit_method(net, dests->remote, > method); > > I would suggest that you move the whole code block below into a separate > function: > msg_set_is_rcast(hdr, method->rcast); > if (rcast != 
method->rcast) > tipc_mcast_send_sync(net, skb_peek(pkts)); > > This function sets the SYN bit in the packet header, then copies that header > into a dummy header, inverts the is_rcast bit in that header and sends it out > via the appropriate method. > Note that this involves a small change: the real message is sent out via the > selected method, the dummy message always via the other method, > whichever it is. > > > + > > + if (tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) { > > + skb = skb_peek(pkts); > > + hdr = buf_msg(skb); > > + > > + if (msg_user(hdr) == MSG_FRAGMENTER) > > + hdr = msg_get_wrapped(hdr); > > + if (msg_type(hdr) != TIPC_MCAST_MSG) > > + goto xmit; > > + > > + msg_set_syn(hdr, 0); > > + msg_set_is_rcast(hdr, method->rcast); > > + > > + /* switch mode */ > > + if (rcast != method->rcast) { > > + /* Build message's copied */ > > + _skb = tipc_buf_acquire(MCAST_H_SIZE, > > + GFP_KERNEL); > > + if (!skb) { > > + rc = -ENOMEM; > > + goto exit; > > + } > > + skb_orphan(_skb); > > + skb_copy_to_linear_data(_skb, hdr, > > + MCAST_H_SIZE); > > + > > + /* Build dummy header */ > > + _hdr = buf_msg(_skb); > > + msg_set_size(_hdr, MCAST_H_SIZE); > > + __skb_queue_tail(&tmpq, _skb); > > + > > + msg_set_syn(hdr, 1); > > + msg_set_syn(_hdr, 1); > > + msg_set_is_rcast(_hdr, rcast); > > + /* Prepare for 'synching' */ > > + if (rcast) > > + tipc_rcast_xmit(net, &tmpq, dests, > > + cong_link_cnt); > > + else > > + tipc_bcast_xmit(net, &tmpq, > > + cong_link_cnt); > > + > > + /* This queue should normally be empty by > > now */ > > + __skb_queue_purge(&tmpq); > > + } > > + } > > +xmit: > > if (method->rcast) > > rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt); > > else > > @@ -576,3 +627,66 @@ void tipc_nlist_purge(struct tipc_nlist *nl) > > nl->remote = 0; > > nl->local = false; > > } > > + > > +void tipc_mcast_filter_msg(struct sk_buff_head *defq, > > + struct sk_buff_head *inputq) > > +{ > > + struct sk_buff *skb, *_skb; > > + struct tipc_msg *hdr, 
*_hdr; > > + u32 node, port, _node, _port; > > + bool match = false; > > If you put in the following lines here we will save some instruction cycles: > hdr = buf_msg(skb_peek(inputq)); > if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq))) > return; > > After all this will be the case in the vast majority of cases. It is safe to just > peek the queue and access, since inputq never can be empty. > > > + > > + skb = __skb_dequeue(inputq); > > + if (!skb) > > + return; > > + > > + hdr = buf_msg(skb); > > + node = msg_orignode(hdr); > > + port = msg_origport(hdr); > > + > > + /* Find a peer port if its existing in defer queue */ > > + while ((_skb = skb_peek(defq))) { > > + _hdr = buf_msg(_skb); > > + _node = msg_orignode(_hdr); > > + _port = msg_origport(_hdr); > > + > > + if (_node != node) > > + continue; > > + if (_port != port) > > + continue; > > + > > + if (!match) { > > + if (msg_is_syn(hdr) && > > + msg_is_rcast(hdr) != msg_is_rcast(_hdr)) { > > + __skb_dequeue(defq); > > + if (msg_data_sz(hdr)) { > > + __skb_queue_tail(inputq, skb); > > + kfree_skb(_skb); > > + } else { > > + __skb_queue_tail(inputq, _skb); > > + kfree_skb(skb); > > + } > > + match = true; > > + } else { > > + break; > > + } > > + } else { > > + if (msg_is_syn(_hdr)) > > + return; > > + /* Dequeued to receive buffer */ > > + __skb_dequeue(defq); > > + __skb_queue_tail(inputq, _skb); > > + } > > + } > > + > > + if (match) > > + return; > > + > > + if (msg_is_syn(hdr)) { > > + /* Enqueue and defer to next synching */ > > + __skb_queue_tail(defq, skb); > > + } else { > > + /* Direct enqueued */ > > + __skb_queue_tail(inputq, skb); > > + } > > +} > > Above function is hard to follow and does not convince me. What if there are > messages from many sources, and you find the match in the middle of the > queue? 
> I suggest you break down the logics to smaller tasks, e.g., as follows (code > compiles ok, but is untested): > > void tipc_mcast_filter_msg(struct sk_buff_head *defq, > struct sk_buff_head *inputq) { > struct sk_buff *skb, *_skb, *tmp; > struct tipc_msg *hdr, *_hdr; > bool match = false; > u32 node, port; > > skb = skb_peek(inputq); > hdr = buf_msg(skb); > if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq))) > return; > > __skb_dequeue(inputq); > node = msg_orignode(hdr); > port = msg_origport(hdr); > > /* Has the twin SYN message already arrived ? */ > skb_queue_walk(defq, _skb) { > _hdr = buf_msg(_skb); > if (msg_orignode(_hdr) != node) > continue; > if (msg_origport(_hdr) != port) > continue; > match = true; > break; > } > > if (!match) > return __skb_queue_tail(defq, skb); > > if (!msg_is_syn(_hdr)) { > pr_warn_ratelimited("Non-sync mcast heads deferred queue\n"); > __skb_queue_purge(defq); > return __skb_queue_tail(inputq, skb); > } > > /* Non-SYN message from other link can be delivered right away */ > if (!msg_is_syn(hdr)) { > if (msg_is_rcast(hdr) != msg_is_rcast(_hdr)) > return __skb_queue_tail(inputq, skb); > else > return __skb_queue_tail(defq, skb); > } > > /* Matching SYN messages => return the one with data, if any */ > __skb_unlink(_skb, defq); > if (msg_data_sz(hdr)) { > kfree_skb(_skb); > __skb_queue_tail(inputq, skb); > } else { > kfree_skb(skb); > __skb_queue_tail(inputq, _skb); > } > /* Deliver subsequent non-SYN messages from same peer */ > skb_queue_walk_safe(defq, _skb, tmp) { > _hdr = buf_msg(_skb); > if (msg_orignode(_hdr) != node) > continue; > if (msg_origport(_hdr) != port) > continue; > if (msg_is_syn(_hdr)) > break; > __skb_unlink(_skb, defq); > __skb_queue_tail(inputq, _skb); > } > } > > > diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index > > 751530ab0c49..165d88a503e4 100644 > > --- a/net/tipc/bcast.h > > +++ b/net/tipc/bcast.h > > @@ -63,11 +63,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 > > node); > > /* Cookie 
to be used between socket and broadcast layer > > * @rcast: replicast (instead of broadcast) was used at previous xmit > > * @mandatory: broadcast/replicast indication was set by user > > + * @deferredq: defer queue to make message in order > > * @expires: re-evaluate non-mandatory transmit method if we are past > this > > */ > > [...] > > > @@ -383,6 +383,11 @@ static struct tipc_node *tipc_node_create(struct > > net *net, u32 addr, > > tipc_link_update_caps(l, capabilities); > > } > > write_unlock_bh(&n->lock); > > + /* Calculate cluster capabilities */ > > + tn->capabilities = TIPC_NODE_CAPABILITIES; > > + list_for_each_entry_rcu(temp_node, &tn->node_list, list) { > > + tn->capabilities &= temp_node->capabilities; > > + } > > Yes, you are right here. During a cluster upgrade a node can come back with > new capabilities which also must be reflected in the cluster capabilities field. > Actually, I think it would be a good idea to add cluster capabilities as a > separate patch. This makes this rather complex patch slightly smaller. > > > goto exit; > > } > > n = kzalloc(sizeof(*n), GFP_ATOMIC); @@ -433,6 +438,11 @@ static > > struct tipc_node *tipc_node_create(struct net *net, u32 addr, > > break; > > [...] > > > > > @@ -817,6 +819,11 @@ static int tipc_sendmcast(struct socket *sock, > > struct tipc_name_seq *seq, > > &tsk->cong_link_cnt); > > } > > > > + /* Broadcast link is now free to choose method for next broadcast */ > > + if (rc == 0) { > > + method->mandatory = false; > > + method->expires = jiffies; > > + } > > No, we should leave the socket code as is, so we are sure it works with legacy > nodes. > We should instead make tipc_bcast_select_input_method() slightly smarter: > > static void tipc_bcast_select_xmit_method(struct net *net, int dests, > struct tipc_mc_method *method) { > ....... > /* Can current method be changed ? 
*/ > method->expires = jiffies + TIPC_METHOD_EXPIRE; > if (method->mandatory) > return; > if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL)) && > time_before(jiffies, exp)) > return; > > /* Determine method to use now */ > method->rcast = dests <= bb->bc_threshold; } > > I.e., we respect the 'mandatory' setting, because we need that for > group_cast wo work correctly, but we override 'method->expire' if the > cluster capabilities says that all nodes support MCAST_RBCTL. > Combined with the patch where we add forced BCAST or REPLICAST (make > sure you add this patch on top of that one) I think we have a achieved a > pretty smart and adaptive multicast subsystem. > > BR > ///jon > > > > tipc_nlist_purge(&dsts); > > > > return rc ? rc : dlen; > > @@ -2157,6 +2164,9 @@ static void tipc_sk_filter_rcv(struct sock *sk, > > struct sk_buff *skb, > > if (unlikely(grp)) > > tipc_group_filter_msg(grp, &inputq, xmitq); > > > > + if (msg_type(hdr) == TIPC_MCAST_MSG) > > + tipc_mcast_filter_msg(&tsk->mc_method.deferredq, > > &inputq); > > + > > /* Validate and add to receive buffer if there is space */ > > while ((skb = __skb_dequeue(&inputq))) { > > hdr = buf_msg(skb); > > -- > > 2.17.1 |
From: Jon M. <jon...@er...> - 2019-02-01 00:10:38
|
Hi Huang, The (!match) case of my suggested code was wrong. To possibly save you some unnecessary troubleshooting I fixed that and made some other simplifications that became possible. ///jon -----Original Message----- From: Jon Maloy Sent: 31-Jan-19 16:05 To: 'Hoang Le' <hoa...@de...>; ma...@do...; yin...@wi...; tip...@li... Subject: RE: [net-next] tipc: smooth change between replicast and broadcast Hi Hoang, Nice job, but still a few things to improve. See below. > -----Original Message----- > From: Hoang Le <hoa...@de...> > Sent: 29-Jan-19 04:22 > To: Jon Maloy <jon...@er...>; ma...@do...; > yin...@wi...; tip...@li... > Subject: [net-next] tipc: smooth change between replicast and [...] void tipc_mcast_filter_msg(struct sk_buff_head *defq, struct sk_buff_head *inputq) { struct sk_buff *skb, *_skb, *tmp; struct tipc_msg *hdr, *_hdr; bool match = false; u32 node, port; skb = skb_peek(inputq); hdr = buf_msg(skb); if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq))) return; node = msg_orignode(hdr); port = msg_origport(hdr); /* Has the twin SYN message already arrived ? 
*/ skb_queue_walk(defq, _skb) { _hdr = buf_msg(_skb); if (msg_orignode(_hdr) != node) continue; if (msg_origport(_hdr) != port) continue; match = true; break; } if (!match) { if (!msg_is_syn(hdr)) return; __skb_dequeue(inputq); __skb_queue_tail(defq, skb); return; } if (!msg_is_syn(_hdr)) { pr_warn_ratelimited("Non-sync mcast heads deferred queue\n"); __skb_queue_purge(defq); return; } /* Deliver non-SYN message from other link, otherewise queue it */ if (!msg_is_syn(hdr)) { if (msg_is_rcast(hdr) != msg_is_rcast(_hdr)) return; __skb_dequeue(inputq); __skb_queue_tail(defq, skb); return; } /* Matching SYN messages => return the one with data, if any */ __skb_unlink(_skb, defq); if (msg_data_sz(hdr)) { kfree_skb(_skb); } else { __skb_dequeue(inputq); kfree_skb(skb); __skb_queue_tail(inputq, _skb); } /* Deliver subsequent non-SYN messages from same peer */ skb_queue_walk_safe(defq, _skb, tmp) { _hdr = buf_msg(_skb); if (msg_orignode(_hdr) != node) continue; if (msg_origport(_hdr) != port) continue; if (msg_is_syn(_hdr)) break; __skb_unlink(_skb, defq); __skb_queue_tail(inputq, _skb); } } > diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index > 751530ab0c49..165d88a503e4 100644 > --- a/net/tipc/bcast.h > +++ b/net/tipc/bcast.h > @@ -63,11 +63,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 > node); > /* Cookie to be used between socket and broadcast layer > * @rcast: replicast (instead of broadcast) was used at previous xmit > * @mandatory: broadcast/replicast indication was set by user > + * @deferredq: defer queue to make message in order > * @expires: re-evaluate non-mandatory transmit method if we are past this > */ [...] 
> @@ -383,6 +383,11 @@ static struct tipc_node *tipc_node_create(struct > net *net, u32 addr, > tipc_link_update_caps(l, capabilities); > } > write_unlock_bh(&n->lock); > + /* Calculate cluster capabilities */ > + tn->capabilities = TIPC_NODE_CAPABILITIES; > + list_for_each_entry_rcu(temp_node, &tn->node_list, list) { > + tn->capabilities &= temp_node->capabilities; > + } Yes, you are right here. During a cluster upgrade a node can come back with new capabilities which also must be reflected in the cluster capabilities field. Actually, I think it would be a good idea to add cluster capabilities as a separate patch. This makes this rather complex patch slightly smaller. > goto exit; > } > n = kzalloc(sizeof(*n), GFP_ATOMIC); @@ -433,6 +438,11 @@ static > struct tipc_node *tipc_node_create(struct net *net, u32 addr, > break; [...] > > @@ -817,6 +819,11 @@ static int tipc_sendmcast(struct socket *sock, > struct tipc_name_seq *seq, > &tsk->cong_link_cnt); > } > > + /* Broadcast link is now free to choose method for next broadcast */ > + if (rc == 0) { > + method->mandatory = false; > + method->expires = jiffies; > + } No, we should leave the socket code as is, so we are sure it works with legacy nodes. We should instead make tipc_bcast_select_input_method() slightly smarter: static void tipc_bcast_select_xmit_method(struct net *net, int dests, struct tipc_mc_method *method) { ....... /* Can current method be changed ? */ method->expires = jiffies + TIPC_METHOD_EXPIRE; if (method->mandatory) return; if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL)) && time_before(jiffies, exp)) return; /* Determine method to use now */ method->rcast = dests <= bb->bc_threshold; } I.e., we respect the 'mandatory' setting, because we need that for group_cast wo work correctly, but we override 'method->expire' if the cluster capabilities says that all nodes support MCAST_RBCTL. 
Combined with the patch where we add forced BCAST or REPLICAST (make sure you add this patch on top of that one) I think we have achieved a pretty smart and adaptive multicast subsystem. BR ///jon > tipc_nlist_purge(&dsts); > > return rc ? rc : dlen; > @@ -2157,6 +2164,9 @@ static void tipc_sk_filter_rcv(struct sock *sk, > struct sk_buff *skb, > if (unlikely(grp)) > tipc_group_filter_msg(grp, &inputq, xmitq); > > + if (msg_type(hdr) == TIPC_MCAST_MSG) > + tipc_mcast_filter_msg(&tsk->mc_method.deferredq, > &inputq); > + > /* Validate and add to receive buffer if there is space */ > while ((skb = __skb_dequeue(&inputq))) { > hdr = buf_msg(skb); > -- > 2.17.1 |
From: Jon M. <jon...@er...> - 2019-01-31 21:05:56
|
Hi Hoang, Nice job, but still a few things to improve. See below. > -----Original Message----- > From: Hoang Le <hoa...@de...> > Sent: 29-Jan-19 04:22 > To: Jon Maloy <jon...@er...>; ma...@do...; > yin...@wi...; tip...@li... > Subject: [net-next] tipc: smooth change between replicast and broadcast > > Currently, a multicast stream may start out using replicast, because there are > few destinations, and then it should ideally switch to L2/broadcast > IGMP/multicast when the number of destinations grows beyond a certain > limit. The opposite should happen when the number decreases below the > limit. > > To eliminate the risk of message reordering caused by method change, a > sending socket must stick to a previously selected method until it enters an > idle period of 5 seconds. Means there is a 5 seconds pause in the traffic from > the sender socket. > > In this fix, we allow such a switch between replicast and broadcast without a > 5 seconds pause in the traffic. > > Solution is to send a dummy message with only the header, also with the SYN > bit set, via broadcast/replicast. For the data message, the SYN bit set and > sending via replicast/broadcast. > > Then, at receiving side any messages follow first SYN bit message (data or > dummy), they will be hold in deferred queue until another pair (dummy or > data) arrived. > > For compatibility reasons we have to introduce a new capability flag > TIPC_MCAST_RBCTL to handle this new feature. Because of there is a > dummy message sent out, then poll return empty at old machines. 
> > Signed-off-by: Jon Maloy <jon...@er...> > Signed-off-by: Hoang Le <hoa...@de...> > --- > net/tipc/bcast.c | 116 > +++++++++++++++++++++++++++++++++++++++++++++- > net/tipc/bcast.h | 5 ++ > net/tipc/core.c | 2 + > net/tipc/core.h | 3 ++ > net/tipc/msg.h | 10 ++++ > net/tipc/node.c | 10 ++++ > net/tipc/node.h | 6 ++- > net/tipc/socket.c | 10 ++++ > 8 files changed, 159 insertions(+), 3 deletions(-) > > diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index > d8026543bf4c..e3a85227d4aa 100644 > --- a/net/tipc/bcast.c > +++ b/net/tipc/bcast.c > @@ -295,11 +295,15 @@ int tipc_mcast_xmit(struct net *net, struct > sk_buff_head *pkts, > struct tipc_mc_method *method, struct tipc_nlist *dests, > u16 *cong_link_cnt) > { > - struct sk_buff_head inputq, localq; > + struct sk_buff_head inputq, localq, tmpq; > + bool rcast = method->rcast; > + struct sk_buff *skb, *_skb; > + struct tipc_msg *hdr, *_hdr; > int rc = 0; > > skb_queue_head_init(&inputq); > skb_queue_head_init(&localq); > + skb_queue_head_init(&tmpq); > > /* Clone packets before they are consumed by next call */ > if (dests->local && !tipc_msg_reassemble(pkts, &localq)) { @@ - > 309,6 +313,53 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head > *pkts, > /* Send according to determined transmit method */ > if (dests->remote) { > tipc_bcast_select_xmit_method(net, dests->remote, > method); I would suggest that you move the whole code block below into a separate function: msg_set_is_rcast(hdr, method->rcast); if (rcast != method->rcast) tipc_mcast_send_sync(net, skb_peek(pkts)); This function sets the SYN bit in the packet header, then copies that header into a dummy header, inverts the is_rcast bit in that header and sends it out via the appropriate method. Note that this involves a small change: the real message is sent out via the selected method, the dummy message always via the other method, whichever it is. 
> + > + if (tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) { > + skb = skb_peek(pkts); > + hdr = buf_msg(skb); > + > + if (msg_user(hdr) == MSG_FRAGMENTER) > + hdr = msg_get_wrapped(hdr); > + if (msg_type(hdr) != TIPC_MCAST_MSG) > + goto xmit; > + > + msg_set_syn(hdr, 0); > + msg_set_is_rcast(hdr, method->rcast); > + > + /* switch mode */ > + if (rcast != method->rcast) { > + /* Build message's copied */ > + _skb = tipc_buf_acquire(MCAST_H_SIZE, > + GFP_KERNEL); > + if (!skb) { > + rc = -ENOMEM; > + goto exit; > + } > + skb_orphan(_skb); > + skb_copy_to_linear_data(_skb, hdr, > + MCAST_H_SIZE); > + > + /* Build dummy header */ > + _hdr = buf_msg(_skb); > + msg_set_size(_hdr, MCAST_H_SIZE); > + __skb_queue_tail(&tmpq, _skb); > + > + msg_set_syn(hdr, 1); > + msg_set_syn(_hdr, 1); > + msg_set_is_rcast(_hdr, rcast); > + /* Prepare for 'synching' */ > + if (rcast) > + tipc_rcast_xmit(net, &tmpq, dests, > + cong_link_cnt); > + else > + tipc_bcast_xmit(net, &tmpq, > + cong_link_cnt); > + > + /* This queue should normally be empty by > now */ > + __skb_queue_purge(&tmpq); > + } > + } > +xmit: > if (method->rcast) > rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt); > else > @@ -576,3 +627,66 @@ void tipc_nlist_purge(struct tipc_nlist *nl) > nl->remote = 0; > nl->local = false; > } > + > +void tipc_mcast_filter_msg(struct sk_buff_head *defq, > + struct sk_buff_head *inputq) > +{ > + struct sk_buff *skb, *_skb; > + struct tipc_msg *hdr, *_hdr; > + u32 node, port, _node, _port; > + bool match = false; If you put in the following lines here we will save some instruction cycles: hdr = buf_msg(skb_peek(inputq)); if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq))) return; After all this will be the case in the vast majority of cases. It is safe to just peek the queue and access, since inputq never can be empty. 
> + > + skb = __skb_dequeue(inputq); > + if (!skb) > + return; > + > + hdr = buf_msg(skb); > + node = msg_orignode(hdr); > + port = msg_origport(hdr); > + > + /* Find a peer port if its existing in defer queue */ > + while ((_skb = skb_peek(defq))) { > + _hdr = buf_msg(_skb); > + _node = msg_orignode(_hdr); > + _port = msg_origport(_hdr); > + > + if (_node != node) > + continue; > + if (_port != port) > + continue; > + > + if (!match) { > + if (msg_is_syn(hdr) && > + msg_is_rcast(hdr) != msg_is_rcast(_hdr)) { > + __skb_dequeue(defq); > + if (msg_data_sz(hdr)) { > + __skb_queue_tail(inputq, skb); > + kfree_skb(_skb); > + } else { > + __skb_queue_tail(inputq, _skb); > + kfree_skb(skb); > + } > + match = true; > + } else { > + break; > + } > + } else { > + if (msg_is_syn(_hdr)) > + return; > + /* Dequeued to receive buffer */ > + __skb_dequeue(defq); > + __skb_queue_tail(inputq, _skb); > + } > + } > + > + if (match) > + return; > + > + if (msg_is_syn(hdr)) { > + /* Enqueue and defer to next synching */ > + __skb_queue_tail(defq, skb); > + } else { > + /* Direct enqueued */ > + __skb_queue_tail(inputq, skb); > + } > +} Above function is hard to follow and does not convince me. What if there are messages from many sources, and you find the match in the middle of the queue? I suggest you break down the logics to smaller tasks, e.g., as follows (code compiles ok, but is untested): void tipc_mcast_filter_msg(struct sk_buff_head *defq, struct sk_buff_head *inputq) { struct sk_buff *skb, *_skb, *tmp; struct tipc_msg *hdr, *_hdr; bool match = false; u32 node, port; skb = skb_peek(inputq); hdr = buf_msg(skb); if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq))) return; __skb_dequeue(inputq); node = msg_orignode(hdr); port = msg_origport(hdr); /* Has the twin SYN message already arrived ? 
*/ skb_queue_walk(defq, _skb) { _hdr = buf_msg(_skb); if (msg_orignode(_hdr) != node) continue; if (msg_origport(_hdr) != port) continue; match = true; break; } if (!match) return __skb_queue_tail(defq, skb); if (!msg_is_syn(_hdr)) { pr_warn_ratelimited("Non-sync mcast heads deferred queue\n"); __skb_queue_purge(defq); return __skb_queue_tail(inputq, skb); } /* Non-SYN message from other link can be delivered right away */ if (!msg_is_syn(hdr)) { if (msg_is_rcast(hdr) != msg_is_rcast(_hdr)) return __skb_queue_tail(inputq, skb); else return __skb_queue_tail(defq, skb); } /* Matching SYN messages => return the one with data, if any */ __skb_unlink(_skb, defq); if (msg_data_sz(hdr)) { kfree_skb(_skb); __skb_queue_tail(inputq, skb); } else { kfree_skb(skb); __skb_queue_tail(inputq, _skb); } /* Deliver subsequent non-SYN messages from same peer */ skb_queue_walk_safe(defq, _skb, tmp) { _hdr = buf_msg(_skb); if (msg_orignode(_hdr) != node) continue; if (msg_origport(_hdr) != port) continue; if (msg_is_syn(_hdr)) break; __skb_unlink(_skb, defq); __skb_queue_tail(inputq, _skb); } } > diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index > 751530ab0c49..165d88a503e4 100644 > --- a/net/tipc/bcast.h > +++ b/net/tipc/bcast.h > @@ -63,11 +63,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node); > /* Cookie to be used between socket and broadcast layer > * @rcast: replicast (instead of broadcast) was used at previous xmit > * @mandatory: broadcast/replicast indication was set by user > + * @deferredq: defer queue to make message in order > * @expires: re-evaluate non-mandatory transmit method if we are past this > */ [...] 
> @@ -383,6 +383,11 @@ static struct tipc_node *tipc_node_create(struct net > *net, u32 addr, > tipc_link_update_caps(l, capabilities); > } > write_unlock_bh(&n->lock); > + /* Calculate cluster capabilities */ > + tn->capabilities = TIPC_NODE_CAPABILITIES; > + list_for_each_entry_rcu(temp_node, &tn->node_list, list) { > + tn->capabilities &= temp_node->capabilities; > + } Yes, you are right here. During a cluster upgrade a node can come back with new capabilities which also must be reflected in the cluster capabilities field. Actually, I think it would be a good idea to add cluster capabilities as a separate patch. This makes this rather complex patch slightly smaller. > goto exit; > } > n = kzalloc(sizeof(*n), GFP_ATOMIC); > @@ -433,6 +438,11 @@ static struct tipc_node *tipc_node_create(struct net > *net, u32 addr, > break; [...] > > @@ -817,6 +819,11 @@ static int tipc_sendmcast(struct socket *sock, struct > tipc_name_seq *seq, > &tsk->cong_link_cnt); > } > > + /* Broadcast link is now free to choose method for next broadcast */ > + if (rc == 0) { > + method->mandatory = false; > + method->expires = jiffies; > + } No, we should leave the socket code as is, so we are sure it works with legacy nodes. We should instead make tipc_bcast_select_input_method() slightly smarter: static void tipc_bcast_select_xmit_method(struct net *net, int dests, struct tipc_mc_method *method) { ....... /* Can current method be changed ? */ method->expires = jiffies + TIPC_METHOD_EXPIRE; if (method->mandatory) return; if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL)) && time_before(jiffies, exp)) return; /* Determine method to use now */ method->rcast = dests <= bb->bc_threshold; } I.e., we respect the 'mandatory' setting, because we need that for group_cast wo work correctly, but we override 'method->expire' if the cluster capabilities says that all nodes support MCAST_RBCTL. 
Combined with the patch where we add forced BCAST or REPLICAST (make sure you add this patch on top of that one) I think we have achieved a pretty smart and adaptive multicast subsystem. BR ///jon > tipc_nlist_purge(&dsts); > > return rc ? rc : dlen; > @@ -2157,6 +2164,9 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct > sk_buff *skb, > if (unlikely(grp)) > tipc_group_filter_msg(grp, &inputq, xmitq); > > + if (msg_type(hdr) == TIPC_MCAST_MSG) > + tipc_mcast_filter_msg(&tsk->mc_method.deferredq, > &inputq); > + > /* Validate and add to receive buffer if there is space */ > while ((skb = __skb_dequeue(&inputq))) { > hdr = buf_msg(skb); > -- > 2.17.1 |
From: Jon M. <jon...@er...> - 2019-01-31 00:33:20
|
Hi Hoang, I tried this, but found it a little limiting in comparison to what I had in mind. I suggest we make another iteration on this. First of all, we shouldn't touch any bearers or links at all, -only the tipc_bc_base structure. We should add three options to the 'tipc link set broadcast' command: - BROADCAST - REPLICAST - SELECTABLE These values have a different meaning from what you have implemented: - BROADCAST forces all multicast traffic to be transmitted via broadcast only, irrespective of cluster size and number of destinations. You achieve this by setting a new field tipc_bc_base::force_bcast to true and the corresponding force_rcast to false. This setting must however be rejected if the corresponding tipc_bc_base::bc_supported is already false, i.e., one of the bearers doesn't support broadcast. This option is useful when we, e.g., want to test the broadcast implementation in a small cluster. - REPLICAST forces all multicast traffic to be transmitted via replicast only, irrespective of cluster size and number of destinations. You achieve this by setting a new field tipc_bc_base::force_rcast to true and the corresponding force_bcast to false. This setting must however be rejected if the corresponding tipc_bc_base::rc_supported is already false, i.e., one of the destination nodes (old nodes) doesn't support replicast. This option is the one that overrides VXLAN's emulated broadcast setting. - SELECTABLE means that the system can switch between broadcast and replicast depending on cluster size and destination number. Force_bcast and force_rcast are both set to false. This setting must be rejected if one of the fields bc_supported or rc_supported is false. When given this option, an extra option must also be available: 'ratio' which determines the selection threshold. Currently, the rc_ratio is hard coded to 25%, which is probably too high. 
SELECTABLE should be the default broadcast value (when possible) and the default for 'ratio' should be 10 (meaning 10 %). When we do 'get', we do get the current value, including the ratio when it is relevant (given with the '%' character, to help the user to understand what it is) when the broadcast value is SELECTABLE. The 'link' option is redundant and confusing. This property can only be set on the broadcast link, and should only be listed as a property of that link, not the others. Also, when 'tipc link set broadcast <return>' is typed the help text should print out the three options, so the user gets a clue about what to set. Later, we need to update the man pages and bash command completion script with this. I think this, in combination with your other rcast/bcast patch should give us what we need in all types of environments for the future. Regards ///jon > -----Original Message----- > From: Hoang Le <hoa...@de...> > Sent: 28-Jan-19 21:30 > To: Jon Maloy <jon...@er...>; ma...@do...; > yin...@wi...; tip...@li... > Subject: [net-next] tipc: support multicast/replicast configurable for bearer > > When L2 interface (e.g VXLAN) pseudo support broadcast, we should make it > possible for the user to disable this manually and switch to replicast. > > We should preferably find a way to sense this in the code, but if we cannot > this feature also useful to manual configuration. > > Signed-off-by: Hoang Le <hoa...@de...> > --- > include/uapi/linux/tipc_netlink.h | 3 ++- > net/tipc/bcast.c | 24 ++++++++++++++++++++++-- > net/tipc/bcast.h | 2 ++ > net/tipc/link.c | 6 ++++++ > 4 files changed, 32 insertions(+), 3 deletions(-) > > diff --git a/include/uapi/linux/tipc_netlink.h > b/include/uapi/linux/tipc_netlink.h > index 0ebe02ef1a86..a4a765f69352 100644 > --- a/include/uapi/linux/tipc_netlink.h > +++ b/include/uapi/linux/tipc_netlink.h > @@ -273,7 +273,7 @@ enum { > TIPC_NLA_SOCK_STAT_MAX = __TIPC_NLA_SOCK_STAT_MAX - 1 }; > > -/* Nest, link propreties. 
Valid for link, media and bearer */ > +/* Nest, link properties. Valid for link, media and bearer */ > enum { > TIPC_NLA_PROP_UNSPEC, > > @@ -281,6 +281,7 @@ enum { > TIPC_NLA_PROP_TOL, /* u32 */ > TIPC_NLA_PROP_WIN, /* u32 */ > TIPC_NLA_PROP_MTU, /* u32 */ > + TIPC_NLA_PROP_BCAST, /* u8 */ > > __TIPC_NLA_PROP_MAX, > TIPC_NLA_PROP_MAX = __TIPC_NLA_PROP_MAX - 1 diff --git > a/net/tipc/bcast.c b/net/tipc/bcast.c index d8026543bf4c..44acd6957b65 > 100644 > --- a/net/tipc/bcast.c > +++ b/net/tipc/bcast.c > @@ -88,6 +88,16 @@ void tipc_bcast_disable_rcast(struct net *net) > tipc_bc_base(net)->rcast_support = false; } > > +bool tipc_bcast_get_bcast(struct net *net) { > + return tipc_bc_base(net)->bcast_support; } > + > +void tipc_bcast_set_bcast(struct net *net, bool sup) { > + tipc_bc_base(net)->bcast_support = sup; } > + > static void tipc_bcbase_calc_bc_threshold(struct net *net) { > struct tipc_bc_base *bb = tipc_bc_base(net); @@ -106,7 +116,6 @@ > static void tipc_bcbase_select_primary(struct net *net) > int i, mtu, prim; > > bb->primary_bearer = INVALID_BEARER_ID; > - bb->bcast_support = true; > > if (!all_dests) > return; > @@ -130,7 +139,7 @@ static void tipc_bcbase_select_primary(struct net > *net) > } > prim = bb->primary_bearer; > if (prim != INVALID_BEARER_ID) > - bb->bcast_support = tipc_bearer_bcast_support(net, prim); > + bb->bcast_support &= tipc_bearer_bcast_support(net, > prim); > } > > void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id) @@ - > 498,6 +507,16 @@ int tipc_nl_bc_link_set(struct net *net, struct nlattr > *attrs[]) > if (err) > return err; > > + if (props[TIPC_NLA_PROP_BCAST]) { > + tipc_bcast_lock(net); > + if (nla_get_u8(props[TIPC_NLA_PROP_BCAST]) & 0x1) > + tipc_bcast_set_bcast(net, true); > + else > + tipc_bcast_set_bcast(net, false); > + tipc_bcast_unlock(net); > + return 0; > + } > + > if (!props[TIPC_NLA_PROP_WIN]) > return -EOPNOTSUPP; > > @@ -531,6 +550,7 @@ int tipc_bcast_init(struct net *net) > tn->bcl = l; > 
bb->rc_ratio = 25; > bb->rcast_support = true; > + bb->bcast_support = true; > return 0; > enomem: > kfree(bb); > diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index > 751530ab0c49..8880dc595c0b 100644 > --- a/net/tipc/bcast.h > +++ b/net/tipc/bcast.h > @@ -91,6 +91,8 @@ int tipc_bcast_sync_rcv(struct net *net, struct tipc_link > *l, int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg); int > tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]); int > tipc_bclink_reset_stats(struct net *net); > +void tipc_bcast_set_bcast(struct net *net, bool sup); bool > +tipc_bcast_get_bcast(struct net *net); > > static inline void tipc_bcast_lock(struct net *net) { diff --git a/net/tipc/link.c > b/net/tipc/link.c index ac306d17f8ad..0ac0943b1b00 100644 > --- a/net/tipc/link.c > +++ b/net/tipc/link.c > @@ -2103,6 +2103,9 @@ int __tipc_nl_add_link(struct net *net, struct > tipc_nl_msg *msg, > goto prop_msg_full; > if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority)) > goto prop_msg_full; > + if (nla_put_u8(msg->skb, TIPC_NLA_PROP_BCAST, > + tipc_bearer_bcast_support(net, link->bearer_id))) > + goto prop_msg_full; > nla_nest_end(msg->skb, prop); > > err = __tipc_nl_add_stats(msg->skb, &link->stats); @@ -2183,6 > +2186,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg > *msg) > struct nlattr *prop; > struct tipc_net *tn = net_generic(net, tipc_net_id); > struct tipc_link *bcl = tn->bcl; > + bool bcast = tipc_bcast_get_bcast(net); > > if (!bcl) > return 0; > @@ -2218,6 +2222,8 @@ int tipc_nl_add_bc_link(struct net *net, struct > tipc_nl_msg *msg) > goto attr_msg_full; > if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->window)) > goto prop_msg_full; > + if (nla_put_u8(msg->skb, TIPC_NLA_PROP_BCAST, bcast)) > + goto prop_msg_full; > nla_nest_end(msg->skb, prop); > > err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats); > -- > 2.17.1 |
From: Ying X. <yin...@wi...> - 2019-01-30 14:29:10
|
On 1/30/19 2:45 PM, Hoang Le wrote: > When we free skb at tipc_data_input, we return a 'false' boolean. > Then, skb passed to subcalling tipc_link_input in tipc_link_rcv, > > <snip> > 1303 int tipc_link_rcv: > ... > 1354 if (!tipc_data_input(l, skb, l->inputq)) > 1355 rc |= tipc_link_input(l, skb, l->inputq); > </snip> > > Fix it by simple changing to a 'true' boolean when skb is being free-ed. > Then, tipc_link_rcv will bypassed to subcalling tipc_link_input as above > condition. > > Signed-off-by: Hoang Le <hoa...@de...> Acked-by: Ying Xue <yin...@wi...> > --- > net/tipc/link.c | 2 +- > 1 file changed, 1 insertion(+), 1 deletion(-) > > diff --git a/net/tipc/link.c b/net/tipc/link.c > index ac306d17f8ad..d31f83a9a2c5 100644 > --- a/net/tipc/link.c > +++ b/net/tipc/link.c > @@ -1145,7 +1145,7 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, > default: > pr_warn("Dropping received illegal msg type\n"); > kfree_skb(skb); > - return false; > + return true; > }; > } > > |
From: Ying X. <yin...@wi...> - 2019-01-30 14:29:09
|
On 1/24/19 10:50 AM, Tuong Lien wrote: > When a link endpoint is re-created (e.g. after a node reboot or > interface reset), the link session number is varied by random, the peer > endpoint will be synced with this new session number before the link is > re-established. > > However, there is a shortcoming in this mechanism that can lead to the > link never re-established or faced with a failure then. It happens when > the peer endpoint is ready in ESTABLISHING state, the 'peer_session' as > well as the 'in_session' flag have been set, but suddenly this link > endpoint leaves. When it comes back with a random session number, there > are two situations possible: > > 1/ If the random session number is larger than (or equal to) the > previous one, the peer endpoint will be updated with this new session > upon receipt of a RESET_MSG from this endpoint, and the link can be re- > established as normal. Otherwise, all the RESET_MSGs from this endpoint > will be rejected by the peer. In turn, when this link endpoint receives > one ACTIVATE_MSG from the peer, it will move to ESTABLISHED and start > to send STATE_MSGs, but again these messages will be dropped by the > peer due to wrong session. > The peer link endpoint can still become ESTABLISHED after receiving a > traffic message from this endpoint (e.g. a BCAST_PROTOCOL or > NAME_DISTRIBUTOR), but since all the STATE_MSGs are invalid, the link > will be forced down sooner or later! > > Even in case the random session number is larger than the previous one, > it can be that the ACTIVATE_MSG from the peer arrives first, and this > link endpoint moves quickly to ESTABLISHED without sending out any > RESET_MSG yet. Consequently, the peer link will not be updated with the > new session number, and the same link failure scenario as above will > happen. 
> > 2/ Another situation can be that, the peer link endpoint was reset due > to any reasons in the meantime, its link state was set to RESET from > ESTABLISHING but still in session, i.e. the 'in_session' flag is not > reset... > Now, if the random session number from this endpoint is less than the > previous one, all the RESET_MSGs from this endpoint will be rejected by > the peer. In the other direction, when this link endpoint receives a > RESET_MSG from the peer, it moves to ESTABLISHING and starts to send > ACTIVATE_MSGs, but all these messages will be rejected by the peer too. > As a result, the link cannot be re-established but gets stuck with this > link endpoint in state ESTABLISHING and the peer in RESET! > > Solution: > =========== > > This link endpoint should not go directly to ESTABLISHED when getting > ACTIVATE_MSG from the peer which may belong to the old session if the > link was re-created. To ensure the session to be correct before the > link is re-established, the peer endpoint in ESTABLISHING state will > send back the last session number in ACTIVATE_MSG for a verification at > this endpoint. Then, if needed, a new and more appropriate session > number will be regenerated to force a re-synch first. > > In addition, when a link in ESTABLISHING state is reset, its state will > move to RESET according to the link FSM, along with resetting the > 'in_session' flag (and the other data) as a normal link reset, it will > also be deleted if requested. > > The solution is backward compatible. 
> > Signed-off-by: Tuong Lien <tuo...@de...> Acked-by: Ying Xue <yin...@wi...> > --- > net/tipc/link.c | 15 +++++++++++++++ > net/tipc/msg.h | 22 ++++++++++++++++++++++ > net/tipc/node.c | 11 ++++++----- > 3 files changed, 43 insertions(+), 5 deletions(-) > > diff --git a/net/tipc/link.c b/net/tipc/link.c > index 2792a3cae682..ce3363d8f1c8 100644 > --- a/net/tipc/link.c > +++ b/net/tipc/link.c > @@ -1425,6 +1425,10 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, > l->rcv_unacked = 0; > } else { > /* RESET_MSG or ACTIVATE_MSG */ > + if (mtyp == ACTIVATE_MSG) { > + msg_set_dest_session_valid(hdr, 1); > + msg_set_dest_session(hdr, l->peer_session); > + } > msg_set_max_pkt(hdr, l->advertised_mtu); > strcpy(data, l->if_name); > msg_set_size(hdr, INT_H_SIZE + TIPC_MAX_IF_NAME); > @@ -1642,6 +1646,17 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, > rc = tipc_link_fsm_evt(l, LINK_FAILURE_EVT); > break; > } > + > + /* If this endpoint was re-created while peer was ESTABLISHING > + * it doesn't know current session number. Force re-synch. 
> + */ > + if (mtyp == ACTIVATE_MSG && msg_dest_session_valid(hdr) && > + l->session != msg_dest_session(hdr)) { > + if (less(l->session, msg_dest_session(hdr))) > + l->session = msg_dest_session(hdr) + 1; > + break; > + } > + > /* ACTIVATE_MSG serves as PEER_RESET if link is already down */ > if (mtyp == RESET_MSG || !link_is_up(l)) > rc = tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT); > diff --git a/net/tipc/msg.h b/net/tipc/msg.h > index a0924956bb61..d7e4b8b93f9d 100644 > --- a/net/tipc/msg.h > +++ b/net/tipc/msg.h > @@ -360,6 +360,28 @@ static inline void msg_set_bcast_ack(struct tipc_msg *m, u16 n) > msg_set_bits(m, 1, 0, 0xffff, n); > } > > +/* Note: reusing bits in word 1 for ACTIVATE_MSG only, to re-synch > + * link peer session number > + */ > +static inline bool msg_dest_session_valid(struct tipc_msg *m) > +{ > + return msg_bits(m, 1, 16, 0x1); > +} > + > +static inline void msg_set_dest_session_valid(struct tipc_msg *m, bool valid) > +{ > + msg_set_bits(m, 1, 16, 0x1, valid); > +} > + > +static inline u16 msg_dest_session(struct tipc_msg *m) > +{ > + return msg_bits(m, 1, 0, 0xffff); > +} > + > +static inline void msg_set_dest_session(struct tipc_msg *m, u16 n) > +{ > + msg_set_bits(m, 1, 0, 0xffff, n); > +} > > /* > * Word 2 > diff --git a/net/tipc/node.c b/net/tipc/node.c > index db2a6c3e0be9..2dc4919ab23c 100644 > --- a/net/tipc/node.c > +++ b/net/tipc/node.c > @@ -830,15 +830,16 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete) > tipc_node_write_lock(n); > if (!tipc_link_is_establishing(l)) { > __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr); > - if (delete) { > - kfree(l); > - le->link = NULL; > - n->link_cnt--; > - } > } else { > /* Defuse pending tipc_node_link_up() */ > + tipc_link_reset(l); > tipc_link_fsm_evt(l, LINK_RESET_EVT); > } > + if (delete) { > + kfree(l); > + le->link = NULL; > + n->link_cnt--; > + } > trace_tipc_node_link_down(n, true, "node link down or deleted!"); > tipc_node_write_unlock(n); > if 
(delete) > |
From: Jon M. <ma...@do...> - 2019-01-30 12:54:05
|
Acked-by: Jon On Wednesday, January 30, 2019, 1:45:29 AM EST, Hoang Le <hoa...@de...> wrote: When we free skb at tipc_data_input, we return a 'false' boolean. Then, skb passed to subcalling tipc_link_input in tipc_link_rcv, <snip> 1303 int tipc_link_rcv: ... 1354 if (!tipc_data_input(l, skb, l->inputq)) 1355 rc |= tipc_link_input(l, skb, l->inputq); </snip> Fix it by simple changing to a 'true' boolean when skb is being free-ed. Then, tipc_link_rcv will bypassed to subcalling tipc_link_input as above condition. Signed-off-by: Hoang Le <hoa...@de...> --- net/tipc/link.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index ac306d17f8ad..d31f83a9a2c5 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1145,7 +1145,7 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, default: pr_warn("Dropping received illegal msg type\n"); kfree_skb(skb); - return false; + return true; }; } -- 2.17.1 |
From: Hoang Le <hoa...@de...> - 2019-01-30 06:45:32
|
When we free the skb in tipc_data_input, we return 'false'. The already freed skb is then passed on to tipc_link_input by tipc_link_rcv: <snip> 1303 int tipc_link_rcv: ... 1354 if (!tipc_data_input(l, skb, l->inputq)) 1355 rc |= tipc_link_input(l, skb, l->inputq); </snip> Fix this by returning 'true' when the skb has been freed, so that tipc_link_rcv skips the call to tipc_link_input in the condition above. Signed-off-by: Hoang Le <hoa...@de...> --- net/tipc/link.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index ac306d17f8ad..d31f83a9a2c5 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1145,7 +1145,7 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb, default: pr_warn("Dropping received illegal msg type\n"); kfree_skb(skb); - return false; + return true; }; } -- 2.17.1 |
From: Hoang Le <hoa...@de...> - 2019-01-29 09:38:52
|
When L2 interface (e.g VXLAN) pseudo support broadcast, we should make it possible for the user to disable this manually and switch to replicast. Usage: - Enable broadcast for bearer: $tipc link set broadcast 1 link broadcast-link - Disable broadcast (means replicast used) for bearer: $tipc link set broadcast 0 link broadcast-link Signed-off-by: Hoang Le <hoa...@de...> --- include/uapi/linux/tipc_netlink.h | 3 ++- tipc/link.c | 40 +++++++++++++++++++++++++++---- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/include/uapi/linux/tipc_netlink.h b/include/uapi/linux/tipc_netlink.h index 0ebe02ef1a86..a4a765f69352 100644 --- a/include/uapi/linux/tipc_netlink.h +++ b/include/uapi/linux/tipc_netlink.h @@ -273,7 +273,7 @@ enum { TIPC_NLA_SOCK_STAT_MAX = __TIPC_NLA_SOCK_STAT_MAX - 1 }; -/* Nest, link propreties. Valid for link, media and bearer */ +/* Nest, link properties. Valid for link, media and bearer */ enum { TIPC_NLA_PROP_UNSPEC, @@ -281,6 +281,7 @@ enum { TIPC_NLA_PROP_TOL, /* u32 */ TIPC_NLA_PROP_WIN, /* u32 */ TIPC_NLA_PROP_MTU, /* u32 */ + TIPC_NLA_PROP_BCAST, /* u8 */ __TIPC_NLA_PROP_MAX, TIPC_NLA_PROP_MAX = __TIPC_NLA_PROP_MAX - 1 diff --git a/tipc/link.c b/tipc/link.c index 43e26da3fa6b..45f2070d5a6d 100644 --- a/tipc/link.c +++ b/tipc/link.c @@ -28,6 +28,7 @@ #define PRIORITY_STR "priority" #define TOLERANCE_STR "tolerance" #define WINDOW_STR "window" +#define BCAST_STR "broadcast" static int link_list_cb(const struct nlmsghdr *nlh, void *data) { @@ -111,6 +112,11 @@ static int link_get_cb(const struct nlmsghdr *nlh, void *data) case TIPC_NLA_PROP_WIN: print_uint(PRINT_ANY, WINDOW_STR, "%u\n", mnl_attr_get_u32(props[*prop])); break; + case TIPC_NLA_PROP_BCAST: + print_string(PRINT_ANY, NULL, "%s\n", + mnl_attr_get_u32(props[*prop]) ? 
+ "BROADCAST" : "REPLICAST"); + break; default: break; } @@ -137,6 +143,8 @@ static int cmd_link_get_prop(struct nlmsghdr *nlh, const struct cmd *cmd, prop = TIPC_NLA_PROP_TOL; else if ((strcmp(cmd->cmd, WINDOW_STR) == 0)) prop = TIPC_NLA_PROP_WIN; + else if ((strcmp(cmd->cmd, BCAST_STR) == 0)) + prop = TIPC_NLA_PROP_BCAST; else return -EINVAL; @@ -172,7 +180,8 @@ static void cmd_link_get_help(struct cmdl *cmdl) "PROPERTIES\n" " tolerance - Get link tolerance\n" " priority - Get link priority\n" - " window - Get link window\n", + " window - Get link window\n" + " broadcast - Get link broadcast supporting\n", cmdl->argv[0]); } @@ -183,6 +192,7 @@ static int cmd_link_get(struct nlmsghdr *nlh, const struct cmd *cmd, { PRIORITY_STR, cmd_link_get_prop, cmd_link_get_help }, { TOLERANCE_STR, cmd_link_get_prop, cmd_link_get_help }, { WINDOW_STR, cmd_link_get_prop, cmd_link_get_help }, + { BCAST_STR, cmd_link_get_prop, cmd_link_get_help }, { NULL } }; @@ -249,6 +259,14 @@ static int _show_link_stat(const char *name, struct nlattr *attrs[], open_json_object(NULL); print_string(PRINT_ANY, "link", "\nLink <%s>\n", name); + print_string(PRINT_JSON, "flag", "", NULL); + open_json_array(PRINT_JSON, NULL); + if (mnl_attr_get_u8(prop[TIPC_NLA_PROP_BCAST])) + print_string(PRINT_ANY, NULL, " %s", "BROADCAST"); + else + print_string(PRINT_ANY, NULL, " %s", "REPLICAST"); + close_json_array(PRINT_JSON, NULL); + print_string(PRINT_JSON, "state", "", NULL); open_json_array(PRINT_JSON, NULL); if (attrs[TIPC_NLA_LINK_ACTIVE]) @@ -355,10 +373,17 @@ static int _show_link_stat(const char *name, struct nlattr *attrs[], } static int _show_bc_link_stat(const char *name, struct nlattr *prop[], - struct nlattr *stats[]) + struct nlattr *stats[]) { open_json_object(NULL); print_string(PRINT_ANY, "link", "Link <%s>\n", name); + print_string(PRINT_JSON, "flag", "", NULL); + open_json_array(PRINT_JSON, NULL); + if (mnl_attr_get_u8(prop[TIPC_NLA_PROP_BCAST])) + print_string(PRINT_ANY, NULL, " %s", 
"BROADCAST"); + else + print_string(PRINT_ANY, NULL, " %s", "REPLICAST"); + close_json_array(PRINT_JSON, NULL); print_uint(PRINT_ANY, WINDOW_STR, " Window:%u packets\n", mnl_attr_get_u32(prop[TIPC_NLA_PROP_WIN])); @@ -521,7 +546,8 @@ static void cmd_link_set_help(struct cmdl *cmdl) "PROPERTIES\n" " tolerance TOLERANCE - Set link tolerance\n" " priority PRIORITY - Set link priority\n" - " window WINDOW - Set link window\n", + " window WINDOW - Set link window\n" + " broadcast [0|1] - Set <broadcast-link> [replicast|broadcast] supporting\n", cmdl->argv[0]); } @@ -545,6 +571,8 @@ static int cmd_link_set_prop(struct nlmsghdr *nlh, const struct cmd *cmd, prop = TIPC_NLA_PROP_TOL; else if ((strcmp(cmd->cmd, WINDOW_STR) == 0)) prop = TIPC_NLA_PROP_WIN; + else if ((strcmp(cmd->cmd, BCAST_STR) == 0)) + prop = TIPC_NLA_PROP_BCAST; else return -EINVAL; @@ -577,7 +605,10 @@ static int cmd_link_set_prop(struct nlmsghdr *nlh, const struct cmd *cmd, mnl_attr_put_strz(nlh, TIPC_NLA_LINK_NAME, opt->val); props = mnl_attr_nest_start(nlh, TIPC_NLA_LINK_PROP); - mnl_attr_put_u32(nlh, prop, val); + if (prop == TIPC_NLA_PROP_BCAST) + mnl_attr_put_u8(nlh, prop, val); + else + mnl_attr_put_u32(nlh, prop, val); mnl_attr_nest_end(nlh, props); mnl_attr_nest_end(nlh, attrs); @@ -592,6 +623,7 @@ static int cmd_link_set(struct nlmsghdr *nlh, const struct cmd *cmd, { PRIORITY_STR, cmd_link_set_prop, cmd_link_set_help }, { TOLERANCE_STR, cmd_link_set_prop, cmd_link_set_help }, { WINDOW_STR, cmd_link_set_prop, cmd_link_set_help }, + { BCAST_STR, cmd_link_set_prop, cmd_link_set_help }, { NULL } }; -- 2.17.1 |
From: Hoang Le <hoa...@de...> - 2019-01-29 09:22:35
|
Currently, a multicast stream may start out using replicast, because there are few destinations, and then it should ideally switch to L2/broadcast IGMP/multicast when the number of destinations grows beyond a certain limit. The opposite should happen when the number decreases below the limit. To eliminate the risk of message reordering caused by a method change, a sending socket must stick to a previously selected method until it enters an idle period of 5 seconds. This means there must be a 5-second pause in the traffic from the sender socket. In this fix, we allow such a switch between replicast and broadcast without a 5-second pause in the traffic. The solution is to send a dummy message consisting of only the header, with the SYN bit set, via the previously used method (broadcast or replicast). The data message also has the SYN bit set and is sent via the newly selected method (replicast or broadcast). Then, at the receiving side, any messages following the first SYN message (data or dummy) are held in the deferred queue until the other message of the pair (dummy or data) has arrived. For compatibility reasons we have to introduce a new capability flag, TIPC_MCAST_RBCTL, to handle this new feature. Without it, the dummy message would cause poll to return empty on old machines. 
Signed-off-by: Jon Maloy <jon...@er...> Signed-off-by: Hoang Le <hoa...@de...> --- net/tipc/bcast.c | 116 +++++++++++++++++++++++++++++++++++++++++++++- net/tipc/bcast.h | 5 ++ net/tipc/core.c | 2 + net/tipc/core.h | 3 ++ net/tipc/msg.h | 10 ++++ net/tipc/node.c | 10 ++++ net/tipc/node.h | 6 ++- net/tipc/socket.c | 10 ++++ 8 files changed, 159 insertions(+), 3 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index d8026543bf4c..e3a85227d4aa 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -295,11 +295,15 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, struct tipc_mc_method *method, struct tipc_nlist *dests, u16 *cong_link_cnt) { - struct sk_buff_head inputq, localq; + struct sk_buff_head inputq, localq, tmpq; + bool rcast = method->rcast; + struct sk_buff *skb, *_skb; + struct tipc_msg *hdr, *_hdr; int rc = 0; skb_queue_head_init(&inputq); skb_queue_head_init(&localq); + skb_queue_head_init(&tmpq); /* Clone packets before they are consumed by next call */ if (dests->local && !tipc_msg_reassemble(pkts, &localq)) { @@ -309,6 +313,53 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, /* Send according to determined transmit method */ if (dests->remote) { tipc_bcast_select_xmit_method(net, dests->remote, method); + + if (tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) { + skb = skb_peek(pkts); + hdr = buf_msg(skb); + + if (msg_user(hdr) == MSG_FRAGMENTER) + hdr = msg_get_wrapped(hdr); + if (msg_type(hdr) != TIPC_MCAST_MSG) + goto xmit; + + msg_set_syn(hdr, 0); + msg_set_is_rcast(hdr, method->rcast); + + /* switch mode */ + if (rcast != method->rcast) { + /* Build message's copied */ + _skb = tipc_buf_acquire(MCAST_H_SIZE, + GFP_KERNEL); + if (!skb) { + rc = -ENOMEM; + goto exit; + } + skb_orphan(_skb); + skb_copy_to_linear_data(_skb, hdr, + MCAST_H_SIZE); + + /* Build dummy header */ + _hdr = buf_msg(_skb); + msg_set_size(_hdr, MCAST_H_SIZE); + __skb_queue_tail(&tmpq, _skb); + + msg_set_syn(hdr, 1); + 
msg_set_syn(_hdr, 1); + msg_set_is_rcast(_hdr, rcast); + /* Prepare for 'synching' */ + if (rcast) + tipc_rcast_xmit(net, &tmpq, dests, + cong_link_cnt); + else + tipc_bcast_xmit(net, &tmpq, + cong_link_cnt); + + /* This queue should normally be empty by now */ + __skb_queue_purge(&tmpq); + } + } +xmit: if (method->rcast) rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt); else @@ -576,3 +627,66 @@ void tipc_nlist_purge(struct tipc_nlist *nl) nl->remote = 0; nl->local = false; } + +void tipc_mcast_filter_msg(struct sk_buff_head *defq, + struct sk_buff_head *inputq) +{ + struct sk_buff *skb, *_skb; + struct tipc_msg *hdr, *_hdr; + u32 node, port, _node, _port; + bool match = false; + + skb = __skb_dequeue(inputq); + if (!skb) + return; + + hdr = buf_msg(skb); + node = msg_orignode(hdr); + port = msg_origport(hdr); + + /* Find a peer port if its existing in defer queue */ + while ((_skb = skb_peek(defq))) { + _hdr = buf_msg(_skb); + _node = msg_orignode(_hdr); + _port = msg_origport(_hdr); + + if (_node != node) + continue; + if (_port != port) + continue; + + if (!match) { + if (msg_is_syn(hdr) && + msg_is_rcast(hdr) != msg_is_rcast(_hdr)) { + __skb_dequeue(defq); + if (msg_data_sz(hdr)) { + __skb_queue_tail(inputq, skb); + kfree_skb(_skb); + } else { + __skb_queue_tail(inputq, _skb); + kfree_skb(skb); + } + match = true; + } else { + break; + } + } else { + if (msg_is_syn(_hdr)) + return; + /* Dequeued to receive buffer */ + __skb_dequeue(defq); + __skb_queue_tail(inputq, _skb); + } + } + + if (match) + return; + + if (msg_is_syn(hdr)) { + /* Enqueue and defer to next synching */ + __skb_queue_tail(defq, skb); + } else { + /* Direct enqueued */ + __skb_queue_tail(inputq, skb); + } +} diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index 751530ab0c49..165d88a503e4 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -63,11 +63,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node); /* Cookie to be used between socket and broadcast layer * @rcast: 
replicast (instead of broadcast) was used at previous xmit * @mandatory: broadcast/replicast indication was set by user + * @deferredq: defer queue to make message in order * @expires: re-evaluate non-mandatory transmit method if we are past this */ struct tipc_mc_method { bool rcast; bool mandatory; + struct sk_buff_head deferredq; unsigned long expires; }; @@ -92,6 +94,9 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg); int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]); int tipc_bclink_reset_stats(struct net *net); +void tipc_mcast_filter_msg(struct sk_buff_head *defq, + struct sk_buff_head *inputq); + static inline void tipc_bcast_lock(struct net *net) { spin_lock_bh(&tipc_net(net)->bclock); diff --git a/net/tipc/core.c b/net/tipc/core.c index 5b38f5164281..27cccd101ef6 100644 --- a/net/tipc/core.c +++ b/net/tipc/core.c @@ -43,6 +43,7 @@ #include "net.h" #include "socket.h" #include "bcast.h" +#include "node.h" #include <linux/module.h> @@ -59,6 +60,7 @@ static int __net_init tipc_init_net(struct net *net) tn->node_addr = 0; tn->trial_addr = 0; tn->addr_trial_end = 0; + tn->capabilities = TIPC_NODE_CAPABILITIES; memset(tn->node_id, 0, sizeof(tn->node_id)); memset(tn->node_id_string, 0, sizeof(tn->node_id_string)); tn->mon_threshold = TIPC_DEF_MON_THRESHOLD; diff --git a/net/tipc/core.h b/net/tipc/core.h index 8020a6c360ff..7a68e1b6a066 100644 --- a/net/tipc/core.h +++ b/net/tipc/core.h @@ -122,6 +122,9 @@ struct tipc_net { /* Topology subscription server */ struct tipc_topsrv *topsrv; atomic_t subscription_count; + + /* Cluster capabilities */ + u16 capabilities; }; static inline struct tipc_net *tipc_net(struct net *net) diff --git a/net/tipc/msg.h b/net/tipc/msg.h index a0924956bb61..70ddff2206a0 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -257,6 +257,16 @@ static inline void msg_set_src_droppable(struct tipc_msg *m, u32 d) msg_set_bits(m, 0, 18, 1, d); } +static inline bool msg_is_rcast(struct tipc_msg *m) +{ + 
return msg_bits(m, 0, 18, 0x1); +} + +static inline void msg_set_is_rcast(struct tipc_msg *m, bool d) +{ + msg_set_bits(m, 0, 18, 0x1, d); +} + static inline void msg_set_size(struct tipc_msg *m, u32 sz) { m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz); diff --git a/net/tipc/node.c b/net/tipc/node.c index db2a6c3e0be9..1386e44d965c 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -383,6 +383,11 @@ static struct tipc_node *tipc_node_create(struct net *net, u32 addr, tipc_link_update_caps(l, capabilities); } write_unlock_bh(&n->lock); + /* Calculate cluster capabilities */ + tn->capabilities = TIPC_NODE_CAPABILITIES; + list_for_each_entry_rcu(temp_node, &tn->node_list, list) { + tn->capabilities &= temp_node->capabilities; + } goto exit; } n = kzalloc(sizeof(*n), GFP_ATOMIC); @@ -433,6 +438,11 @@ static struct tipc_node *tipc_node_create(struct net *net, u32 addr, break; } list_add_tail_rcu(&n->list, &temp_node->list); + /* Calculate cluster capabilities */ + tn->capabilities = TIPC_NODE_CAPABILITIES; + list_for_each_entry_rcu(temp_node, &tn->node_list, list) { + tn->capabilities &= temp_node->capabilities; + } trace_tipc_node_create(n, true, " "); exit: spin_unlock_bh(&tn->node_list_lock); diff --git a/net/tipc/node.h b/net/tipc/node.h index 4f59a30e989a..2404225c5d58 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -51,7 +51,8 @@ enum { TIPC_BLOCK_FLOWCTL = (1 << 3), TIPC_BCAST_RCAST = (1 << 4), TIPC_NODE_ID128 = (1 << 5), - TIPC_LINK_PROTO_SEQNO = (1 << 6) + TIPC_LINK_PROTO_SEQNO = (1 << 6), + TIPC_MCAST_RBCTL = (1 << 7) }; #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \ @@ -60,7 +61,8 @@ enum { TIPC_BCAST_RCAST | \ TIPC_BLOCK_FLOWCTL | \ TIPC_NODE_ID128 | \ - TIPC_LINK_PROTO_SEQNO) + TIPC_LINK_PROTO_SEQNO | \ + TIPC_MCAST_RBCTL) #define INVALID_BEARER_ID -1 void tipc_node_stop(struct net *net); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 8fc5acd4820d..53a8113375a6 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -483,6 +483,7 
@@ static int tipc_sk_create(struct net *net, struct socket *sock, tsk_set_unreturnable(tsk, true); if (sock->type == SOCK_DGRAM) tsk_set_unreliable(tsk, true); + __skb_queue_head_init(&tsk->mc_method.deferredq); } trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " "); @@ -580,6 +581,7 @@ static int tipc_release(struct socket *sock) sk->sk_shutdown = SHUTDOWN_MASK; tipc_sk_leave(tsk); tipc_sk_withdraw(tsk, 0, NULL); + __skb_queue_purge(&tsk->mc_method.deferredq); sk_stop_timer(sk, &sk->sk_timer); tipc_sk_remove(tsk); @@ -817,6 +819,11 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, &tsk->cong_link_cnt); } + /* Broadcast link is now free to choose method for next broadcast */ + if (rc == 0) { + method->mandatory = false; + method->expires = jiffies; + } tipc_nlist_purge(&dsts); return rc ? rc : dlen; @@ -2157,6 +2164,9 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb, if (unlikely(grp)) tipc_group_filter_msg(grp, &inputq, xmitq); + if (msg_type(hdr) == TIPC_MCAST_MSG) + tipc_mcast_filter_msg(&tsk->mc_method.deferredq, &inputq); + /* Validate and add to receive buffer if there is space */ while ((skb = __skb_dequeue(&inputq))) { hdr = buf_msg(skb); -- 2.17.1 |
From: Hoang Le <hoa...@de...> - 2019-01-29 02:30:02
|
When an L2 interface (e.g. VXLAN) only provides pseudo-broadcast support, we should make it possible for the user to disable broadcast manually and switch to replicast. We should preferably find a way to sense this in the code, but if we cannot, this feature is still useful for manual configuration. Signed-off-by: Hoang Le <hoa...@de...> --- include/uapi/linux/tipc_netlink.h | 3 ++- net/tipc/bcast.c | 24 ++++++++++++++++++++++-- net/tipc/bcast.h | 2 ++ net/tipc/link.c | 6 ++++++ 4 files changed, 32 insertions(+), 3 deletions(-) diff --git a/include/uapi/linux/tipc_netlink.h b/include/uapi/linux/tipc_netlink.h index 0ebe02ef1a86..a4a765f69352 100644 --- a/include/uapi/linux/tipc_netlink.h +++ b/include/uapi/linux/tipc_netlink.h @@ -273,7 +273,7 @@ enum { TIPC_NLA_SOCK_STAT_MAX = __TIPC_NLA_SOCK_STAT_MAX - 1 }; -/* Nest, link propreties. Valid for link, media and bearer */ +/* Nest, link properties. Valid for link, media and bearer */ enum { TIPC_NLA_PROP_UNSPEC, @@ -281,6 +281,7 @@ enum { TIPC_NLA_PROP_TOL, /* u32 */ TIPC_NLA_PROP_WIN, /* u32 */ TIPC_NLA_PROP_MTU, /* u32 */ + TIPC_NLA_PROP_BCAST, /* u8 */ __TIPC_NLA_PROP_MAX, TIPC_NLA_PROP_MAX = __TIPC_NLA_PROP_MAX - 1 diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index d8026543bf4c..44acd6957b65 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -88,6 +88,16 @@ void tipc_bcast_disable_rcast(struct net *net) tipc_bc_base(net)->rcast_support = false; } +bool tipc_bcast_get_bcast(struct net *net) +{ + return tipc_bc_base(net)->bcast_support; +} + +void tipc_bcast_set_bcast(struct net *net, bool sup) +{ + tipc_bc_base(net)->bcast_support = sup; +} + static void tipc_bcbase_calc_bc_threshold(struct net *net) { struct tipc_bc_base *bb = tipc_bc_base(net); @@ -106,7 +116,6 @@ static void tipc_bcbase_select_primary(struct net *net) int i, mtu, prim; bb->primary_bearer = INVALID_BEARER_ID; - bb->bcast_support = true; if (!all_dests) return; @@ -130,7 +139,7 @@ static void tipc_bcbase_select_primary(struct net *net) } prim = bb->primary_bearer; 
if (prim != INVALID_BEARER_ID) - bb->bcast_support = tipc_bearer_bcast_support(net, prim); + bb->bcast_support &= tipc_bearer_bcast_support(net, prim); } void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id) @@ -498,6 +507,16 @@ int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]) if (err) return err; + if (props[TIPC_NLA_PROP_BCAST]) { + tipc_bcast_lock(net); + if (nla_get_u8(props[TIPC_NLA_PROP_BCAST]) & 0x1) + tipc_bcast_set_bcast(net, true); + else + tipc_bcast_set_bcast(net, false); + tipc_bcast_unlock(net); + return 0; + } + if (!props[TIPC_NLA_PROP_WIN]) return -EOPNOTSUPP; @@ -531,6 +550,7 @@ int tipc_bcast_init(struct net *net) tn->bcl = l; bb->rc_ratio = 25; bb->rcast_support = true; + bb->bcast_support = true; return 0; enomem: kfree(bb); diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index 751530ab0c49..8880dc595c0b 100644 --- a/net/tipc/bcast.h +++ b/net/tipc/bcast.h @@ -91,6 +91,8 @@ int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l, int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg); int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]); int tipc_bclink_reset_stats(struct net *net); +void tipc_bcast_set_bcast(struct net *net, bool sup); +bool tipc_bcast_get_bcast(struct net *net); static inline void tipc_bcast_lock(struct net *net) { diff --git a/net/tipc/link.c b/net/tipc/link.c index ac306d17f8ad..0ac0943b1b00 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -2103,6 +2103,9 @@ int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg, goto prop_msg_full; if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority)) goto prop_msg_full; + if (nla_put_u8(msg->skb, TIPC_NLA_PROP_BCAST, + tipc_bearer_bcast_support(net, link->bearer_id))) + goto prop_msg_full; nla_nest_end(msg->skb, prop); err = __tipc_nl_add_stats(msg->skb, &link->stats); @@ -2183,6 +2186,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg) struct nlattr *prop; struct tipc_net *tn = 
net_generic(net, tipc_net_id); struct tipc_link *bcl = tn->bcl; + bool bcast = tipc_bcast_get_bcast(net); if (!bcl) return 0; @@ -2218,6 +2222,8 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg) goto attr_msg_full; if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->window)) goto prop_msg_full; + if (nla_put_u8(msg->skb, TIPC_NLA_PROP_BCAST, bcast)) + goto prop_msg_full; nla_nest_end(msg->skb, prop); err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats); -- 2.17.1 |
From: David M. <da...@da...> - 2019-01-25 06:32:02
|
From: Zhaolong Zhang <zha...@12...> Date: Thu, 24 Jan 2019 10:06:41 +0800 > max_rcvbuf_size is no longer used since commit "414574a0af36". > > Signed-off-by: Zhaolong Zhang <zha...@12...> Applied. |
From: Tuong L. <tuo...@de...> - 2019-01-24 02:51:12
|
When a link endpoint is re-created (e.g. after a node reboot or interface reset), its link session number is chosen at random, and the peer endpoint will be synced with this new session number before the link is re-established. However, there is a shortcoming in this mechanism that can lead to the link never being re-established, or to it failing later on. It happens when the peer endpoint is already in ESTABLISHING state, i.e. the 'peer_session' as well as the 'in_session' flag have been set, but this link endpoint suddenly leaves. When it comes back with a random session number, there are two possible situations: 1/ If the random session number is larger than (or equal to) the previous one, the peer endpoint will be updated with this new session upon receipt of a RESET_MSG from this endpoint, and the link can be re-established as normal. Otherwise, all the RESET_MSGs from this endpoint will be rejected by the peer. In turn, when this link endpoint receives an ACTIVATE_MSG from the peer, it will move to ESTABLISHED and start to send STATE_MSGs, but again these messages will be dropped by the peer due to the wrong session number. The peer link endpoint can still become ESTABLISHED after receiving a traffic message from this endpoint (e.g. a BCAST_PROTOCOL or NAME_DISTRIBUTOR), but since all the STATE_MSGs are invalid, the link will be forced down sooner or later! Even if the random session number is larger than the previous one, it can happen that the ACTIVATE_MSG from the peer arrives first, and this link endpoint moves quickly to ESTABLISHED without having sent any RESET_MSG yet. Consequently, the peer link will not be updated with the new session number, and the same link failure scenario as above will happen. 2/ Another situation is that the peer link endpoint was reset for some reason in the meantime: its link state was set to RESET from ESTABLISHING, but it is still in session, i.e. the 'in_session' flag is not reset... 
Now, if the random session number from this endpoint is less than the previous one, all the RESET_MSGs from this endpoint will be rejected by the peer. In the other direction, when this link endpoint receives a RESET_MSG from the peer, it moves to ESTABLISHING and starts to send ACTIVATE_MSGs, but all these messages will be rejected by the peer too. As a result, the link cannot be re-established but gets stuck with this link endpoint in state ESTABLISHING and the peer in RESET! Solution: =========== This link endpoint should not go directly to ESTABLISHED when it gets an ACTIVATE_MSG from the peer, since that message may belong to the old session if the link was re-created. To ensure that the session is correct before the link is re-established, the peer endpoint in ESTABLISHING state will send back the last session number in its ACTIVATE_MSG for verification at this endpoint. Then, if needed, a new and more appropriate session number will be generated to force a re-synch first. In addition, when a link in ESTABLISHING state is reset, its state will move to RESET according to the link FSM, and the 'in_session' flag (and the other data) will be reset as in a normal link reset. The link will also be deleted if requested. The solution is backward compatible. 
Signed-off-by: Tuong Lien <tuo...@de...> --- net/tipc/link.c | 15 +++++++++++++++ net/tipc/msg.h | 22 ++++++++++++++++++++++ net/tipc/node.c | 11 ++++++----- 3 files changed, 43 insertions(+), 5 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 2792a3cae682..ce3363d8f1c8 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1425,6 +1425,10 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, l->rcv_unacked = 0; } else { /* RESET_MSG or ACTIVATE_MSG */ + if (mtyp == ACTIVATE_MSG) { + msg_set_dest_session_valid(hdr, 1); + msg_set_dest_session(hdr, l->peer_session); + } msg_set_max_pkt(hdr, l->advertised_mtu); strcpy(data, l->if_name); msg_set_size(hdr, INT_H_SIZE + TIPC_MAX_IF_NAME); @@ -1642,6 +1646,17 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, rc = tipc_link_fsm_evt(l, LINK_FAILURE_EVT); break; } + + /* If this endpoint was re-created while peer was ESTABLISHING + * it doesn't know current session number. Force re-synch. 
+ */ + if (mtyp == ACTIVATE_MSG && msg_dest_session_valid(hdr) && + l->session != msg_dest_session(hdr)) { + if (less(l->session, msg_dest_session(hdr))) + l->session = msg_dest_session(hdr) + 1; + break; + } + /* ACTIVATE_MSG serves as PEER_RESET if link is already down */ if (mtyp == RESET_MSG || !link_is_up(l)) rc = tipc_link_fsm_evt(l, LINK_PEER_RESET_EVT); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index a0924956bb61..d7e4b8b93f9d 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -360,6 +360,28 @@ static inline void msg_set_bcast_ack(struct tipc_msg *m, u16 n) msg_set_bits(m, 1, 0, 0xffff, n); } +/* Note: reusing bits in word 1 for ACTIVATE_MSG only, to re-synch + * link peer session number + */ +static inline bool msg_dest_session_valid(struct tipc_msg *m) +{ + return msg_bits(m, 1, 16, 0x1); +} + +static inline void msg_set_dest_session_valid(struct tipc_msg *m, bool valid) +{ + msg_set_bits(m, 1, 16, 0x1, valid); +} + +static inline u16 msg_dest_session(struct tipc_msg *m) +{ + return msg_bits(m, 1, 0, 0xffff); +} + +static inline void msg_set_dest_session(struct tipc_msg *m, u16 n) +{ + msg_set_bits(m, 1, 0, 0xffff, n); +} /* * Word 2 diff --git a/net/tipc/node.c b/net/tipc/node.c index db2a6c3e0be9..2dc4919ab23c 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -830,15 +830,16 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete) tipc_node_write_lock(n); if (!tipc_link_is_establishing(l)) { __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr); - if (delete) { - kfree(l); - le->link = NULL; - n->link_cnt--; - } } else { /* Defuse pending tipc_node_link_up() */ + tipc_link_reset(l); tipc_link_fsm_evt(l, LINK_RESET_EVT); } + if (delete) { + kfree(l); + le->link = NULL; + n->link_cnt--; + } trace_tipc_node_link_down(n, true, "node link down or deleted!"); tipc_node_write_unlock(n); if (delete) -- 2.13.7 |