From: Tuong L. <tuo...@de...> - 2019-07-12 05:16:04
In conjunction with changing the interfaces' MTU (e.g. especially in the case of bonding), where the TIPC links are brought up and down within a short time, a couple of issues were detected with the current link changeover mechanism:

1) When a link comes up but is immediately forced down again, the failover procedure is carried out in order to fail over all the messages in the link's transmq queue onto the other working link. The link and node state is also set to FAILINGOVER as part of the process. Each message is transmitted in the form of a FAILOVER_MSG, so its size grows by 40 bytes (the message header size). There is no problem as long as the original message size is not larger than the link's MTU - 40, and indeed this is the maximum size of a normal payload message. However, in the situation above, because the link has only just come up, the messages in the link's transmq are mostly SYNCH_MSGs generated by the link synching procedure, so their size may already have reached that maximum. When a FAILOVER_MSG is built on top of such a SYNCH_MSG, its size exceeds the link's MTU. As a result, the messages are silently dropped, the failover procedure never completes, and the link cannot exit the FAILINGOVER state, so it cannot be re-established.

2) The same scenario can occur even more easily when the MTUs of the links are set differently or are being changed. In that case, as soon as a large message in the failed link's transmq was built and fragmented with that link's MTU larger than the other link's, the issue occurs (no prior link synching is needed).

3) The link synching procedure faces the same issue, but since link synching is only started upon receipt of a SYNCH_MSG, dropping the message does not lead to a state deadlock; it is still not the expected behaviour by design.

Issues 1) and 3) are resolved by the previous commit, which generates only a dummy SYNCH_MSG (i.e. without data) at link synching, so the size of any resulting FAILOVER_MSG will never exceed the link's MTU.

For issue 2), the only solution is to fragment the messages in the failed link's transmq according to the working link's MTU so that they can then be failed over. A new function is added to accomplish this; the result is still a TUNNEL_PROTOCOL/FAILOVER_MSG, but if the original message is too large it is fragmented and reassembled at the receiving side.
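A minimal standalone sketch of the size arithmetic described above, assuming a hypothetical 1500-byte link MTU and the 40-byte internal header (INT_H_SIZE) mentioned in the text; the fragment count follows the dsz / dmax + 1 formula used by the new tipc_msg_fragment() below:

```c
#include <stdio.h>

#define INT_H_SIZE 40	/* internal message header size, per the text above */

int main(void)
{
	int link_mtu = 1500;			/* hypothetical link MTU */
	int synch_msg_size = link_mtu;		/* SYNCH_MSG built at full MTU */
	int failover_msg_size = synch_msg_size + INT_H_SIZE;

	/* Problem: the tunnelled message no longer fits the link MTU */
	if (failover_msg_size > link_mtu)
		printf("FAILOVER_MSG of %d bytes exceeds MTU %d: dropped\n",
		       failover_msg_size, link_mtu);

	/* Fix for issue 2): split into tunnel fragments whose payload is
	 * at most (working link MTU - INT_H_SIZE) each
	 */
	int dmax = link_mtu - INT_H_SIZE;
	int nof_fragms = synch_msg_size / dmax + 1;

	printf("sent instead as %d fragments of up to %d payload bytes\n",
	       nof_fragms, dmax);
	return 0;
}
```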
Acked-by: Ying Xue <yin...@wi...> Acked-by: Jon Maloy <jon...@er...> Signed-off-by: Tuong Lien <tuo...@de...> --- net/tipc/link.c | 93 ++++++++++++++++++++++++++++++++++++++++++++++++--------- net/tipc/msg.c | 59 ++++++++++++++++++++++++++++++++++++ net/tipc/msg.h | 18 ++++++++++- 3 files changed, 155 insertions(+), 15 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index e215b4ba6a4b..2c274777b2dd 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -180,6 +180,7 @@ struct tipc_link { /* Fragmentation/reassembly */ struct sk_buff *reasm_buf; + struct sk_buff *reasm_tnlmsg; /* Broadcast */ u16 ackers; @@ -897,8 +898,10 @@ void tipc_link_reset(struct tipc_link *l) l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0; l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0; kfree_skb(l->reasm_buf); + kfree_skb(l->reasm_tnlmsg); kfree_skb(l->failover_reasm_skb); l->reasm_buf = NULL; + l->reasm_tnlmsg = NULL; l->failover_reasm_skb = NULL; l->rcv_unacked = 0; l->snd_nxt = 1; @@ -940,6 +943,9 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, int rc = 0; if (unlikely(msg_size(hdr) > mtu)) { + pr_warn("Too large msg, purging xmit list %d %d %d %d %d!\n", + skb_queue_len(list), msg_user(hdr), + msg_type(hdr), msg_size(hdr), mtu); skb_queue_purge(list); return -EMSGSIZE; } @@ -1233,6 +1239,7 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *inputq) { struct sk_buff **reasm_skb = &l->failover_reasm_skb; + struct sk_buff **reasm_tnlmsg = &l->reasm_tnlmsg; struct sk_buff_head *fdefq = &l->failover_deferdq; struct tipc_msg *hdr = buf_msg(skb); struct sk_buff *iskb; @@ -1240,40 +1247,56 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb, int rc = 0; u16 seqno; - /* SYNCH_MSG */ - if (msg_type(hdr) == SYNCH_MSG) - goto drop; + if (msg_type(hdr) == SYNCH_MSG) { + kfree_skb(skb); + return 0; + } - /* FAILOVER_MSG */ - if (!tipc_msg_extract(skb, &iskb, &ipos)) { - pr_warn_ratelimited("Cannot extract FAILOVER_MSG, defq: %d\n", - skb_queue_len(fdefq)); - return rc; + /* Not a fragment? */ + if (likely(!msg_nof_fragms(hdr))) { + if (unlikely(!tipc_msg_extract(skb, &iskb, &ipos))) { + pr_warn_ratelimited("Unable to extract msg, defq: %d\n", + skb_queue_len(fdefq)); + return 0; + } + kfree_skb(skb); + } else { + /* Set fragment type for buf_append */ + if (msg_fragm_no(hdr) == 1) + msg_set_type(hdr, FIRST_FRAGMENT); + else if (msg_fragm_no(hdr) < msg_nof_fragms(hdr)) + msg_set_type(hdr, FRAGMENT); + else + msg_set_type(hdr, LAST_FRAGMENT); + + if (!tipc_buf_append(reasm_tnlmsg, &skb)) { + /* Successful but non-complete reassembly? 
*/ + if (*reasm_tnlmsg || link_is_bc_rcvlink(l)) + return 0; + pr_warn_ratelimited("Unable to reassemble tunnel msg\n"); + return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); + } + iskb = skb; } do { seqno = buf_seqno(iskb); - if (unlikely(less(seqno, l->drop_point))) { kfree_skb(iskb); continue; } - if (unlikely(seqno != l->drop_point)) { __tipc_skb_queue_sorted(fdefq, seqno, iskb); continue; } l->drop_point++; - if (!tipc_data_input(l, iskb, inputq)) rc |= tipc_link_input(l, iskb, inputq, reasm_skb); if (unlikely(rc)) break; } while ((iskb = __tipc_skb_dequeue(fdefq, l->drop_point))); -drop: - kfree_skb(skb); return rc; } @@ -1663,15 +1686,18 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, struct sk_buff *skb, *tnlskb; struct tipc_msg *hdr, tnlhdr; struct sk_buff_head *queue = &l->transmq; - struct sk_buff_head tmpxq, tnlq; + struct sk_buff_head tmpxq, tnlq, frags; u16 pktlen, pktcnt, seqno = l->snd_nxt; + bool pktcnt_need_update = false; u16 syncpt; + int rc; if (!tnl) return; skb_queue_head_init(&tnlq); skb_queue_head_init(&tmpxq); + skb_queue_head_init(&frags); /* At least one packet required for safe algorithm => add dummy */ skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG, @@ -1727,6 +1753,39 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, if (queue == &l->backlogq) msg_set_seqno(hdr, seqno++); pktlen = msg_size(hdr); + + /* Tunnel link MTU is not large enough? This could be + * due to: + * 1) Link MTU has just changed or set differently; + * 2) Or FAILOVER on the top of a SYNCH message + * + * The 2nd case should not happen if peer supports + * TIPC_TUNNEL_ENHANCED + */ + if (pktlen > tnl->mtu - INT_H_SIZE) { + if (mtyp == FAILOVER_MSG && + (tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) { + rc = tipc_msg_fragment(skb, &tnlhdr, tnl->mtu, + &frags); + if (rc) { + pr_warn("%sunable to frag msg: rc %d\n", + link_co_err, rc); + return; + } + pktcnt += skb_queue_len(&frags) - 1; + pktcnt_need_update = true; + skb_queue_splice_tail_init(&frags, &tnlq); + continue; + } + /* Unluckily, peer doesn't have TIPC_TUNNEL_ENHANCED + * => Just warn it and return! + */ + pr_warn_ratelimited("%stoo large msg <%d, %d>: %d!\n", + link_co_err, msg_user(hdr), + msg_type(hdr), msg_size(hdr)); + return; + } + msg_set_size(&tnlhdr, pktlen + INT_H_SIZE); tnlskb = tipc_buf_acquire(pktlen + INT_H_SIZE, GFP_ATOMIC); if (!tnlskb) { @@ -1742,6 +1801,12 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, goto tnl; } + if (pktcnt_need_update) + skb_queue_walk(&tnlq, skb) { + hdr = buf_msg(skb); + msg_set_msgcnt(hdr, pktcnt); + } + tipc_link_xmit(tnl, &tnlq, xmitq); if (mtyp == FAILOVER_MSG) { diff --git a/net/tipc/msg.c b/net/tipc/msg.c index f48e5857210f..e6d49cdc61b4 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -244,6 +244,65 @@ bool tipc_msg_validate(struct sk_buff **_skb) } /** + * tipc_msg_fragment - build a fragment skb list for TIPC message + * + * @skb: TIPC message skb + * @hdr: internal msg header to be put on the top of the fragments + * @pktmax: max size of a fragment incl. the header + * @frags: returned fragment skb list + * + * Returns 0 if the fragmentation is successful, otherwise: -EINVAL + * or -ENOMEM + */ +int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr, + int pktmax, struct sk_buff_head *frags) +{ + int pktno, nof_fragms, dsz, dmax, eat; + struct tipc_msg *_hdr; + struct sk_buff *_skb; + u8 *data; + + /* Non-linear buffer? 
*/ + if (skb_linearize(skb)) + return -ENOMEM; + + data = (u8 *)skb->data; + dsz = msg_size(buf_msg(skb)); + dmax = pktmax - INT_H_SIZE; + if (dsz <= dmax || !dmax) + return -EINVAL; + + nof_fragms = dsz / dmax + 1; + for (pktno = 1; pktno <= nof_fragms; pktno++) { + if (pktno < nof_fragms) + eat = dmax; + else + eat = dsz % dmax; + /* Allocate a new fragment */ + _skb = tipc_buf_acquire(INT_H_SIZE + eat, GFP_ATOMIC); + if (!_skb) + goto error; + skb_orphan(_skb); + __skb_queue_tail(frags, _skb); + /* Copy header & data to the fragment */ + skb_copy_to_linear_data(_skb, hdr, INT_H_SIZE); + skb_copy_to_linear_data_offset(_skb, INT_H_SIZE, data, eat); + data += eat; + /* Update the fragment's header */ + _hdr = buf_msg(_skb); + msg_set_fragm_no(_hdr, pktno); + msg_set_nof_fragms(_hdr, nof_fragms); + msg_set_size(_hdr, INT_H_SIZE + eat); + } + return 0; + +error: + __skb_queue_purge(frags); + __skb_queue_head_init(frags); + return -ENOMEM; +} + +/** * tipc_msg_build - create buffer chain containing specified header and data * @mhdr: Message header, to be prepended to data * @m: User message diff --git a/net/tipc/msg.h b/net/tipc/msg.h index fca042cdff88..1c8c8dd32a4e 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -721,12 +721,26 @@ static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n) msg_set_bits(m, 4, 16, 0xffff, n); } +static inline u32 msg_nof_fragms(struct tipc_msg *m) +{ + return msg_bits(m, 4, 0, 0xffff); +} + +static inline void msg_set_nof_fragms(struct tipc_msg *m, u32 n) +{ + msg_set_bits(m, 4, 0, 0xffff, n); +} + +static inline u32 msg_fragm_no(struct tipc_msg *m) +{ + return msg_bits(m, 4, 16, 0xffff); +} + static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n) { msg_set_bits(m, 4, 16, 0xffff, n); } - static inline u16 msg_next_sent(struct tipc_msg *m) { return msg_bits(m, 4, 0, 0xffff); @@ -1045,6 +1059,8 @@ bool tipc_msg_bundle(struct sk_buff *skb, struct tipc_msg *msg, u32 mtu); bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg, u32 mtu, u32 dnode); bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos); +int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr, + int pktmax, struct sk_buff_head *frags); int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset, int dsz, int mtu, struct sk_buff_head *list); bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); -- 2.13.7 |
From: Tuong L. <tuo...@de...> - 2019-07-12 05:16:03
This patch series resolves some issues found with the current link changeover mechanism; it also includes an optimization of the link synching mechanism.

Tuong Lien (2):
  tipc: optimize link synching mechanism
  tipc: fix changeover issues due to large packet

 net/tipc/link.c | 119 +++++++++++++++++++++++++++++++++++++++++++++++++-------
 net/tipc/msg.c  |  59 ++++++++++++++++++++++++++++
 net/tipc/msg.h  |  28 ++++++++++++-
 net/tipc/node.c |   6 ++-
 net/tipc/node.h |   6 ++-
 5 files changed, 199 insertions(+), 19 deletions(-)

--
2.13.7
From: Tuong L. <tuo...@de...> - 2019-07-12 05:16:03
This commit along with the next one are to resolve the issues with the link changeover mechanism. See that commit for details. Basically, for the link synching, from now on, we will send only one single ("dummy") SYNCH message to peer. The SYNCH message does not contain any data, just a header conveying the synch point to the peer. A new node capability flag ("TIPC_TUNNEL_ENHANCED") is introduced for backward compatible! Acked-by: Ying Xue <yin...@wi...> Acked-by: Jon Maloy <jon...@er...> Suggested-by: Jon Maloy <jon...@er...> Signed-off-by: Tuong Lien <tuo...@de...> --- net/tipc/link.c | 26 ++++++++++++++++++++++++++ net/tipc/msg.h | 10 ++++++++++ net/tipc/node.c | 6 ++++-- net/tipc/node.h | 6 ++++-- 4 files changed, 44 insertions(+), 4 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 66d3a07bc571..e215b4ba6a4b 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1665,6 +1665,7 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, struct sk_buff_head *queue = &l->transmq; struct sk_buff_head tmpxq, tnlq; u16 pktlen, pktcnt, seqno = l->snd_nxt; + u16 syncpt; if (!tnl) return; @@ -1684,6 +1685,31 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, tipc_link_xmit(l, &tnlq, &tmpxq); __skb_queue_purge(&tmpxq); + /* Link Synching: + * From now on, send only one single ("dummy") SYNCH message + * to peer. The SYNCH message does not contain any data, just + * a header conveying the synch point to the peer. + */ + if (mtyp == SYNCH_MSG && (tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) { + tnlskb = tipc_msg_create(TUNNEL_PROTOCOL, SYNCH_MSG, + INT_H_SIZE, 0, l->addr, + tipc_own_addr(l->net), + 0, 0, 0); + if (!tnlskb) { + pr_warn("%sunable to create dummy SYNCH_MSG\n", + link_co_err); + return; + } + + hdr = buf_msg(tnlskb); + syncpt = l->snd_nxt + skb_queue_len(&l->backlogq) - 1; + msg_set_syncpt(hdr, syncpt); + msg_set_bearer_id(hdr, l->peer_bearer_id); + __skb_queue_tail(&tnlq, tnlskb); + tipc_link_xmit(tnl, &tnlq, xmitq); + return; + } + /* Initialize reusable tunnel packet header */ tipc_msg_init(tipc_own_addr(l->net), &tnlhdr, TUNNEL_PROTOCOL, mtyp, INT_H_SIZE, l->addr); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index da509f0eb9ca..fca042cdff88 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -877,6 +877,16 @@ static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n) msg_set_bits(m, 9, 16, 0xffff, n); } +static inline u16 msg_syncpt(struct tipc_msg *m) +{ + return msg_bits(m, 9, 16, 0xffff); +} + +static inline void msg_set_syncpt(struct tipc_msg *m, u16 n) +{ + msg_set_bits(m, 9, 16, 0xffff, n); +} + static inline u32 msg_conn_ack(struct tipc_msg *m) { return msg_bits(m, 9, 16, 0xffff); diff --git a/net/tipc/node.c b/net/tipc/node.c index 324a1f91b394..5d8b48051bb9 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -1649,7 +1649,6 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, int usr = msg_user(hdr); int mtyp = msg_type(hdr); u16 oseqno = msg_seqno(hdr); - u16 iseqno = msg_seqno(msg_inner_hdr(hdr)); u16 exp_pkts = msg_msgcnt(hdr); u16 rcv_nxt, syncpt, dlv_nxt, inputq_len; int state = n->state; @@ -1748,7 +1747,10 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, /* Initiate synch mode if applicable */ if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG) && (oseqno == 1)) { - syncpt = iseqno + exp_pkts - 1; + if (n->capabilities & TIPC_TUNNEL_ENHANCED) + syncpt = msg_syncpt(hdr); + else + syncpt = msg_seqno(msg_inner_hdr(hdr)) + exp_pkts - 1; if (!tipc_link_is_up(l)) 
__tipc_node_link_up(n, bearer_id, xmitq); if (n->state == SELF_UP_PEER_UP) { diff --git a/net/tipc/node.h b/net/tipc/node.h index c0bf49ea3de4..291d0ecd4101 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -53,7 +53,8 @@ enum { TIPC_NODE_ID128 = (1 << 5), TIPC_LINK_PROTO_SEQNO = (1 << 6), TIPC_MCAST_RBCTL = (1 << 7), - TIPC_GAP_ACK_BLOCK = (1 << 8) + TIPC_GAP_ACK_BLOCK = (1 << 8), + TIPC_TUNNEL_ENHANCED = (1 << 9) }; #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \ @@ -64,7 +65,8 @@ enum { TIPC_NODE_ID128 | \ TIPC_LINK_PROTO_SEQNO | \ TIPC_MCAST_RBCTL | \ - TIPC_GAP_ACK_BLOCK) + TIPC_GAP_ACK_BLOCK | \ + TIPC_TUNNEL_ENHANCED) #define INVALID_BEARER_ID -1 void tipc_node_stop(struct net *net); -- 2.13.7 |
From: Jon M. <jon...@er...> - 2019-07-11 12:55:23
> -----Original Message----- > From: Chris Packham <Chr...@al...> > Sent: 10-Jul-19 16:58 > To: Jon Maloy <jon...@er...>; Eric Dumazet > <eri...@gm...>; yin...@wi...; > da...@da... > Cc: ne...@vg...; tip...@li...; linux- > ke...@vg... > Subject: Re: [PATCH] tipc: ensure skb->lock is initialised > > > On 11/07/19 1:10 AM, Jon Maloy wrote: > >> -----Original Message----- > >> From: Eric Dumazet <eri...@gm...> > >> Sent: 10-Jul-19 04:00 > >> To: Jon Maloy <jon...@er...>; Eric Dumazet > >> <eri...@gm...>; Chris Packham > >> <Chr...@al...>; yin...@wi...; > >> da...@da... > >> Cc: ne...@vg...; tip...@li...; > >> linux- ke...@vg... > >> Subject: Re: [PATCH] tipc: ensure skb->lock is initialised > >> > >> > >> > >> On 7/9/19 10:15 PM, Jon Maloy wrote: > >>> > >>> It is not only for lockdep purposes, -it is essential. But please > >>> provide details > >> about where you see that more fixes are needed. > >>> > >> > >> Simple fact that you detect a problem only when skb_queue_purge() is > >> called should talk by itself. > >> > >> As I stated, there are many places where the list is manipulated > >> _without_ its spinlock being held. > > > > Yes, and that is the way it should be on the send path. > > > >> > >> You want consistency, then > >> > >> - grab the spinlock all the time. > >> - Or do not ever use it. > > > > That is exactly what we are doing. > > - The send path doesn't need the spinlock, and never grabs it. > > - The receive path does need it, and always grabs it. > > > > However, since we don't know from the beginning which path a created > > message will follow, we initialize the queue spinlock "just in case" > > when it is created, even though it may never be used later. > > You can see this as a violation of the principle you are stating > > above, but it is a prize that is worth paying, given savings in code > > volume, complexity and performance. > > > >> > >> Do not initialize the spinlock just in case a path will use > >> skb_queue_purge() (instead of using __skb_queue_purge()) > > > > I am ok with that. I think we can agree that Chris goes for that > > solution, so we can get this bug fixed. > > So would you like a v2 with an improved commit message? I note that I said > skb->lock instead of head->lock in the subject line so at least that should be > corrected. Yes, unless Eric has any more objections. I addition, I have realized that we can change from skb_queue_head_init() to __skb_queue_head_init() at all the disputed locations in the socket code. Then, we do a separate call to spin_lock_init(&list->lock) at the moment we find that the message will follow the receive path, i.e., in tipc_node_xmit(). That should make everybody happy. I will post a patch when net-next re-opens. BR ///jon |
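A minimal sketch of the approach described in the last paragraph, i.e. __skb_queue_head_init() on the send path plus a deferred spin_lock_init() once the buffers are known to take the receive path. All example_* names and the context structure are hypothetical; only the skb/spinlock helpers are real kernel APIs:

```c
#include <linux/skbuff.h>
#include <linux/spinlock.h>

struct example_ctx {			/* hypothetical stand-in for tipc context */
	bool dest_is_local;
};

/* Receive path: dequeues under list->lock, so the lock must be initialised. */
static void example_sk_rcv(struct sk_buff_head *list)
{
	struct sk_buff *skb;

	while ((skb = skb_dequeue(list)) != NULL)	/* takes list->lock */
		kfree_skb(skb);
}

/* Send path: never touches list->lock. */
static void example_link_xmit(struct sk_buff_head *list)
{
	struct sk_buff *skb;

	while ((skb = __skb_dequeue(list)) != NULL)	/* lockless */
		kfree_skb(skb);
}

static void example_node_xmit(struct example_ctx *ctx, struct sk_buff_head *list)
{
	if (ctx->dest_is_local) {
		/* The buffers will follow the receive path, which uses the
		 * queue spinlock, so initialise it only now.
		 */
		spin_lock_init(&list->lock);
		example_sk_rcv(list);
	} else {
		example_link_xmit(list);
	}
}

static void example_send(struct example_ctx *ctx, struct sk_buff *skb)
{
	struct sk_buff_head xmitq;

	__skb_queue_head_init(&xmitq);	/* list pointers only, no lock init */
	__skb_queue_tail(&xmitq, skb);
	example_node_xmit(ctx, &xmitq);
}
```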
From: Jon M. <jon...@er...> - 2019-07-10 13:42:35
> -----Original Message----- > From: Eric Dumazet <eri...@gm...> > Sent: 10-Jul-19 04:00 > To: Jon Maloy <jon...@er...>; Eric Dumazet > <eri...@gm...>; Chris Packham > <Chr...@al...>; yin...@wi...; > da...@da... > Cc: ne...@vg...; tip...@li...; linux- > ke...@vg... > Subject: Re: [PATCH] tipc: ensure skb->lock is initialised > > > > On 7/9/19 10:15 PM, Jon Maloy wrote: > > > > It is not only for lockdep purposes, -it is essential. But please provide details > about where you see that more fixes are needed. > > > > Simple fact that you detect a problem only when skb_queue_purge() is called > should talk by itself. > > As I stated, there are many places where the list is manipulated _without_ its > spinlock being held. Yes, and that is the way it should be on the send path. > > You want consistency, then > > - grab the spinlock all the time. > - Or do not ever use it. That is exactly what we are doing. - The send path doesn't need the spinlock, and never grabs it. - The receive path does need it, and always grabs it. However, since we don't know from the beginning which path a created message will follow, we initialize the queue spinlock "just in case" when it is created, even though it may never be used later. You can see this as a violation of the principle you are stating above, but it is a prize that is worth paying, given savings in code volume, complexity and performance. > > Do not initialize the spinlock just in case a path will use skb_queue_purge() > (instead of using __skb_queue_purge()) I am ok with that. I think we can agree that Chris goes for that solution, so we can get this bug fixed. ///jon |
From: Tung N. <tun...@de...> - 2019-07-10 02:39:45
Commit 365ad35 ("tipc: reduce risk of user starvation during link congestion") aimed to allow senders to add their lists of socket buffers to the link backlog queues under link congestion. However, when dequeuing the link wakeup queue by link_prepare_wakeup(), there might be a worst case scenario: - More than 10 wakeup messages near the wakeup queue head are not dequeued because the backlog queue length is still greater than the limit. - Many wakeup messages near the wakeup queue tail are not dequeued though their importance is different from the one of 10 wakeup messages near the queue head and the backlog queue length is less than the limit. Furthermore, function tipc_sk_rcv() consumes both normal data messages and wakeup messages from the link input queue. By allowing oversubscriptions, the number of messages passed through tipc_sk_rcv() would be higher. Applying destination port filter to wakeup messages via tipc_skb_peek_port() is not necessary and even causes more overhead. As a result, some non-blocking socket senders are not able to send data because epoll() takes many seconds to return EPOLLOUT. This commit fixes this issues by: - Allowing to dequeue as many wakeup messages as possible. - Separating wakeup messages from the link input queue. All wakeup messages are now put in a local queue and consumed in a simple way. Fixes: 365ad35 ("tipc: reduce risk of user starvation during link congestion") Signed-off-by: Tung Nguyen <tun...@de...> --- net/tipc/bcast.c | 42 +++++++++++----------- net/tipc/link.c | 67 +++++++++++++++++----------------- net/tipc/link.h | 12 ++++--- net/tipc/node.c | 28 +++++++++++---- net/tipc/socket.c | 91 +++++++++++++++++++++++++++++++++++++++++------ net/tipc/socket.h | 1 + 6 files changed, 166 insertions(+), 75 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 6c997d4a6218..4de0f9780ef5 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -421,11 +421,11 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb) { struct tipc_msg *hdr = buf_msg(skb); - struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; - struct sk_buff_head xmitq; + struct sk_buff_head xmitq, wakeupq; int rc; __skb_queue_head_init(&xmitq); + __skb_queue_head_init(&wakeupq); if (msg_mc_netid(hdr) != tipc_netid(net) || !tipc_link_is_up(l)) { kfree_skb(skb); @@ -434,16 +434,16 @@ int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb) tipc_bcast_lock(net); if (msg_user(hdr) == BCAST_PROTOCOL) - rc = tipc_link_bc_nack_rcv(l, skb, &xmitq); + rc = tipc_link_bc_nack_rcv(l, skb, &xmitq, &wakeupq); else - rc = tipc_link_rcv(l, skb, NULL); + rc = tipc_link_rcv(l, skb, NULL, &wakeupq); tipc_bcast_unlock(net); tipc_bcbase_xmit(net, &xmitq); /* Any socket wakeup messages ? 
*/ - if (!skb_queue_empty(inputq)) - tipc_sk_rcv(net, inputq); + if (!skb_queue_empty(&wakeupq)) + tipc_sk_rcv_wakeup(net, &wakeupq); return rc; } @@ -455,25 +455,25 @@ int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb) void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, struct tipc_msg *hdr) { - struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; u16 acked = msg_bcast_ack(hdr); - struct sk_buff_head xmitq; + struct sk_buff_head xmitq, wakeupq; /* Ignore bc acks sent by peer before bcast synch point was received */ if (msg_bc_ack_invalid(hdr)) return; __skb_queue_head_init(&xmitq); + __skb_queue_head_init(&wakeupq); tipc_bcast_lock(net); - tipc_link_bc_ack_rcv(l, acked, &xmitq); + tipc_link_bc_ack_rcv(l, acked, &xmitq, &wakeupq); tipc_bcast_unlock(net); tipc_bcbase_xmit(net, &xmitq); /* Any socket wakeup messages ? */ - if (!skb_queue_empty(inputq)) - tipc_sk_rcv(net, inputq); + if (!skb_queue_empty(&wakeupq)) + tipc_sk_rcv_wakeup(net, &wakeupq); } /* tipc_bcast_synch_rcv - check and update rcv link with peer's send state @@ -483,17 +483,17 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l, struct tipc_msg *hdr) { - struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; - struct sk_buff_head xmitq; + struct sk_buff_head xmitq, wakeupq; int rc = 0; __skb_queue_head_init(&xmitq); + __skb_queue_head_init(&wakeupq); tipc_bcast_lock(net); if (msg_type(hdr) != STATE_MSG) { tipc_link_bc_init_rcv(l, hdr); } else if (!msg_bc_ack_invalid(hdr)) { - tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq); + tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq, &wakeupq); rc = tipc_link_bc_sync_rcv(l, hdr, &xmitq); } tipc_bcast_unlock(net); @@ -501,8 +501,8 @@ int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l, tipc_bcbase_xmit(net, &xmitq); /* Any socket wakeup messages ? */ - if (!skb_queue_empty(inputq)) - tipc_sk_rcv(net, inputq); + if (!skb_queue_empty(&wakeupq)) + tipc_sk_rcv_wakeup(net, &wakeupq); return rc; } @@ -529,13 +529,13 @@ void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l, void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l) { struct tipc_link *snd_l = tipc_bc_sndlink(net); - struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq; - struct sk_buff_head xmitq; + struct sk_buff_head xmitq, wakeupq; __skb_queue_head_init(&xmitq); + __skb_queue_head_init(&wakeupq); tipc_bcast_lock(net); - tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq); + tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq, &wakeupq); tipc_bcbase_select_primary(net); tipc_bcbase_calc_bc_threshold(net); tipc_bcast_unlock(net); @@ -543,8 +543,8 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l) tipc_bcbase_xmit(net, &xmitq); /* Any socket wakeup messages ? 
*/ - if (!skb_queue_empty(inputq)) - tipc_sk_rcv(net, inputq); + if (!skb_queue_empty(&wakeupq)) + tipc_sk_rcv_wakeup(net, &wakeupq); } int tipc_bclink_reset_stats(struct net *net) diff --git a/net/tipc/link.c b/net/tipc/link.c index 2050fd386642..e67d7e6a793b 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -237,7 +237,8 @@ static int link_is_up(struct tipc_link *l) } static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, - struct sk_buff_head *xmitq); + struct sk_buff_head *xmitq, + struct sk_buff_head *wakeupq); static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, bool probe_reply, u16 rcvgap, int tolerance, int priority, @@ -331,6 +332,11 @@ struct sk_buff_head *tipc_link_inputq(struct tipc_link *l) return l->inputq; } +struct sk_buff_head *tipc_link_wakeupq(struct tipc_link *l) +{ + return &l->wakeupq; +} + char tipc_link_plane(struct tipc_link *l) { return l->net_plane; @@ -355,19 +361,21 @@ void tipc_link_add_bc_peer(struct tipc_link *snd_l, void tipc_link_remove_bc_peer(struct tipc_link *snd_l, struct tipc_link *rcv_l, - struct sk_buff_head *xmitq) + struct sk_buff_head *xmitq, + struct sk_buff_head *wakeupq) { u16 ack = snd_l->snd_nxt - 1; snd_l->ackers--; rcv_l->bc_peer_is_up = true; rcv_l->state = LINK_ESTABLISHED; - tipc_link_bc_ack_rcv(rcv_l, ack, xmitq); + tipc_link_bc_ack_rcv(rcv_l, ack, xmitq, wakeupq); trace_tipc_link_reset(rcv_l, TIPC_DUMP_ALL, "bclink removed!"); tipc_link_reset(rcv_l); rcv_l->state = LINK_RESET; if (!snd_l->ackers) { trace_tipc_link_reset(snd_l, TIPC_DUMP_ALL, "zero ackers!"); + skb_queue_splice_tail_init(&snd_l->wakeupq, wakeupq); tipc_link_reset(snd_l); snd_l->state = LINK_RESET; __skb_queue_purge(xmitq); @@ -500,7 +508,7 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id, __skb_queue_head_init(&l->backlogq); __skb_queue_head_init(&l->deferdq); __skb_queue_head_init(&l->failover_deferdq); - skb_queue_head_init(&l->wakeupq); + __skb_queue_head_init(&l->wakeupq); skb_queue_head_init(l->inputq); return true; } @@ -839,9 +847,8 @@ static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr) dnode, l->addr, dport, 0, 0); if (!skb) return -ENOBUFS; - msg_set_dest_droppable(buf_msg(skb), true); TIPC_SKB_CB(skb)->chain_imp = msg_importance(hdr); - skb_queue_tail(&l->wakeupq, skb); + __skb_queue_tail(&l->wakeupq, skb); l->stats.link_congs++; trace_tipc_link_conges(l, TIPC_DUMP_ALL, "wakeup scheduled!"); return -ELINKCONG; @@ -853,46 +860,34 @@ static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr) * Wake up a number of waiting users, as permitted by available space * in the send queue */ -static void link_prepare_wakeup(struct tipc_link *l) +static void link_prepare_wakeup(struct tipc_link *l, + struct sk_buff_head *wakeupq) { struct sk_buff *skb, *tmp; - int imp, i = 0; + int imp; skb_queue_walk_safe(&l->wakeupq, skb, tmp) { imp = TIPC_SKB_CB(skb)->chain_imp; if (l->backlog[imp].len < l->backlog[imp].limit) { - skb_unlink(skb, &l->wakeupq); - skb_queue_tail(l->inputq, skb); - } else if (i++ > 10) { - break; + __skb_unlink(skb, &l->wakeupq); + __skb_queue_tail(wakeupq, skb); } } } void tipc_link_reset(struct tipc_link *l) { - struct sk_buff_head list; - - __skb_queue_head_init(&list); - l->in_session = false; /* Force re-synch of peer session number before establishing */ l->peer_session--; l->session++; l->mtu = l->advertised_mtu; - spin_lock_bh(&l->wakeupq.lock); - skb_queue_splice_init(&l->wakeupq, &list); - spin_unlock_bh(&l->wakeupq.lock); - - 
spin_lock_bh(&l->inputq->lock); - skb_queue_splice_init(&list, l->inputq); - spin_unlock_bh(&l->inputq->lock); - __skb_queue_purge(&l->transmq); __skb_queue_purge(&l->deferdq); __skb_queue_purge(&l->backlogq); __skb_queue_purge(&l->failover_deferdq); + __skb_queue_purge(&l->wakeupq); l->backlog[TIPC_LOW_IMPORTANCE].len = 0; l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0; l->backlog[TIPC_HIGH_IMPORTANCE].len = 0; @@ -1445,9 +1440,11 @@ static int tipc_link_build_nack_msg(struct tipc_link *l, * @l: the link that should handle the message * @skb: TIPC packet * @xmitq: queue to place packets to be sent after this call + * @wakeupq: list of wakeup messages to be consumed after this call */ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, - struct sk_buff_head *xmitq) + struct sk_buff_head *xmitq, + struct sk_buff_head *wakeupq) { struct sk_buff_head *defq = &l->deferdq; struct tipc_msg *hdr = buf_msg(skb); @@ -1456,7 +1453,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, /* Verify and update link state */ if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) - return tipc_link_proto_rcv(l, skb, xmitq); + return tipc_link_proto_rcv(l, skb, xmitq, wakeupq); /* Don't send probe at next timeout expiration */ l->silent_intv_cnt = 0; @@ -1484,7 +1481,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, l->stale_cnt = 0; tipc_link_advance_backlog(l, xmitq); if (unlikely(!skb_queue_empty(&l->wakeupq))) - link_prepare_wakeup(l); + link_prepare_wakeup(l, wakeupq); } /* Defer delivery if sequence gap */ @@ -1518,6 +1515,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, bool probe_reply, u16 rcvgap, int tolerance, int priority, struct sk_buff_head *xmitq) + { struct tipc_link *bcl = l->bc_rcvlink; struct sk_buff *skb; @@ -1786,7 +1784,8 @@ bool tipc_link_validate_msg(struct tipc_link *l, struct tipc_msg *hdr) * network plane */ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, - struct sk_buff_head *xmitq) + struct sk_buff_head *xmitq, + struct sk_buff_head *wakeupq) { struct tipc_msg *hdr = buf_msg(skb); struct tipc_gap_ack_blks *ga = NULL; @@ -1926,7 +1925,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb, tipc_link_advance_backlog(l, xmitq); if (unlikely(!skb_queue_empty(&l->wakeupq))) - link_prepare_wakeup(l); + link_prepare_wakeup(l, wakeupq); } exit: kfree_skb(skb); @@ -2072,7 +2071,8 @@ int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr, } void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, - struct sk_buff_head *xmitq) + struct sk_buff_head *xmitq, + struct sk_buff_head *wakeupq) { struct sk_buff *skb, *tmp; struct tipc_link *snd_l = l->bc_sndlink; @@ -2102,7 +2102,7 @@ void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, l->acked = acked; tipc_link_advance_backlog(snd_l, xmitq); if (unlikely(!skb_queue_empty(&snd_l->wakeupq))) - link_prepare_wakeup(snd_l); + link_prepare_wakeup(snd_l, wakeupq); } /* tipc_link_bc_nack_rcv(): receive broadcast nack message @@ -2110,7 +2110,8 @@ void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, * no BCAST_PROTOCOL/STATE messages occur from TIPC v2.5. 
*/ int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb, - struct sk_buff_head *xmitq) + struct sk_buff_head *xmitq, + struct sk_buff_head *wakeupq) { struct tipc_msg *hdr = buf_msg(skb); u32 dnode = msg_destnode(hdr); @@ -2130,7 +2131,7 @@ int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb, return 0; if (dnode == tipc_own_addr(l->net)) { - tipc_link_bc_ack_rcv(l, acked, xmitq); + tipc_link_bc_ack_rcv(l, acked, xmitq, wakeupq); rc = tipc_link_retrans(l->bc_sndlink, l, from, to, xmitq); l->stats.recv_nacks++; return rc; diff --git a/net/tipc/link.h b/net/tipc/link.h index adcad65e761c..9568bd965945 100644 --- a/net/tipc/link.h +++ b/net/tipc/link.h @@ -107,6 +107,7 @@ void tipc_link_reset_stats(struct tipc_link *l); int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list, struct sk_buff_head *xmitq); struct sk_buff_head *tipc_link_inputq(struct tipc_link *l); +struct sk_buff_head *tipc_link_wakeupq(struct tipc_link *l); u16 tipc_link_rcv_nxt(struct tipc_link *l); u16 tipc_link_acked(struct tipc_link *l); u32 tipc_link_id(struct tipc_link *l); @@ -130,25 +131,28 @@ int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg, int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]); int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq); int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, - struct sk_buff_head *xmitq); + struct sk_buff_head *xmitq, struct sk_buff_head *wakeupq); int tipc_link_build_state_msg(struct tipc_link *l, struct sk_buff_head *xmitq); void tipc_link_add_bc_peer(struct tipc_link *snd_l, struct tipc_link *uc_l, struct sk_buff_head *xmitq); void tipc_link_remove_bc_peer(struct tipc_link *snd_l, struct tipc_link *rcv_l, - struct sk_buff_head *xmitq); + struct sk_buff_head *xmitq, + struct sk_buff_head *wakeupq); int tipc_link_bc_peers(struct tipc_link *l); void tipc_link_set_mtu(struct tipc_link *l, int mtu); int tipc_link_mtu(struct tipc_link *l); void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, - struct sk_buff_head *xmitq); + struct sk_buff_head *xmitq, + struct sk_buff_head *wakeupq); void tipc_link_build_bc_sync_msg(struct tipc_link *l, struct sk_buff_head *xmitq); void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr); int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr, struct sk_buff_head *xmitq); int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb, - struct sk_buff_head *xmitq); + struct sk_buff_head *xmitq, + struct sk_buff_head *wakeupq); bool tipc_link_too_silent(struct tipc_link *l); #endif diff --git a/net/tipc/node.c b/net/tipc/node.c index 550581d47d51..d0ec29081b11 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -154,6 +154,7 @@ enum { static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, struct sk_buff_head *xmitq, + struct sk_buff_head *wakeupq, struct tipc_media_addr **maddr); static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete); @@ -803,6 +804,7 @@ static void tipc_node_link_failover(struct tipc_node *n, struct tipc_link *l, */ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, struct sk_buff_head *xmitq, + struct sk_buff_head *wakeupq, struct tipc_media_addr **maddr) { struct tipc_link_entry *le = &n->links[*bearer_id]; @@ -851,6 +853,7 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, tipc_node_fsm_evt(n, SELF_LOST_CONTACT_EVT); trace_tipc_link_reset(l, TIPC_DUMP_ALL, "link down!"); tipc_link_fsm_evt(l, 
LINK_RESET_EVT); + skb_queue_splice_tail_init(tipc_link_wakeupq(l), wakeupq); tipc_link_reset(l); tipc_link_build_reset_msg(l, xmitq); *maddr = &n->links[*bearer_id].maddr; @@ -868,6 +871,7 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id, n->sync_point = tipc_link_rcv_nxt(tnl) + (U16_MAX / 2 - 1); tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq); trace_tipc_link_reset(l, TIPC_DUMP_ALL, "link down -> failover!"); + skb_queue_splice_tail_init(tipc_link_wakeupq(l), wakeupq); tipc_link_reset(l); tipc_link_fsm_evt(l, LINK_RESET_EVT); tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT); @@ -881,18 +885,20 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete) struct tipc_media_addr *maddr = NULL; struct tipc_link *l = le->link; int old_bearer_id = bearer_id; - struct sk_buff_head xmitq; + struct sk_buff_head xmitq, wakeupq; if (!l) return; __skb_queue_head_init(&xmitq); + __skb_queue_head_init(&wakeupq); tipc_node_write_lock(n); if (!tipc_link_is_establishing(l)) { - __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr); + __tipc_node_link_down(n, &bearer_id, &xmitq, &wakeupq, &maddr); } else { /* Defuse pending tipc_node_link_up() */ + skb_queue_splice_tail_init(tipc_link_wakeupq(l), &wakeupq); tipc_link_reset(l); tipc_link_fsm_evt(l, LINK_RESET_EVT); } @@ -908,6 +914,7 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete) if (!skb_queue_empty(&xmitq)) tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr); tipc_sk_rcv(n->net, &le->inputq); + tipc_sk_rcv_wakeup(n->net, &wakeupq); } static bool node_is_up(struct tipc_node *n) @@ -1652,6 +1659,7 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, u16 iseqno = msg_seqno(msg_get_wrapped(hdr)); u16 exp_pkts = msg_msgcnt(hdr); u16 rcv_nxt, syncpt, dlv_nxt, inputq_len; + struct sk_buff_head wakeupq; int state = n->state; struct tipc_link *l, *tnl, *pl = NULL; struct tipc_media_addr *maddr; @@ -1711,9 +1719,12 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) { syncpt = oseqno + exp_pkts - 1; if (pl && !tipc_link_is_reset(pl)) { - __tipc_node_link_down(n, &pb_id, xmitq, &maddr); + __skb_queue_head_init(&wakeupq); + __tipc_node_link_down(n, &pb_id, xmitq, + &wakeupq, &maddr); trace_tipc_node_link_down(n, true, "node link down <- failover!"); + skb_queue_splice_tail(&wakeupq, tipc_link_wakeupq(l)); tipc_skb_queue_splice_tail_init(tipc_link_inputq(pl), tipc_link_inputq(l)); } @@ -1795,7 +1806,7 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, */ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) { - struct sk_buff_head xmitq; + struct sk_buff_head xmitq, wakeupq; struct tipc_node *n; struct tipc_msg *hdr; int bearer_id = b->identity; @@ -1805,6 +1816,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) u16 bc_ack; __skb_queue_head_init(&xmitq); + __skb_queue_head_init(&wakeupq); /* Ensure message is well-formed before touching the header */ if (unlikely(!tipc_msg_validate(&skb))) @@ -1842,7 +1854,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) { spin_lock_bh(&le->lock); if (le->link) { - rc = tipc_link_rcv(le->link, skb, &xmitq); + rc = tipc_link_rcv(le->link, skb, &xmitq, &wakeupq); skb = NULL; } spin_unlock_bh(&le->lock); @@ -1856,7 +1868,8 @@ void tipc_rcv(struct net *net, struct sk_buff 
*skb, struct tipc_bearer *b) tipc_node_write_lock(n); if (tipc_node_check_state(n, skb, bearer_id, &xmitq)) { if (le->link) { - rc = tipc_link_rcv(le->link, skb, &xmitq); + rc = tipc_link_rcv(le->link, skb, + &xmitq, &wakeupq); skb = NULL; } } @@ -1878,6 +1891,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) if (!skb_queue_empty(&le->inputq)) tipc_sk_rcv(net, &le->inputq); + if (!skb_queue_empty(&wakeupq)) + tipc_sk_rcv_wakeup(net, &wakeupq); + if (!skb_queue_empty(&xmitq)) tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr); diff --git a/net/tipc/socket.c b/net/tipc/socket.c index dd8537f988c4..fa48fb14add7 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -2224,6 +2224,33 @@ static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb) return 0; } +/** + * tipc_sk_add_backlog - add the skb to the socket backlog queue + * @sk: socket where the skb should be enqueued + * @skb: the skb being enqueued + * + * Return true if the skb was added. Otherwise, return false + */ +static bool tipc_sk_add_backlog(struct sock *sk, struct sk_buff *skb) +{ + unsigned int lim; + atomic_t *dcnt; + + /* Try backlog, compensating for double-counted bytes */ + dcnt = &tipc_sk(sk)->dupl_rcvcnt; + if (!sk->sk_backlog.len) + atomic_set(dcnt, 0); + + lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt); + if (likely(!sk_add_backlog(sk, skb, lim))) { + trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL, + "bklg & rcvq >90% allocated!"); + return true; + } + + return false; +} + /** * tipc_sk_enqueue - extract all buffers with destination 'dport' from * inputq and try adding them to socket or backlog queue @@ -2238,8 +2265,6 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, { unsigned long time_limit = jiffies + 2; struct sk_buff *skb; - unsigned int lim; - atomic_t *dcnt; u32 onode; while (skb_queue_len(inputq)) { @@ -2256,16 +2281,9 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk, continue; } - /* Try backlog, compensating for double-counted bytes */ - dcnt = &tipc_sk(sk)->dupl_rcvcnt; - if (!sk->sk_backlog.len) - atomic_set(dcnt, 0); - lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt); - if (likely(!sk_add_backlog(sk, skb, lim))) { - trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL, - "bklg & rcvq >90% allocated!"); + /* Add the skb to the socket backlog queue */ + if (tipc_sk_add_backlog(sk, skb)) continue; - } trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL, "err_overload!"); /* Overload => reject message back to sender */ @@ -2332,6 +2350,57 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq) } } +/** + * tipc_sk_rcv_wakeup - handle a chain of wakeup messages + * @wakeupq: list of wakeup messages + * Consumes all messages in the list until it is empty + */ +void tipc_sk_rcv_wakeup(struct net *net, struct sk_buff_head *wakeupq) +{ + u32 dport = 0; + struct tipc_sock *tsk; + struct sock *sk; + struct sk_buff *skb, *tmp; + +start: + if (!skb_queue_len(wakeupq)) + return; + + skb_queue_walk_safe(wakeupq, skb, tmp) { + dport = msg_destport(buf_msg(skb)); + tsk = tipc_sk_lookup(net, dport); + + if (!unlikely(tsk)) { + __skb_unlink(skb, wakeupq); + kfree_skb(skb); + continue; + } + + sk = &tsk->sk; + if (likely(spin_trylock_bh(&sk->sk_lock.slock))) { + __skb_unlink(skb, wakeupq); + if (!sock_owned_by_user(sk)) { + tipc_dest_del(&tsk->cong_links, + msg_orignode(buf_msg(skb)), 0); + /* coupled with smp_rmb() in + * tipc_wait_for_cond() + */ + smp_wmb(); + tsk->cong_link_cnt--; + sk->sk_write_space(sk); + kfree_skb(skb); 
+ } else if (unlikely(!tipc_sk_add_backlog(sk, skb))) { + kfree_skb(skb); + pr_warn("Drop wakeup message for port %u\n", + tipc_sk(sk)->portid); + } + spin_unlock_bh(&sk->sk_lock.slock); + } + sock_put(sk); + } + goto start; +} + static int tipc_wait_for_connect(struct socket *sock, long *timeo_p) { DEFINE_WAIT_FUNC(wait, woken_wake_function); diff --git a/net/tipc/socket.h b/net/tipc/socket.h index 235b9679acee..44eb9fb68d7e 100644 --- a/net/tipc/socket.h +++ b/net/tipc/socket.h @@ -54,6 +54,7 @@ struct tipc_sock; int tipc_socket_init(void); void tipc_socket_stop(void); void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq); +void tipc_sk_rcv_wakeup(struct net *net, struct sk_buff_head *wakeupq); void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, struct sk_buff_head *inputq); void tipc_sk_reinit(struct net *net); -- 2.17.1 |
From: Jon M. <jon...@er...> - 2019-07-09 20:15:56
> -----Original Message----- > From: Eric Dumazet <eri...@gm...> > Sent: 9-Jul-19 09:46 > To: Jon Maloy <jon...@er...>; Eric Dumazet > <eri...@gm...>; Chris Packham > <Chr...@al...>; yin...@wi...; > da...@da... > Cc: ne...@vg...; tip...@li...; linux- > ke...@vg... > Subject: Re: [PATCH] tipc: ensure skb->lock is initialised > > > > On 7/9/19 3:25 PM, Jon Maloy wrote: [...] > > TIPC is using the list lock at message reception within the scope of > tipc_sk_rcv()/tipc_skb_peek_port(), so it is fundamental that the lock always > is correctly initialized. > > Where is the lock acquired, why was it only acquired by queue purge and not > normal dequeues ??? It is acquired twice: - First, in tipc_sk_rcv()->tipc_skb_peek_port(), to peek into one or more queue members and read their destination port number. - Second, in tipc_sk_rcv()->tipc_sk_enqueue()->tipc_skb_dequeue() to unlink a list member from the queue. > >> > > [...] > >> > >> tipc_link_xmit() for example never acquires the spinlock, yet uses > >> skb_peek() and __skb_dequeue() > > > > > > You should look at tipc_node_xmit instead. Node local messages are > > sent directly to tipc_sk_rcv(), and never go through tipc_link_xmit() > > tipc_node_xmit() calls tipc_link_xmit() eventually, right ? No. tipc_link_xmit() is called only for messages with a non-local destination. Otherwise, tipc_node_xmit() sends node local messages directly to the destination socket via tipc_sk_rcv(). The argument 'xmitq' becomes 'inputq' in tipc_sk_rcv() and 'list' in tipc_skb_peek_port(), since those functions don't distinguish between local and node external incoming messages. > > Please show me where the head->lock is acquired, and why it needed. The argument 'inputq' to tipc_sk_rcv() may come from two sources: 1) As an aggregated member of each tipc_node::tipc_link_entry. This queue is needed to guarantee sequential delivery of messages from the node/link layer to the socket layer. In this case, there may be buffers for multiple destination sockets in the same queue, and we may have multiple concurrent tipc_sk_rcv() jobs working that queue. So, the lock is needed both for adding (in link.c::tipc_data_input()), peeking and removing buffers. 2) The case you have been looking at, where it is created as 'xmitq' on the stack by a local socket. Here, the lock is not strictly needed, as you have observed. But to reduce code duplication we have chosen to let the code in tipc_sk_rcv() handle both types of queues uniformly, i.e., as if they all contain buffers with potentially multiple destination sockets, worked on by multiple concurrent calls. This requires that the lock is initialized even for this type of queue. We have seen no measurable performance difference between this 'generic' reception algorithm and a tailor-made ditto for local messages only, while the amount of saved code is significant. > > If this is mandatory, then more fixes are needed than just initializing the lock > for lockdep purposes. It is not only for lockdep purposes, -it is essential. But please provide details about where you see that more fixes are needed. BR ///jon |
From: Jon M. <jon...@er...> - 2019-07-09 13:40:07
> -----Original Message----- > From: Eric Dumazet <eri...@gm...> > Sent: 9-Jul-19 03:31 > To: Chris Packham <Chr...@al...>; Eric Dumazet > <eri...@gm...>; Jon Maloy <jon...@er...>; > yin...@wi...; da...@da... > Cc: ne...@vg...; tip...@li...; linux- > ke...@vg... > Subject: Re: [PATCH] tipc: ensure skb->lock is initialised > > > > On 7/8/19 11:13 PM, Chris Packham wrote: > > On 9/07/19 8:43 AM, Chris Packham wrote: > >> On 8/07/19 8:18 PM, Eric Dumazet wrote: > >>> > >>> > >>> On 7/8/19 12:53 AM, Chris Packham wrote: > >>>> tipc_named_node_up() creates a skb list. It passes the list to > >>>> tipc_node_xmit() which has some code paths that can call > >>>> skb_queue_purge() which relies on the list->lock being initialised. > >>>> Ensure tipc_named_node_up() uses skb_queue_head_init() so that the > >>>> lock is explicitly initialised. > >>>> > >>>> Signed-off-by: Chris Packham <chr...@al...> > >>> > >>> I would rather change the faulty skb_queue_purge() to > >>> __skb_queue_purge() > >>> > >> > >> Makes sense. I'll look at that for v2. > >> > > > > Actually maybe not. tipc_rcast_xmit(), tipc_node_xmit_skb(), > > tipc_send_group_msg(), __tipc_sendmsg(), __tipc_sendstream(), and > > tipc_sk_timeout() all use skb_queue_head_init(). So my original change > > brings tipc_named_node_up() into line with them. > > > > I think it should be safe for tipc_node_xmit() to use > > __skb_queue_purge() since all the callers seem to have exclusive > > access to the list of skbs. It still seems that the callers should all > > use > > skb_queue_head_init() for consistency. I agree with that. > > > > No, tipc does not use the list lock (it relies on the socket lock) and therefore > should consistently use __skb_queue_head_init() instead of > skb_queue_head_init() TIPC is using the list lock at message reception within the scope of tipc_sk_rcv()/tipc_skb_peek_port(), so it is fundamental that the lock always is correctly initialized. > [...] > > tipc_link_xmit() for example never acquires the spinlock, yet uses skb_peek() > and __skb_dequeue() You should look at tipc_node_xmit instead. Node local messages are sent directly to tipc_sk_rcv(), and never go through tipc_link_xmit() Regards ///jon |
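For reference, a small sketch of the distinction at the heart of this thread: skb_queue_purge() dequeues under list->lock, while __skb_queue_purge() walks the list locklessly. The wrapper and queue names here are hypothetical:

```c
#include <linux/skbuff.h>

/* A caller-private queue built with __skb_queue_head_init() never had its
 * spinlock initialised, so only the lockless purge may be used on it.
 * A queue built with skb_queue_head_init() may be purged either way, since
 * skb_queue_purge() takes list->lock for each dequeue.
 */
static void example_purge(struct sk_buff_head *private_q,
			  struct sk_buff_head *shared_q)
{
	__skb_queue_purge(private_q);	/* no locking, caller has sole access */
	skb_queue_purge(shared_q);	/* dequeues under shared_q->lock */
}
```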
From: David M. <da...@da...> - 2019-07-07 20:19:09
From: David Miller <da...@da...> Date: Sat, 06 Jul 2019 15:15:44 -0700 (PDT) > From: Xin Long <luc...@gm...> > Date: Sat, 6 Jul 2019 14:48:48 +0800 > >> Hi, David, I saw this patch in "Changes Requested". > > I just put it back to Under Review, thanks. Applied to net-next, thank you. |
From: David M. <da...@da...> - 2019-07-06 22:15:47
From: Xin Long <luc...@gm...> Date: Sat, 6 Jul 2019 14:48:48 +0800 > Hi, David, I saw this patch in "Changes Requested". I just put it back to Under Review, thanks. |
From: Xin L. <luc...@gm...> - 2019-07-06 06:48:47
On Wed, Jul 3, 2019 at 4:33 PM Xin Long <luc...@gm...> wrote: > > On Wed, Jul 3, 2019 at 6:08 AM David Miller <da...@da...> wrote: > > > > From: Xin Long <luc...@gm...> > > Date: Tue, 2 Jul 2019 00:54:55 +0800 > > > > > For these places are protected by rcu_read_lock, we change from > > > rcu_dereference_rtnl to rcu_dereference, as there is no need to > > > check if rtnl lock is held. > > > > > > For these places are protected by rtnl_lock, we change from > > > rcu_dereference_rtnl to rtnl_dereference/rcu_dereference_protected, > > > as no extra memory barriers are needed under rtnl_lock() which also > > > protects tn->bearer_list[] and dev->tipc_ptr/b->media_ptr updating. > > > > > > rcu_dereference_rtnl will be only used in the places where it could > > > be under rcu_read_lock or rtnl_lock. > > > > > > Signed-off-by: Xin Long <luc...@gm...> > > > > In the cases where RTNL is held, even if rcu_read_lock() is also taken, > > we should use rtnl_dereference() because that avoids the READ_ONCE(). > Right, that's what I did in this patch. > > But for the places where it's sometimes called under rtnl_lock() only and > sometimes called under rcu_read_lock() only, like tipc_udp_is_known_peer() > and tipc_udp_rcast_add(), I kept rcu_dereference_rtnl(). makes sense? Hi, David, I saw this patch in "Changes Requested". I've checked all places with this patch, no function calling rcu_dereference() and rcu_dereference_rtnl() will be ONLY called under rtnl_lock() protection. So I can't see a problem with it. Do I need to resend? |
From: Xin L. <luc...@gm...> - 2019-07-03 08:33:55
On Wed, Jul 3, 2019 at 6:08 AM David Miller <da...@da...> wrote: > > From: Xin Long <luc...@gm...> > Date: Tue, 2 Jul 2019 00:54:55 +0800 > > > For these places are protected by rcu_read_lock, we change from > > rcu_dereference_rtnl to rcu_dereference, as there is no need to > > check if rtnl lock is held. > > > > For these places are protected by rtnl_lock, we change from > > rcu_dereference_rtnl to rtnl_dereference/rcu_dereference_protected, > > as no extra memory barriers are needed under rtnl_lock() which also > > protects tn->bearer_list[] and dev->tipc_ptr/b->media_ptr updating. > > > > rcu_dereference_rtnl will be only used in the places where it could > > be under rcu_read_lock or rtnl_lock. > > > > Signed-off-by: Xin Long <luc...@gm...> > > In the cases where RTNL is held, even if rcu_read_lock() is also taken, > we should use rtnl_dereference() because that avoids the READ_ONCE(). Right, that's what I did in this patch. But for the places where it's sometimes called under rtnl_lock() only and sometimes called under rcu_read_lock() only, like tipc_udp_is_known_peer() and tipc_udp_rcast_add(), I kept rcu_dereference_rtnl(). makes sense? |
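A rough sketch of the convention being discussed (the accessors are the mainline RCU/rtnetlink helpers; struct example_state, struct example_cfg and the surrounding functions are hypothetical):

```c
#include <linux/rcupdate.h>
#include <linux/rtnetlink.h>

struct example_cfg;

struct example_state {
	struct example_cfg __rcu *cfg;	/* hypothetical RCU-protected pointer */
};

/* Fast path: caller holds rcu_read_lock() only. */
static struct example_cfg *example_get_rcu(struct example_state *s)
{
	return rcu_dereference(s->cfg);
}

/* Control path: caller holds RTNL; rtnl_dereference() avoids the
 * READ_ONCE() that a plain rcu_dereference() implies.
 */
static struct example_cfg *example_get_rtnl(struct example_state *s)
{
	return rtnl_dereference(s->cfg);
}

/* Mixed callers: may run under either rcu_read_lock() or RTNL, so keep
 * rcu_dereference_rtnl(), which is valid in both contexts.
 */
static struct example_cfg *example_get_either(struct example_state *s)
{
	return rcu_dereference_rtnl(s->cfg);
}
```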
From: David M. <da...@da...> - 2019-07-02 22:13:28
From: Xin Long <luc...@gm...> Date: Tue, 2 Jul 2019 00:57:19 +0800 > Both tipc_udp_enable and tipc_udp_disable are called under rtnl_lock, > ub->ubsock could never be NULL in tipc_udp_disable and cleanup_bearer, > so remove the check. > > Also remove the one in tipc_udp_enable by adding "free" label. > > Signed-off-by: Xin Long <luc...@gm...> Looks good, applied. |
From: David M. <da...@da...> - 2019-07-02 22:08:11
From: Xin Long <luc...@gm...> Date: Tue, 2 Jul 2019 00:54:55 +0800 > For these places are protected by rcu_read_lock, we change from > rcu_dereference_rtnl to rcu_dereference, as there is no need to > check if rtnl lock is held. > > For these places are protected by rtnl_lock, we change from > rcu_dereference_rtnl to rtnl_dereference/rcu_dereference_protected, > as no extra memory barriers are needed under rtnl_lock() which also > protects tn->bearer_list[] and dev->tipc_ptr/b->media_ptr updating. > > rcu_dereference_rtnl will be only used in the places where it could > be under rcu_read_lock or rtnl_lock. > > Signed-off-by: Xin Long <luc...@gm...> In the cases where RTNL is held, even if rcu_read_lock() is also taken, we should use rtnl_dereference() because that avoids the READ_ONCE(). |
From: David M. <da...@da...> - 2019-07-02 02:12:35
|
From: Jon Maloy <jon...@er...> Date: Fri, 28 Jun 2019 17:06:20 +0200 > The macro TIPC_BC_RETR_LIM is always used in combination with 'jiffies', > so we can just as well perform the addition in the macro itself. This > way, we get a few shorter code lines and one less line break. > > Signed-off-by: Jon Maloy <jon...@er...> Applied. |
From: Xin L. <luc...@gm...> - 2019-07-01 16:57:25
|
Both tipc_udp_enable and tipc_udp_disable are called under rtnl_lock, ub->ubsock could never be NULL in tipc_udp_disable and cleanup_bearer, so remove the check. Also remove the one in tipc_udp_enable by adding "free" label. Signed-off-by: Xin Long <luc...@gm...> --- net/tipc/udp_media.c | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c index 62b85db..287df687 100644 --- a/net/tipc/udp_media.c +++ b/net/tipc/udp_media.c @@ -759,7 +759,7 @@ static int tipc_udp_enable(struct net *net, struct tipc_bearer *b, err = dst_cache_init(&ub->rcast.dst_cache, GFP_ATOMIC); if (err) - goto err; + goto free; /** * The bcast media address port is used for all peers and the ip @@ -771,13 +771,14 @@ static int tipc_udp_enable(struct net *net, struct tipc_bearer *b, else err = tipc_udp_rcast_add(b, &remote); if (err) - goto err; + goto free; return 0; -err: + +free: dst_cache_destroy(&ub->rcast.dst_cache); - if (ub->ubsock) - udp_tunnel_sock_release(ub->ubsock); + udp_tunnel_sock_release(ub->ubsock); +err: kfree(ub); return err; } @@ -795,8 +796,7 @@ static void cleanup_bearer(struct work_struct *work) } dst_cache_destroy(&ub->rcast.dst_cache); - if (ub->ubsock) - udp_tunnel_sock_release(ub->ubsock); + udp_tunnel_sock_release(ub->ubsock); synchronize_net(); kfree(ub); } @@ -811,8 +811,7 @@ static void tipc_udp_disable(struct tipc_bearer *b) pr_err("UDP bearer instance not found\n"); return; } - if (ub->ubsock) - sock_set_flag(ub->ubsock->sk, SOCK_DEAD); + sock_set_flag(ub->ubsock->sk, SOCK_DEAD); RCU_INIT_POINTER(ub->bearer, NULL); /* sock_release need to be done outside of rtnl lock */ -- 2.1.0 |
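The structural point of the patch above is that udp_tunnel_sock_release() is now only reachable on paths where ub->ubsock is known to be valid, which is what lets the NULL checks disappear. A stripped-down sketch of the resulting unwind order follows; the demo_* helpers are invented and the real tipc_udp_enable() takes more arguments and does more setup, so this is only the shape of the error handling, not the function itself.

    #include <linux/net.h>
    #include <linux/slab.h>

    struct demo_bearer {
            struct socket *ubsock;
    };

    static int demo_open_socket(struct demo_bearer *ub);
    static int demo_late_setup(struct demo_bearer *ub);
    static void demo_release_socket(struct socket *sock);

    static int demo_enable(struct demo_bearer **pub)
    {
            struct demo_bearer *ub;
            int err;

            ub = kzalloc(sizeof(*ub), GFP_ATOMIC);
            if (!ub)
                    return -ENOMEM;

            err = demo_open_socket(ub);     /* sets ub->ubsock on success */
            if (err)
                    goto err;               /* no socket yet: nothing to release */

            err = demo_late_setup(ub);      /* e.g. dst_cache / rcast setup */
            if (err)
                    goto free;              /* socket exists: must be released */

            *pub = ub;
            return 0;

    free:
            demo_release_socket(ub->ubsock); /* only reached with a valid socket */
    err:
            kfree(ub);
            return err;
    }

The same reasoning removes the checks in cleanup_bearer() and tipc_udp_disable(): both only run for a bearer whose enable path completed, so ub->ubsock cannot be NULL there.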
From: Xin L. <luc...@gm...> - 2019-07-01 16:55:05
|
For these places are protected by rcu_read_lock, we change from rcu_dereference_rtnl to rcu_dereference, as there is no need to check if rtnl lock is held. For these places are protected by rtnl_lock, we change from rcu_dereference_rtnl to rtnl_dereference/rcu_dereference_protected, as no extra memory barriers are needed under rtnl_lock() which also protects tn->bearer_list[] and dev->tipc_ptr/b->media_ptr updating. rcu_dereference_rtnl will be only used in the places where it could be under rcu_read_lock or rtnl_lock. Signed-off-by: Xin Long <luc...@gm...> --- net/tipc/bearer.c | 14 +++++++------- net/tipc/udp_media.c | 8 ++++---- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 2bed658..a809c0e 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -62,7 +62,7 @@ static struct tipc_bearer *bearer_get(struct net *net, int bearer_id) { struct tipc_net *tn = tipc_net(net); - return rcu_dereference_rtnl(tn->bearer_list[bearer_id]); + return rcu_dereference(tn->bearer_list[bearer_id]); } static void bearer_disable(struct net *net, struct tipc_bearer *b); @@ -210,7 +210,7 @@ void tipc_bearer_add_dest(struct net *net, u32 bearer_id, u32 dest) struct tipc_bearer *b; rcu_read_lock(); - b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); + b = rcu_dereference(tn->bearer_list[bearer_id]); if (b) tipc_disc_add_dest(b->disc); rcu_read_unlock(); @@ -222,7 +222,7 @@ void tipc_bearer_remove_dest(struct net *net, u32 bearer_id, u32 dest) struct tipc_bearer *b; rcu_read_lock(); - b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); + b = rcu_dereference(tn->bearer_list[bearer_id]); if (b) tipc_disc_remove_dest(b->disc); rcu_read_unlock(); @@ -444,7 +444,7 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *skb, struct net_device *dev; int delta; - dev = (struct net_device *)rcu_dereference_rtnl(b->media_ptr); + dev = (struct net_device *)rcu_dereference(b->media_ptr); if (!dev) return 0; @@ -481,7 +481,7 @@ int tipc_bearer_mtu(struct net *net, u32 bearer_id) struct tipc_bearer *b; rcu_read_lock(); - b = rcu_dereference_rtnl(tipc_net(net)->bearer_list[bearer_id]); + b = rcu_dereference(tipc_net(net)->bearer_list[bearer_id]); if (b) mtu = b->mtu; rcu_read_unlock(); @@ -574,8 +574,8 @@ static int tipc_l2_rcv_msg(struct sk_buff *skb, struct net_device *dev, struct tipc_bearer *b; rcu_read_lock(); - b = rcu_dereference_rtnl(dev->tipc_ptr) ?: - rcu_dereference_rtnl(orig_dev->tipc_ptr); + b = rcu_dereference(dev->tipc_ptr) ?: + rcu_dereference(orig_dev->tipc_ptr); if (likely(b && test_bit(0, &b->up) && (skb->pkt_type <= PACKET_MULTICAST))) { skb_mark_not_on_list(skb); diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c index b8962df..62b85db 100644 --- a/net/tipc/udp_media.c +++ b/net/tipc/udp_media.c @@ -231,7 +231,7 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb, } skb_set_inner_protocol(skb, htons(ETH_P_TIPC)); - ub = rcu_dereference_rtnl(b->media_ptr); + ub = rcu_dereference(b->media_ptr); if (!ub) { err = -ENODEV; goto out; @@ -490,7 +490,7 @@ int tipc_udp_nl_dump_remoteip(struct sk_buff *skb, struct netlink_callback *cb) } } - ub = rcu_dereference_rtnl(b->media_ptr); + ub = rtnl_dereference(b->media_ptr); if (!ub) { rtnl_unlock(); return -EINVAL; @@ -532,7 +532,7 @@ int tipc_udp_nl_add_bearer_data(struct tipc_nl_msg *msg, struct tipc_bearer *b) struct udp_bearer *ub; struct nlattr *nest; - ub = rcu_dereference_rtnl(b->media_ptr); + ub = rtnl_dereference(b->media_ptr); if (!ub) return -ENODEV; @@ -806,7 +806,7 
@@ static void tipc_udp_disable(struct tipc_bearer *b) { struct udp_bearer *ub; - ub = rcu_dereference_rtnl(b->media_ptr); + ub = rtnl_dereference(b->media_ptr); if (!ub) { pr_err("UDP bearer instance not found\n"); return; -- 2.1.0 |
From: Ying X. <yin...@wi...> - 2019-06-29 11:19:50
|
On 6/28/19 11:06 PM, Jon Maloy wrote: > The macro TIPC_BC_RETR_LIM is always used in combination with 'jiffies', > so we can just as well perform the addition in the macro itself. This > way, we get a few shorter code lines and one less line break. > > Signed-off-by: Jon Maloy <jon...@er...> Acked-by: Ying Xue <yin...@wi...> > --- > net/tipc/link.c | 9 ++++----- > 1 file changed, 4 insertions(+), 5 deletions(-) > > diff --git a/net/tipc/link.c b/net/tipc/link.c > index f8bf63b..66d3a07 100644 > --- a/net/tipc/link.c > +++ b/net/tipc/link.c > @@ -207,7 +207,7 @@ enum { > BC_NACK_SND_SUPPRESS, > }; > > -#define TIPC_BC_RETR_LIM msecs_to_jiffies(10) /* [ms] */ > +#define TIPC_BC_RETR_LIM (jiffies + msecs_to_jiffies(10)) > #define TIPC_UC_RETR_TIME (jiffies + msecs_to_jiffies(1)) > > /* > @@ -976,8 +976,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, > __skb_queue_tail(transmq, skb); > /* next retransmit attempt */ > if (link_is_bc_sndlink(l)) > - TIPC_SKB_CB(skb)->nxt_retr = > - jiffies + TIPC_BC_RETR_LIM; > + TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM; > __skb_queue_tail(xmitq, _skb); > TIPC_SKB_CB(skb)->ackers = l->ackers; > l->rcv_unacked = 0; > @@ -1027,7 +1026,7 @@ static void tipc_link_advance_backlog(struct tipc_link *l, > __skb_queue_tail(&l->transmq, skb); > /* next retransmit attempt */ > if (link_is_bc_sndlink(l)) > - TIPC_SKB_CB(skb)->nxt_retr = jiffies + TIPC_BC_RETR_LIM; > + TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM; > > __skb_queue_tail(xmitq, _skb); > TIPC_SKB_CB(skb)->ackers = l->ackers; > @@ -1123,7 +1122,7 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r, > if (link_is_bc_sndlink(l)) { > if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr)) > continue; > - TIPC_SKB_CB(skb)->nxt_retr = jiffies + TIPC_BC_RETR_LIM; > + TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM; > } > _skb = __pskb_copy(skb, LL_MAX_HEADER + MIN_H_SIZE, GFP_ATOMIC); > if (!_skb) > |
From: Jon M. <jon...@er...> - 2019-06-28 15:06:28
|
The macro TIPC_BC_RETR_LIM is always used in combination with 'jiffies', so we can just as well perform the addition in the macro itself. This way, we get a few shorter code lines and one less line break. Signed-off-by: Jon Maloy <jon...@er...> --- net/tipc/link.c | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index f8bf63b..66d3a07 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -207,7 +207,7 @@ enum { BC_NACK_SND_SUPPRESS, }; -#define TIPC_BC_RETR_LIM msecs_to_jiffies(10) /* [ms] */ +#define TIPC_BC_RETR_LIM (jiffies + msecs_to_jiffies(10)) #define TIPC_UC_RETR_TIME (jiffies + msecs_to_jiffies(1)) /* @@ -976,8 +976,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, __skb_queue_tail(transmq, skb); /* next retransmit attempt */ if (link_is_bc_sndlink(l)) - TIPC_SKB_CB(skb)->nxt_retr = - jiffies + TIPC_BC_RETR_LIM; + TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM; __skb_queue_tail(xmitq, _skb); TIPC_SKB_CB(skb)->ackers = l->ackers; l->rcv_unacked = 0; @@ -1027,7 +1026,7 @@ static void tipc_link_advance_backlog(struct tipc_link *l, __skb_queue_tail(&l->transmq, skb); /* next retransmit attempt */ if (link_is_bc_sndlink(l)) - TIPC_SKB_CB(skb)->nxt_retr = jiffies + TIPC_BC_RETR_LIM; + TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM; __skb_queue_tail(xmitq, _skb); TIPC_SKB_CB(skb)->ackers = l->ackers; @@ -1123,7 +1122,7 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r, if (link_is_bc_sndlink(l)) { if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr)) continue; - TIPC_SKB_CB(skb)->nxt_retr = jiffies + TIPC_BC_RETR_LIM; + TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM; } _skb = __pskb_copy(skb, LL_MAX_HEADER + MIN_H_SIZE, GFP_ATOMIC); if (!_skb) -- 2.1.4 |
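One subtlety worth spelling out: after this change TIPC_BC_RETR_LIM is no longer a duration but a deadline computed at the point of expansion, i.e. each assignment captures the jiffies value at that moment plus 10 ms. A minimal sketch of the resulting pattern, with invented DEMO_* names standing in for the TIPC ones:

    #include <linux/jiffies.h>

    #define DEMO_RETR_LIM   (jiffies + msecs_to_jiffies(10))

    struct demo_cb {
            unsigned long nxt_retr;
    };

    static bool demo_may_retransmit(struct demo_cb *cb)
    {
            /* The deadline set at queueing time has not expired yet. */
            if (time_before(jiffies, cb->nxt_retr))
                    return false;

            /* Expands to "jiffies + 10 ms" evaluated right now. */
            cb->nxt_retr = DEMO_RETR_LIM;
            return true;
    }

This only reads well because every use of the macro is an assignment to a deadline field, which is the case in link.c; if a pure interval were ever needed again, msecs_to_jiffies(10) would have to be reintroduced at that call site.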
From: David M. <da...@da...> - 2019-06-28 05:37:23
|
From: Xin Long <luc...@gm...> Date: Thu, 20 Jun 2019 19:03:41 +0800 > As other udp/ip tunnels do, tipc udp media should also have a > lockless dst_cache supported on its tx path. > > Here we add dst_cache into udp_replicast to support dst cache > for both rmcast and rcast, and rmcast uses ub->rcast and each > rcast uses its own node in ub->rcast.list. > > Signed-off-by: Xin Long <luc...@gm...> Applied to net-next. |
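The patch being applied here is not quoted in this thread, but the general shape of a dst_cache-assisted IPv4 tx path looks roughly like the sketch below. The demo_* names and surrounding flow are invented; dst_cache_get_ip4()/dst_cache_set_ip4() are the real helpers, and the cache is assumed to have been set up with dst_cache_init() and torn down with dst_cache_destroy(), as the commit does for ub->rcast and the per-peer entries in ub->rcast.list.

    #include <linux/err.h>
    #include <linux/string.h>
    #include <net/dst_cache.h>
    #include <net/route.h>

    struct demo_peer {
            struct dst_cache dst_cache;
            __be32 daddr;
    };

    static struct rtable *demo_get_route(struct net *net, struct demo_peer *p,
                                         struct flowi4 *fl4)
    {
            struct rtable *rt;

            /* Fast path: reuse the per-peer cached route if still valid;
             * on a hit the cached source address is written to fl4->saddr.
             */
            rt = dst_cache_get_ip4(&p->dst_cache, &fl4->saddr);
            if (rt)
                    return rt;

            /* Slow path: full route lookup, then refill the cache. */
            memset(fl4, 0, sizeof(*fl4));
            fl4->daddr = p->daddr;
            rt = ip_route_output_key(net, fl4);
            if (IS_ERR(rt))
                    return rt;

            dst_cache_set_ip4(&p->dst_cache, &rt->dst, fl4->saddr);
            return rt;
    }

The win is that the common case avoids a full FIB lookup per packet while staying lockless on the tx path, which matches the motivation stated in the commit message.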
From: David M. <da...@da...> - 2019-06-25 20:43:45
|
From: Jon Maloy <jon...@er...> Date: Tue, 25 Jun 2019 19:37:00 +0200 > We rename the inline function msg_get_wrapped() to the more > comprehensible msg_inner_hdr(). > > Signed-off-by: Jon Maloy <jon...@er...> Applied, thanks Jon. |
From: David M. <da...@da...> - 2019-06-25 20:40:40
|
From: Jon Maloy <jon...@er...> Date: Tue, 25 Jun 2019 18:08:13 +0200 > We increase the allocated headroom for the buffer copies to be > retransmitted. This eliminates the need for the lower stack levels > (UDP/IP/L2) to expand the headroom in order to add their own headers. > > Signed-off-by: Jon Maloy <jon...@er...> Applied. |
From: David M. <da...@da...> - 2019-06-25 20:34:18
|
From: Jon Maloy <jon...@er...> Date: Tue, 25 Jun 2019 17:36:43 +0200 > In commit a4dc70d46cf1 ("tipc: extend link reset criteria for stale > packet retransmission") we made link retransmission failure events > dependent on the link tolerance, and not only of the number of failed > retransmission attempts, as we did earlier. This works well. However, > keeping the original, additional criteria of 99 failed retransmissions > is now redundant, and may in some cases lead to failure detection > times in the order of minutes instead of the expected 1.5 sec link > tolerance value. > > We now remove this criteria altogether. > > Acked-by: Ying Xue <yin...@wi...> > Signed-off-by: Jon Maloy <jon...@er...> Applied. |
From: Jon M. <jon...@er...> - 2019-06-25 17:37:06
|
We rename the inline function msg_get_wrapped() to the more comprehensible msg_inner_hdr(). Signed-off-by: Jon Maloy <jon...@er...> --- net/tipc/bcast.c | 4 ++-- net/tipc/link.c | 2 +- net/tipc/msg.h | 4 ++-- net/tipc/node.c | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 6c997d4..1336f3c 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -323,7 +323,7 @@ static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb, hdr = buf_msg(skb); if (msg_user(hdr) == MSG_FRAGMENTER) - hdr = msg_get_wrapped(hdr); + hdr = msg_inner_hdr(hdr); if (msg_type(hdr) != TIPC_MCAST_MSG) return 0; @@ -392,7 +392,7 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, skb = skb_peek(pkts); hdr = buf_msg(skb); if (msg_user(hdr) == MSG_FRAGMENTER) - hdr = msg_get_wrapped(hdr); + hdr = msg_inner_hdr(hdr); msg_set_is_rcast(hdr, method->rcast); /* Switch method ? */ diff --git a/net/tipc/link.c b/net/tipc/link.c index aa79bf8..f8bf63b 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -732,7 +732,7 @@ static void link_profile_stats(struct tipc_link *l) if (msg_user(msg) == MSG_FRAGMENTER) { if (msg_type(msg) != FIRST_FRAGMENT) return; - length = msg_size(msg_get_wrapped(msg)); + length = msg_size(msg_inner_hdr(msg)); } l->stats.msg_lengths_total += length; l->stats.msg_length_counts++; diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 8de02ad..da509f0 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -308,7 +308,7 @@ static inline unchar *msg_data(struct tipc_msg *m) return ((unchar *)m) + msg_hdr_sz(m); } -static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m) +static inline struct tipc_msg *msg_inner_hdr(struct tipc_msg *m) { return (struct tipc_msg *)msg_data(m); } @@ -486,7 +486,7 @@ static inline void msg_set_prevnode(struct tipc_msg *m, u32 a) static inline u32 msg_origport(struct tipc_msg *m) { if (msg_user(m) == MSG_FRAGMENTER) - m = msg_get_wrapped(m); + m = msg_inner_hdr(m); return msg_word(m, 4); } diff --git a/net/tipc/node.c b/net/tipc/node.c index 550581d..324a1f9 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -1649,7 +1649,7 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, int usr = msg_user(hdr); int mtyp = msg_type(hdr); u16 oseqno = msg_seqno(hdr); - u16 iseqno = msg_seqno(msg_get_wrapped(hdr)); + u16 iseqno = msg_seqno(msg_inner_hdr(hdr)); u16 exp_pkts = msg_msgcnt(hdr); u16 rcv_nxt, syncpt, dlv_nxt, inputq_len; int state = n->state; -- 2.1.4 |
From: Jon M. <jon...@er...> - 2019-06-25 16:08:13
|
We increase the allocated headroom for the buffer copies to be retransmitted. This eliminates the need for the lower stack levels (UDP/IP/L2) to expand the headroom in order to add their own headers. Signed-off-by: Jon Maloy <jon...@er...> --- net/tipc/link.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index af50b53..aa79bf8 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1125,7 +1125,7 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r, continue; TIPC_SKB_CB(skb)->nxt_retr = jiffies + TIPC_BC_RETR_LIM; } - _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC); + _skb = __pskb_copy(skb, LL_MAX_HEADER + MIN_H_SIZE, GFP_ATOMIC); if (!_skb) return 0; hdr = buf_msg(_skb); -- 2.1.4 |
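For context: the copy produced by __pskb_copy() is handed down to the bearer, and the UDP/IP/L2 layers each prepend their headers after making sure there is room, typically via skb_cow_head() or an equivalent check before skb_push(). If the copy only reserves MIN_H_SIZE, that room is often not there and the head has to be reallocated for every retransmitted buffer; reserving LL_MAX_HEADER up front makes the later pushes free. A small sketch of the idea, where demo_xmit_copy() is invented and __pskb_copy(), skb_headroom() and LL_MAX_HEADER are the real primitives:

    #include <linux/netdevice.h>
    #include <linux/skbuff.h>

    static struct sk_buff *demo_xmit_copy(struct sk_buff *skb, unsigned int hdr_len)
    {
            struct sk_buff *_skb;

            /* Reserve worst-case link-layer headroom at copy time so that the
             * lower layers' skb_cow_head()/skb_push() calls never need to
             * reallocate the skb head for a retransmitted copy.
             */
            _skb = __pskb_copy(skb, LL_MAX_HEADER + hdr_len, GFP_ATOMIC);
            if (!_skb)
                    return NULL;

            WARN_ON_ONCE(skb_headroom(_skb) < LL_MAX_HEADER);
            return _skb;
    }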