From: Tuong L. <tuo...@de...> - 2020-05-29 07:48:40
Hi Jon, all,

I can see we had two commits from 'Xiyu Yang <xiy...@fu...>' without your 'Acked-by', not sure why... They have caused kernel panics, and the TIPC encryption cannot work correctly. Therefore, I'd like to revert them. Please help check. Thanks a lot!

BR/Tuong

Tuong Lien (2):
  Revert "tipc: Fix potential tipc_node refcnt leak in tipc_rcv"
  Revert "tipc: Fix potential tipc_aead refcnt leak in tipc_crypto_rcv"

 net/tipc/crypto.c | 1 -
 net/tipc/node.c   | 1 -
 2 files changed, 2 deletions(-)

--
2.13.7

From: Tuong T. L. <tuo...@de...> - 2020-05-29 02:16:46
Hi Yue,

Thanks for your patch! Yesterday, I also sent a patch to "tipc-discussion" to solve this issue... Actually, the 'tipc_msg_append()' needs to be fixed instead. In fact, we never expect the 'txq' to be empty; otherwise the crash will still happen, just at another place.

BR/Tuong

-----Original Message-----
From: YueHaibing <yue...@hu...>
Sent: Thursday, May 28, 2020 9:34 PM
To: jm...@re...; yin...@wi...; da...@da...; ku...@ke...; Tuong Tong Lien <tuo...@de...>
Cc: ne...@vg...; tip...@li...; lin...@vg...; YueHaibing <yue...@hu...>
Subject: [PATCH net-next] tipc: Fix NULL pointer dereference in __tipc_sendstream()

tipc_sendstream() may send a zero-length packet. In that case tipc_msg_append() does not allocate a skb, skb_peek_tail() returns NULL, and msg_set_ack_required() triggers a NULL pointer dereference.

Reported-by: syz...@sy...
Fixes: 0a3e060f340d ("tipc: add test for Nagle algorithm effectiveness")
Signed-off-by: YueHaibing <yue...@hu...>
---
 net/tipc/socket.c | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index d6b67d07d22e..2943561399f1 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1588,8 +1588,12 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
 			tsk->pkt_cnt += skb_queue_len(txq);
 		} else {
 			skb = skb_peek_tail(txq);
-			msg_set_ack_required(buf_msg(skb));
-			tsk->expect_ack = true;
+			if (skb) {
+				msg_set_ack_required(buf_msg(skb));
+				tsk->expect_ack = true;
+			} else {
+				tsk->expect_ack = false;
+			}
 			tsk->msg_acc = 0;
 			tsk->pkt_cnt = 0;
 		}
--
2.17.1

From: Tuong L. <tuo...@de...> - 2020-05-28 11:35:42
syzbot found the following crash:

general protection fault, probably for non-canonical address 0xdffffc0000000019: 0000 [#1] PREEMPT SMP KASAN
KASAN: null-ptr-deref in range [0x00000000000000c8-0x00000000000000cf]
CPU: 1 PID: 7060 Comm: syz-executor394 Not tainted 5.7.0-rc6-syzkaller #0
Hardware name: Google Google Compute Engine/Google Compute Engine, BIOS Google 01/01/2011
RIP: 0010:__tipc_sendstream+0xbde/0x11f0 net/tipc/socket.c:1591
Code: 00 00 00 00 48 39 5c 24 28 48 0f 44 d8 e8 fa 3e db f9 48 b8 00 00 00 00 00 fc ff df 48 8d bb c8 00 00 00 48 89 fa 48 c1 ea 03 <80> 3c 02 00 0f 85 e2 04 00 00 48 8b 9b c8 00 00 00 48 b8 00 00 00
RSP: 0018:ffffc90003ef7818 EFLAGS: 00010202
RAX: dffffc0000000000 RBX: 0000000000000000 RCX: ffffffff8797fd9d
RDX: 0000000000000019 RSI: ffffffff8797fde6 RDI: 00000000000000c8
RBP: ffff888099848040 R08: ffff88809a5f6440 R09: fffffbfff1860b4c
R10: ffffffff8c305a5f R11: fffffbfff1860b4b R12: ffff88809984857e
R13: 0000000000000000 R14: ffff888086aa4000 R15: 0000000000000000
FS:  00000000009b4880(0000) GS:ffff8880ae700000(0000) knlGS:0000000000000000
CS:  0010 DS: 0000 ES: 0000 CR0: 0000000080050033
CR2: 0000000020000140 CR3: 00000000a7fdf000 CR4: 00000000001406e0
DR0: 0000000000000000 DR1: 0000000000000000 DR2: 0000000000000000
DR3: 0000000000000000 DR6: 00000000fffe0ff0 DR7: 0000000000000400
Call Trace:
 tipc_sendstream+0x4c/0x70 net/tipc/socket.c:1533
 sock_sendmsg_nosec net/socket.c:652 [inline]
 sock_sendmsg+0xcf/0x120 net/socket.c:672
 ____sys_sendmsg+0x32f/0x810 net/socket.c:2352
 ___sys_sendmsg+0x100/0x170 net/socket.c:2406
 __sys_sendmmsg+0x195/0x480 net/socket.c:2496
 __do_sys_sendmmsg net/socket.c:2525 [inline]
 __se_sys_sendmmsg net/socket.c:2522 [inline]
 __x64_sys_sendmmsg+0x99/0x100 net/socket.c:2522
 do_syscall_64+0xf6/0x7d0 arch/x86/entry/common.c:295
 entry_SYSCALL_64_after_hwframe+0x49/0xb3
RIP: 0033:0x440199
...

This bug was bisected to commit 0a3e060f340d ("tipc: add test for Nagle algorithm effectiveness"). However, that commit is not the root cause; the underlying problem is that we can end up with an empty 'txq' queue after 'tipc_msg_append()' in Nagle mode, in particular when, as syzbot did, messages of zero data length are sent. The same crash can be reproduced even without that patch, just at the link layer when it accesses the empty queue. This commit solves the issue by building at least one skb to carry the socket header and an optional data section (which may be empty), like what we had with 'tipc_msg_build()'.

Reported-by: syz...@sy...
Fixes: c0bceb97db9e ("tipc: add smart nagle feature")
Signed-off-by: Tuong Lien <tuo...@de...>
---
 net/tipc/msg.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 23809039dda1..604c03adabc2 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -221,7 +221,7 @@ int tipc_msg_append(struct tipc_msg *_hdr, struct msghdr *m, int dlen,
 	accounted = skb ? msg_blocks(buf_msg(skb)) : 0;
 	total = accounted;
 
-	while (rem) {
+	do {
 		if (!skb || skb->len >= mss) {
 			prev = skb;
 			skb = tipc_buf_acquire(mss, GFP_KERNEL);
@@ -246,7 +246,7 @@ int tipc_msg_append(struct tipc_msg *_hdr, struct msghdr *m, int dlen,
 		skb_put(skb, cpy);
 		rem -= cpy;
 		total += msg_blocks(hdr) - curr;
-	}
+	} while (rem);
 
 	return total - accounted;
 }
--
2.13.7

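The loop-construct change is the whole fix: with 'while (rem)' a zero-length send (rem == 0) never enters the loop, so no skb is queued and a later skb_peek_tail() yields NULL. A minimal stand-alone sketch of just the control flow, with a hypothetical helper in place of the kernel's buffer building:

#include <stdio.h>

/* Hypothetical stand-in for building one buffer and appending up to
 * 'mss' bytes; returns the number of bytes consumed. */
static int build_one_buf(int rem, int mss)
{
	return rem < mss ? rem : mss;
}

static int append_while(int rem, int mss)
{
	int bufs = 0;

	while (rem) {			/* rem == 0: body never runs */
		rem -= build_one_buf(rem, mss);
		bufs++;
	}
	return bufs;			/* 0 buffers for a 0-byte send */
}

static int append_do_while(int rem, int mss)
{
	int bufs = 0;

	do {				/* always builds at least one buffer */
		rem -= build_one_buf(rem, mss);
		bufs++;
	} while (rem);
	return bufs;			/* >= 1, queue can never be empty */
}

int main(void)
{
	printf("while: %d, do/while: %d\n",
	       append_while(0, 1460), append_do_while(0, 1460));
	return 0;
}
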
From: Tuong L. <tuo...@de...> - 2020-05-27 03:12:54
Since commit c0bceb97db9e ("tipc: add smart nagle feature"), the buffer queue to be transmitted may contain multiple skbs that have been held back by Nagle. If so, accumulating the TX 'fragmented'/'fragments' message statistics in 'tipc_link_xmit()' based on the queue length is no longer accurate. This commit resolves the issue by explicitly checking whether the queue contains a 'MSG_FRAGMENTER' message instead.

Signed-off-by: Tuong Lien <tuo...@de...>
---
 net/tipc/link.c | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index ee3b8d0576b8..4c073ddc7466 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -992,7 +992,6 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
 	u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
 	u16 ack = l->rcv_nxt - 1;
 	u16 seqno = l->snd_nxt;
-	int pkt_cnt = skb_queue_len(list);
 	int imp = msg_importance(hdr);
 	unsigned int mss = tipc_link_mss(l);
 	unsigned int cwin = l->window;
@@ -1017,9 +1016,9 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
 		rc = link_schedule_user(l, hdr);
 	}
 
-	if (pkt_cnt > 1) {
+	if (msg_user(hdr) == MSG_FRAGMENTER) {
 		l->stats.sent_fragmented++;
-		l->stats.sent_fragments += pkt_cnt;
+		l->stats.sent_fragments += skb_queue_len(list);
 	}
 
 	/* Prepare each packet for sending, and add to relevant queue: */
--
2.13.7

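To make the misaccounting concrete: under Nagle, a queue of, say, 3 skbs may simply be 3 bundled user messages, which the old 'queue length > 1' test wrongly counts as fragments. A toy sketch of the old vs. fixed accounting, using simplified stand-ins rather than the kernel structures:

#include <stdio.h>

enum msg_user { MSG_FRAGMENTER, OTHER };

struct stats { unsigned int sent_fragmented, sent_fragments; };

/* Old behavior: any multi-skb queue was counted as fragments */
static void account_old(struct stats *s, enum msg_user u, int qlen)
{
	(void)u;
	if (qlen > 1) {
		s->sent_fragmented++;
		s->sent_fragments += qlen;
	}
}

/* Fixed behavior: only queues carrying a MSG_FRAGMENTER message count */
static void account_fixed(struct stats *s, enum msg_user u, int qlen)
{
	if (u == MSG_FRAGMENTER) {
		s->sent_fragmented++;
		s->sent_fragments += qlen;
	}
}

int main(void)
{
	struct stats old = {0}, fixed = {0};

	/* 3 small messages held back and bundled by Nagle: no fragments */
	account_old(&old, OTHER, 3);
	account_fixed(&fixed, OTHER, 3);
	printf("old: %u/%u, fixed: %u/%u\n", old.sent_fragmented,
	       old.sent_fragments, fixed.sent_fragmented, fixed.sent_fragments);
	return 0;
}
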
From: David M. <da...@da...> - 2020-05-26 22:17:17
From: Tuong Lien <tuo...@de...>
Date: Tue, 26 May 2020 16:38:33 +0700

> This series adds some improvements to TIPC.
>
> The first patch improves the TIPC broadcast's performance with the 'Gap
> ACK blocks' mechanism, similar to unicast before, while the others add
> support for tracing & statistics on broadcast links, plus an alternative
> to carry broadcast retransmissions via unicast, which might be useful in
> some cases.
>
> Besides, with the last patch, the Nagle algorithm can now automatically
> 'adjust' itself depending on the specific network condition a stream
> connection runs on.

Series applied, thanks.

From: Tuong L. <tuo...@de...> - 2020-05-26 09:41:17
This commit allows printing the statistics of a broadcast-receiver link using the same tipc command, but with additional 'link' options:

$ tipc link stat show --help
Usage: tipc link stat show [ link { LINK | SUBSTRING | all } ]

With:
+ 'LINK'     : print the stats of the specific link 'LINK';
+ 'SUBSTRING': print the stats of all the links having 'SUBSTRING' in their names;
+ 'all'      : print all the links' stats, incl. the broadcast-receiver ones;

Also, a link's stats can be reset in the usual way by specifying the link name in the command. For example:

$ tipc l st sh l br
Link <broadcast-link>
  Window:50 packets
  RX packets:0 fragments:0/0 bundles:0/0
  TX packets:5011125 fragments:4968774/149643 bundles:38402/307061
  RX naks:781484 defs:0 dups:0
  TX naks:0 acks:0 retrans:330259
  Congestion link:50657  Send queue max:0 avg:0
Link <broadcast-link:1001001>
  Window:50 packets
  RX packets:95146 fragments:95040/1980 bundles:1/2
  TX packets:0 fragments:0/0 bundles:0/0
  RX naks:380938 defs:83962 dups:403
  TX naks:8362 acks:0 retrans:170662
  Congestion link:0  Send queue max:0 avg:0
Link <broadcast-link:1001002>
  Window:50 packets
  RX packets:0 fragments:0/0 bundles:0/0
  TX packets:0 fragments:0/0 bundles:0/0
  RX naks:400546 defs:0 dups:0
  TX naks:0 acks:0 retrans:159597
  Congestion link:0  Send queue max:0 avg:0

$ tipc l st sh l 1001002
Link <1001003:data0-1001002:data0>
  ACTIVE  MTU:1500  Priority:10  Tolerance:1500 ms  Window:50 packets
  RX packets:99546 fragments:0/0 bundles:33/877
  TX packets:629 fragments:0/0 bundles:35/828
  TX profile sample:8 packets  average:390 octets
  0-64:75% -256:0% -1024:0% -4096:25% -16384:0% -32768:0% -66000:0%
  RX states:488714 probes:7397 naks:0 defs:4 dups:5
  TX states:27734 probes:18016 naks:5 acks:2305 retrans:0
  Congestion link:0  Send queue max:0 avg:0
Link <broadcast-link:1001002>
  Window:50 packets
  RX packets:0 fragments:0/0 bundles:0/0
  TX packets:0 fragments:0/0 bundles:0/0
  RX naks:400546 defs:0 dups:0
  TX naks:0 acks:0 retrans:159597
  Congestion link:0  Send queue max:0 avg:0

$ tipc l st re l broadcast-link:1001002
$ tipc l st sh l broadcast-link:1001002
Link <broadcast-link:1001002>
  Window:50 packets
  RX packets:0 fragments:0/0 bundles:0/0
  TX packets:0 fragments:0/0 bundles:0/0
  RX naks:0 defs:0 dups:0
  TX naks:0 acks:0 retrans:0
  Congestion link:0  Send queue max:0 avg:0

Acked-by: Ying Xue <yin...@wi...>
Acked-by: Jon Maloy <jm...@re...>
Signed-off-by: Tuong Lien <tuo...@de...>
---
 tipc/link.c | 25 +++++++++++++++++--------
 1 file changed, 17 insertions(+), 8 deletions(-)

diff --git a/tipc/link.c b/tipc/link.c
index e123c186..ba77a201 100644
--- a/tipc/link.c
+++ b/tipc/link.c
@@ -334,7 +334,7 @@ static int _show_link_stat(const char *name, struct nlattr *attrs[],
 
 	open_json_object(NULL);
 
-	print_string(PRINT_ANY, "link", "\nLink <%s>\n", name);
+	print_string(PRINT_ANY, "link", "Link <%s>\n", name);
 	print_string(PRINT_JSON, "state", "", NULL);
 	open_json_array(PRINT_JSON, NULL);
 	if (attrs[TIPC_NLA_LINK_ACTIVE])
@@ -433,7 +433,7 @@ static int _show_link_stat(const char *name, struct nlattr *attrs[],
 		   mnl_attr_get_u32(stats[TIPC_NLA_STATS_LINK_CONGS]));
 	print_uint(PRINT_ANY, "send queue max", " Send queue max:%u",
 		   mnl_attr_get_u32(stats[TIPC_NLA_STATS_MAX_QUEUE]));
-	print_uint(PRINT_ANY, "avg", " avg:%u\n",
+	print_uint(PRINT_ANY, "avg", " avg:%u\n\n",
 		   mnl_attr_get_u32(stats[TIPC_NLA_STATS_AVG_QUEUE]));
 
 	close_json_object();
@@ -496,7 +496,7 @@ static int _show_bc_link_stat(const char *name, struct nlattr *prop[],
 		   mnl_attr_get_u32(stats[TIPC_NLA_STATS_LINK_CONGS]));
 	print_uint(PRINT_ANY, "send queue max", " Send queue max:%u",
 		   mnl_attr_get_u32(stats[TIPC_NLA_STATS_MAX_QUEUE]));
-	print_uint(PRINT_ANY, "avg", " avg:%u\n",
+	print_uint(PRINT_ANY, "avg", " avg:%u\n\n",
 		   mnl_attr_get_u32(stats[TIPC_NLA_STATS_AVG_QUEUE]));
 
 	close_json_object();
@@ -527,8 +527,10 @@ static int link_stat_show_cb(const struct nlmsghdr *nlh, void *data)
 
 	name = mnl_attr_get_str(attrs[TIPC_NLA_LINK_NAME]);
 
-	/* If a link is passed, skip all but that link */
-	if (link && (strcmp(name, link) != 0))
+	/* If a link is passed, skip all but that link.
+	 * Support a substring matching as well.
+	 */
+	if (link && !strstr(name, link))
 		return MNL_CB_OK;
 
 	if (attrs[TIPC_NLA_LINK_BROADCAST]) {
@@ -540,7 +542,7 @@ static int link_stat_show_cb(const struct nlmsghdr *nlh, void *data)
 
 static void cmd_link_stat_show_help(struct cmdl *cmdl)
 {
-	fprintf(stderr, "Usage: %s link stat show [ link LINK ]\n",
+	fprintf(stderr, "Usage: %s link stat show [ link { LINK | SUBSTRING | all } ]\n",
 		cmdl->argv[0]);
 }
 
@@ -554,6 +556,7 @@ static int cmd_link_stat_show(struct nlmsghdr *nlh, const struct cmd *cmd,
 		{ "link", OPT_KEYVAL, NULL },
 		{ NULL }
 	};
+	struct nlattr *attrs;
 	int err = 0;
 
 	if (help_flag) {
@@ -571,8 +574,14 @@ static int cmd_link_stat_show(struct nlmsghdr *nlh, const struct cmd *cmd,
 		return -EINVAL;
 
 	opt = get_opt(opts, "link");
-	if (opt)
-		link = opt->val;
+	if (opt) {
+		if (strcmp(opt->val, "all"))
+			link = opt->val;
+		/* Set the flag to dump all bc links */
+		attrs = mnl_attr_nest_start(nlh, TIPC_NLA_LINK);
+		mnl_attr_put(nlh, TIPC_NLA_LINK_BROADCAST, 0, NULL);
+		mnl_attr_nest_end(nlh, attrs);
+	}
 
 	new_json_obj(json);
 	err = msg_dumpit(nlh, link_stat_show_cb, link);
--
2.13.7

From: Tuong L. <tuo...@de...> - 2020-05-26 09:39:12
When streaming in Nagle mode, we try to bundle as many small user messages as possible if there is one outstanding buffer, i.e. one not yet ACK-ed by the receiving side, which helps boost the overall throughput. So the algorithm's effectiveness really depends on when the Nagle ACK comes and on the specific network latency (RTT) compared to the user's message sending rate.

In a bad case, where the user's sending rate is low or the network latency is small, there will not be many bundles, so making a Nagle ACK or waiting for it is not meaningful. For example: a user sends its messages every 100 ms while the RTT is 50 ms; then for each message we require one Nagle ACK, yet each message goes out without any bundling. Even in a better case with a few bundles (e.g. RTT = 300 ms), if the user sends messages of medium size there will be no difference at all: 3 x 1000-byte data messages, even if bundled, still result in 3 bundles with MTU = 1500.

When Nagle is ineffective, the delay in user message sending is clearly wasted compared with sending directly. Besides, the added Nagle ACKs consume some processor load on both the sending and receiving sides.

This commit adds a test on the effectiveness of the Nagle algorithm for an individual connection on the network it actually runs on. Particularly, upon receipt of a Nagle ACK we compare the number of packets in the backlog queue to the number of user messages that would have been sent directly without Nagle. If the ratio is good (e.g. >= 2), Nagle mode is kept for further message sending. Otherwise, we leave Nagle and put a 'penalty' on the connection, so it will have to spend more 'one-way' messages before being able to re-enter Nagle mode. In addition, the 'ack-required' bit is only set when really needed, so the number of Nagle ACKs is reduced during Nagle mode.

Testing with a benchmark showed that, with the patch, there was not much difference in throughput for small messages, since the tool continuously sends messages without a break, so Nagle still takes effect.

Acked-by: Ying Xue <yin...@wi...>
Acked-by: Jon Maloy <jm...@re...>
Signed-off-by: Tuong Lien <tuo...@de...>
---
 net/tipc/msg.c    |  3 ---
 net/tipc/msg.h    | 14 ++++++++++--
 net/tipc/socket.c | 64 ++++++++++++++++++++++++++++++++++++++++++++-----------
 3 files changed, 64 insertions(+), 17 deletions(-)

diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index c69fb99163fc..23809039dda1 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -235,9 +235,6 @@ int tipc_msg_append(struct tipc_msg *_hdr, struct msghdr *m, int dlen,
 			msg_set_size(hdr, MIN_H_SIZE);
 			__skb_queue_tail(txq, skb);
 			total += 1;
-			if (prev)
-				msg_set_ack_required(buf_msg(prev), 0);
-			msg_set_ack_required(hdr, 1);
 		}
 		hdr = buf_msg(skb);
 		curr = msg_blocks(hdr);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index cd4281779468..58660d56bc83 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -340,9 +340,19 @@ static inline int msg_ack_required(struct tipc_msg *m)
 	return msg_bits(m, 0, 18, 1);
 }
 
-static inline void msg_set_ack_required(struct tipc_msg *m, u32 d)
+static inline void msg_set_ack_required(struct tipc_msg *m)
 {
-	msg_set_bits(m, 0, 18, 1, d);
+	msg_set_bits(m, 0, 18, 1, 1);
+}
+
+static inline int msg_nagle_ack(struct tipc_msg *m)
+{
+	return msg_bits(m, 0, 18, 1);
+}
+
+static inline void msg_set_nagle_ack(struct tipc_msg *m)
+{
+	msg_set_bits(m, 0, 18, 1, 1);
 }
 
 static inline bool msg_is_rcast(struct tipc_msg *m)
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index e370ad0edd76..d6b67d07d22e 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -48,6 +48,8 @@
 #include "group.h"
 #include "trace.h"
 
+#define NAGLE_START_INIT	4
+#define NAGLE_START_MAX		1024
 #define CONN_TIMEOUT_DEFAULT	8000	/* default connect timeout = 8s */
 #define CONN_PROBING_INTV	msecs_to_jiffies(3600000)  /* [ms] => 1 h */
 #define TIPC_FWD_MSG		1
@@ -119,7 +121,10 @@ struct tipc_sock {
 	struct rcu_head rcu;
 	struct tipc_group *group;
 	u32 oneway;
+	u32 nagle_start;
 	u16 snd_backlog;
+	u16 msg_acc;
+	u16 pkt_cnt;
 	bool expect_ack;
 	bool nodelay;
 	bool group_is_open;
@@ -143,7 +148,7 @@ static int tipc_sk_insert(struct tipc_sock *tsk);
 static void tipc_sk_remove(struct tipc_sock *tsk);
 static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
-static void tipc_sk_push_backlog(struct tipc_sock *tsk);
+static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack);
 
 static const struct proto_ops packet_ops;
 static const struct proto_ops stream_ops;
@@ -474,6 +479,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
 	tsk = tipc_sk(sk);
 	tsk->max_pkt = MAX_PKT_DEFAULT;
 	tsk->maxnagle = 0;
+	tsk->nagle_start = NAGLE_START_INIT;
 	INIT_LIST_HEAD(&tsk->publications);
 	INIT_LIST_HEAD(&tsk->cong_links);
 	msg = &tsk->phdr;
@@ -541,7 +547,7 @@ static void __tipc_shutdown(struct socket *sock, int error)
 					    !tsk_conn_cong(tsk)));
 
 	/* Push out delayed messages if in Nagle mode */
-	tipc_sk_push_backlog(tsk);
+	tipc_sk_push_backlog(tsk, false);
 	/* Remove pending SYN */
 	__skb_queue_purge(&sk->sk_write_queue);
 
@@ -1252,14 +1258,37 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
 /* tipc_sk_push_backlog(): send accumulated buffers in socket write queue
  *                         when socket is in Nagle mode
  */
-static void tipc_sk_push_backlog(struct tipc_sock *tsk)
+static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack)
 {
 	struct sk_buff_head *txq = &tsk->sk.sk_write_queue;
+	struct sk_buff *skb = skb_peek_tail(txq);
 	struct net *net = sock_net(&tsk->sk);
 	u32 dnode = tsk_peer_node(tsk);
-	struct sk_buff *skb = skb_peek(txq);
 	int rc;
 
+	if (nagle_ack) {
+		tsk->pkt_cnt += skb_queue_len(txq);
+		if (!tsk->pkt_cnt || tsk->msg_acc / tsk->pkt_cnt < 2) {
+			tsk->oneway = 0;
+			if (tsk->nagle_start < NAGLE_START_MAX)
+				tsk->nagle_start *= 2;
+			tsk->expect_ack = false;
+			pr_debug("tsk %10u: bad nagle %u -> %u, next start %u!\n",
+				 tsk->portid, tsk->msg_acc, tsk->pkt_cnt,
+				 tsk->nagle_start);
+		} else {
+			tsk->nagle_start = NAGLE_START_INIT;
+			if (skb) {
+				msg_set_ack_required(buf_msg(skb));
+				tsk->expect_ack = true;
+			} else {
+				tsk->expect_ack = false;
+			}
+		}
+		tsk->msg_acc = 0;
+		tsk->pkt_cnt = 0;
+	}
+
 	if (!skb || tsk->cong_link_cnt)
 		return;
 
@@ -1267,9 +1296,10 @@ static void tipc_sk_push_backlog(struct tipc_sock *tsk)
 	if (msg_is_syn(buf_msg(skb)))
 		return;
 
+	if (tsk->msg_acc)
+		tsk->pkt_cnt += skb_queue_len(txq);
 	tsk->snt_unacked += tsk->snd_backlog;
 	tsk->snd_backlog = 0;
-	tsk->expect_ack = true;
 	rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
 	if (rc == -ELINKCONG)
 		tsk->cong_link_cnt = 1;
@@ -1322,8 +1352,7 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
 		return;
 	} else if (mtyp == CONN_ACK) {
 		was_cong = tsk_conn_cong(tsk);
-		tsk->expect_ack = false;
-		tipc_sk_push_backlog(tsk);
+		tipc_sk_push_backlog(tsk, msg_nagle_ack(hdr));
 		tsk->snt_unacked -= msg_conn_ack(hdr);
 		if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
 			tsk->snd_win = msg_adv_win(hdr);
@@ -1516,6 +1545,7 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct tipc_msg *hdr = &tsk->phdr;
 	struct net *net = sock_net(sk);
+	struct sk_buff *skb;
 	u32 dnode = tsk_peer_node(tsk);
 	int maxnagle = tsk->maxnagle;
 	int maxpkt = tsk->max_pkt;
@@ -1544,17 +1574,25 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
 			break;
 		send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
 		blocks = tsk->snd_backlog;
-		if (tsk->oneway++ >= 4 && send <= maxnagle) {
+		if (tsk->oneway++ >= tsk->nagle_start && send <= maxnagle) {
 			rc = tipc_msg_append(hdr, m, send, maxnagle, txq);
 			if (unlikely(rc < 0))
 				break;
 			blocks += rc;
+			tsk->msg_acc++;
 			if (blocks <= 64 && tsk->expect_ack) {
 				tsk->snd_backlog = blocks;
 				sent += send;
 				break;
+			} else if (blocks > 64) {
+				tsk->pkt_cnt += skb_queue_len(txq);
+			} else {
+				skb = skb_peek_tail(txq);
+				msg_set_ack_required(buf_msg(skb));
+				tsk->expect_ack = true;
+				tsk->msg_acc = 0;
+				tsk->pkt_cnt = 0;
 			}
-			tsk->expect_ack = true;
 		} else {
 			rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq);
 			if (unlikely(rc != send))
@@ -2091,7 +2129,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
 		smp_wmb();
 		tsk->cong_link_cnt--;
 		wakeup = true;
-		tipc_sk_push_backlog(tsk);
+		tipc_sk_push_backlog(tsk, false);
 		break;
 	case GROUP_PROTOCOL:
 		tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
@@ -2180,7 +2218,7 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb,
 		return false;
 	case TIPC_ESTABLISHED:
 		if (!skb_queue_empty(&sk->sk_write_queue))
-			tipc_sk_push_backlog(tsk);
+			tipc_sk_push_backlog(tsk, false);
 		/* Accept only connection-based messages sent by peer */
 		if (likely(con_msg && !err && pport == oport &&
 			   pnode == onode)) {
@@ -2188,8 +2226,10 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb,
 			struct sk_buff *skb;
 
 			skb = tipc_sk_build_ack(tsk);
-			if (skb)
+			if (skb) {
+				msg_set_nagle_ack(buf_msg(skb));
 				__skb_queue_tail(xmitq, skb);
+			}
 		}
 		return true;
 	}
--
2.13.7

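For orientation, the adaptive decision described above can be summarized in a small stand-alone sketch. It mirrors the logic the patch adds to tipc_sk_push_backlog(), but uses a simplified connection struct instead of the kernel's tipc_sock:

#include <stdbool.h>
#include <stdio.h>

#define NAGLE_START_INIT 4
#define NAGLE_START_MAX  1024

struct conn {
	unsigned int oneway;		/* one-way msgs seen so far */
	unsigned int nagle_start;	/* threshold to (re-)enter Nagle */
	unsigned int msg_acc;		/* user msgs accumulated under Nagle */
	unsigned int pkt_cnt;		/* packets those msgs actually became */
	bool expect_ack;
};

/* On receipt of a Nagle ACK: keep Nagle only if bundling really paid
 * off, i.e. at least 2 user messages per packet on average. */
static void on_nagle_ack(struct conn *c, unsigned int txq_len)
{
	c->pkt_cnt += txq_len;
	if (!c->pkt_cnt || c->msg_acc / c->pkt_cnt < 2) {
		/* Ineffective: leave Nagle and apply a 'penalty' */
		c->oneway = 0;
		if (c->nagle_start < NAGLE_START_MAX)
			c->nagle_start *= 2;
		c->expect_ack = false;
	} else {
		/* Effective: reset the penalty, stay in Nagle mode */
		c->nagle_start = NAGLE_START_INIT;
	}
	c->msg_acc = 0;
	c->pkt_cnt = 0;
}

int main(void)
{
	struct conn c = { .nagle_start = NAGLE_START_INIT };

	c.msg_acc = 10;
	on_nagle_ack(&c, 2);	/* 10 msgs -> 2 pkts: ratio 5, keep Nagle */
	printf("start after good ACK: %u\n", c.nagle_start);
	c.msg_acc = 3;
	on_nagle_ack(&c, 3);	/* 3 msgs -> 3 pkts: ratio 1, leave Nagle */
	printf("start after bad ACK:  %u\n", c.nagle_start);
	return 0;
}

The doubling of nagle_start (capped at 1024) is the 'penalty': the connection must observe that many one-way messages again before bundling is retried.
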
From: Tuong L. <tuo...@de...> - 2020-05-26 09:39:11
As achieved through commit 9195948fbf34 ("tipc: improve TIPC throughput by Gap ACK blocks"), we apply the same mechanism to the broadcast link as well. The 'Gap ACK blocks' data field in a 'PROTOCOL/STATE_MSG' will consist of two parts built for both the broadcast and unicast types:

 31                       16 15                        0
+-------------+-------------+-------------+-------------+
|  bgack_cnt  |  ugack_cnt  |            len            |
+-------------+-------------+-------------+-------------+  -
|            gap            |            ack            |   |
+-------------+-------------+-------------+-------------+    > bc gacks
:                           :                           :   |
+-------------+-------------+-------------+-------------+  -
|            gap            |            ack            |   |
+-------------+-------------+-------------+-------------+    > uc gacks
:                           :                           :   |
+-------------+-------------+-------------+-------------+  -

which is "automatically" backward compatible. We also increase the max number of Gap ACK blocks to 128, allowing up to 64 blocks per type (total buffer size = 516 bytes).

Besides, the 'tipc_link_advance_transmq()' function is refactored so that it is now applicable to both the unicast and broadcast cases; some old functions can therefore be removed and the code is optimized.

With the patch, TIPC broadcast is more robust regardless of packet loss, disorder, latency, etc. in the underlying network. Its performance is boosted significantly. For example, an experiment with a 5% packet loss rate gives these results:

$ time tipc-pipe --mc --rdm --data_size 123 --data_num 1500000
real	0m 42.46s
user	0m 1.16s
sys	0m 17.67s

Without the patch:

$ time tipc-pipe --mc --rdm --data_size 123 --data_num 1500000
real	8m 27.94s
user	0m 0.55s
sys	0m 2.38s

Acked-by: Ying Xue <yin...@wi...>
Acked-by: Jon Maloy <jm...@re...>
Signed-off-by: Tuong Lien <tuo...@de...>
---
 net/tipc/bcast.c |   9 +-
 net/tipc/link.c  | 425 ++++++++++++++++++++++++++++++++-----------------------
 net/tipc/link.h  |   7 +-
 net/tipc/msg.h   |  27 +++-
 net/tipc/node.c  |  10 +-
 5 files changed, 293 insertions(+), 185 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 4c20be08b9c4..3ce690a96ee9 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -474,7 +474,7 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
 	__skb_queue_head_init(&xmitq);
 
 	tipc_bcast_lock(net);
-	tipc_link_bc_ack_rcv(l, acked, &xmitq);
+	tipc_link_bc_ack_rcv(l, acked, 0, NULL, &xmitq);
 	tipc_bcast_unlock(net);
 
 	tipc_bcbase_xmit(net, &xmitq);
@@ -492,6 +492,7 @@ int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
 			struct tipc_msg *hdr)
 {
 	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
+	struct tipc_gap_ack_blks *ga;
 	struct sk_buff_head xmitq;
 	int rc = 0;
 
@@ -501,8 +502,10 @@ int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
 	if (msg_type(hdr) != STATE_MSG) {
 		tipc_link_bc_init_rcv(l, hdr);
 	} else if (!msg_bc_ack_invalid(hdr)) {
-		tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq);
-		rc = tipc_link_bc_sync_rcv(l, hdr, &xmitq);
+		tipc_get_gap_ack_blks(&ga, l, hdr, false);
+		rc = tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr),
+					  msg_bc_gap(hdr), ga, &xmitq);
+		rc |= tipc_link_bc_sync_rcv(l, hdr, &xmitq);
 	}
 	tipc_bcast_unlock(net);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index d4675e922a8f..d29b9c531171 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -188,6 +188,8 @@ struct tipc_link {
 	/* Broadcast */
 	u16 ackers;
 	u16 acked;
+	u16 last_gap;
+	struct tipc_gap_ack_blks *last_ga;
 	struct tipc_link *bc_rcvlink;
 	struct tipc_link *bc_sndlink;
 	u8 nack_state;
@@ -249,11 +251,14 @@ static int tipc_link_build_nack_msg(struct tipc_link *l,
 				    struct sk_buff_head *xmitq);
 static void tipc_link_build_bc_init_msg(struct tipc_link *l,
 					struct sk_buff_head *xmitq);
-static int tipc_link_release_pkts(struct tipc_link *l, u16 to);
-static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data, u16 gap);
-static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
+static u8 __tipc_build_gap_ack_blks(struct tipc_gap_ack_blks *ga,
+				    struct tipc_link *l, u8 start_index);
+static u16 tipc_build_gap_ack_blks(struct tipc_link *l, struct tipc_msg *hdr);
+static int tipc_link_advance_transmq(struct tipc_link *l, struct tipc_link *r,
+				     u16 acked, u16 gap,
 				     struct tipc_gap_ack_blks *ga,
-				     struct sk_buff_head *xmitq);
+				     struct sk_buff_head *xmitq,
+				     bool *retransmitted, int *rc);
 static void tipc_link_update_cwin(struct tipc_link *l, int released,
 				  bool retransmitted);
 /*
@@ -370,7 +375,7 @@ void tipc_link_remove_bc_peer(struct tipc_link *snd_l,
 	snd_l->ackers--;
 	rcv_l->bc_peer_is_up = true;
 	rcv_l->state = LINK_ESTABLISHED;
-	tipc_link_bc_ack_rcv(rcv_l, ack, xmitq);
+	tipc_link_bc_ack_rcv(rcv_l, ack, 0, NULL, xmitq);
 	trace_tipc_link_reset(rcv_l, TIPC_DUMP_ALL, "bclink removed!");
 	tipc_link_reset(rcv_l);
 	rcv_l->state = LINK_RESET;
@@ -784,8 +789,6 @@ bool tipc_link_too_silent(struct tipc_link *l)
 	return (l->silent_intv_cnt + 2 > l->abort_limit);
 }
 
-static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r,
-				u16 from, u16 to, struct sk_buff_head *xmitq);
 /* tipc_link_timeout - perform periodic task as instructed from node timeout
  */
 int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
@@ -948,6 +951,9 @@ void tipc_link_reset(struct tipc_link *l)
 	l->snd_nxt_state = 1;
 	l->rcv_nxt_state = 1;
 	l->acked = 0;
+	l->last_gap = 0;
+	kfree(l->last_ga);
+	l->last_ga = NULL;
 	l->silent_intv_cnt = 0;
 	l->rst_cnt = 0;
 	l->bc_peer_is_up = false;
@@ -1183,68 +1189,14 @@ static bool link_retransmit_failure(struct tipc_link *l, struct tipc_link *r,
 
 	if (link_is_bc_sndlink(l)) {
 		r->state = LINK_RESET;
-		*rc = TIPC_LINK_DOWN_EVT;
+		*rc |= TIPC_LINK_DOWN_EVT;
 	} else {
-		*rc = tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
+		*rc |= tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
 	}
 
 	return true;
 }
 
-/* tipc_link_bc_retrans() - retransmit zero or more packets
- * @l: the link to transmit on
- * @r: the receiving link ordering the retransmit. Same as l if unicast
- * @from: retransmit from (inclusive) this sequence number
- * @to: retransmit to (inclusive) this sequence number
- * xmitq: queue for accumulating the retransmitted packets
- */
-static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r,
-				u16 from, u16 to, struct sk_buff_head *xmitq)
-{
-	struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
-	u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
-	u16 ack = l->rcv_nxt - 1;
-	int retransmitted = 0;
-	struct tipc_msg *hdr;
-	int rc = 0;
-
-	if (!skb)
-		return 0;
-	if (less(to, from))
-		return 0;
-
-	trace_tipc_link_retrans(r, from, to, &l->transmq);
-
-	if (link_retransmit_failure(l, r, &rc))
-		return rc;
-
-	skb_queue_walk(&l->transmq, skb) {
-		hdr = buf_msg(skb);
-		if (less(msg_seqno(hdr), from))
-			continue;
-		if (more(msg_seqno(hdr), to))
-			break;
-		if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr))
-			continue;
-		TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM;
-		_skb = pskb_copy(skb, GFP_ATOMIC);
-		if (!_skb)
-			return 0;
-		hdr = buf_msg(_skb);
-		msg_set_ack(hdr, ack);
-		msg_set_bcast_ack(hdr, bc_ack);
-		_skb->priority = TC_PRIO_CONTROL;
-		__skb_queue_tail(xmitq, _skb);
-		l->stats.retransmitted++;
-		retransmitted++;
-		/* Increase actual retrans counter & mark first time */
-		if (!TIPC_SKB_CB(skb)->retr_cnt++)
-			TIPC_SKB_CB(skb)->retr_stamp = jiffies;
-	}
-	tipc_link_update_cwin(l, 0, retransmitted);
-	return 0;
-}
-
 /* tipc_data_input - deliver data and name distr msgs to upper layer
  *
  * Consumes buffer if message is of right type
@@ -1402,46 +1354,71 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb,
 	return rc;
 }
 
-static int tipc_link_release_pkts(struct tipc_link *l, u16 acked)
-{
-	int released = 0;
-	struct sk_buff *skb, *tmp;
-
-	skb_queue_walk_safe(&l->transmq, skb, tmp) {
-		if (more(buf_seqno(skb), acked))
-			break;
-		__skb_unlink(skb, &l->transmq);
-		kfree_skb(skb);
-		released++;
+/**
+ * tipc_get_gap_ack_blks - get Gap ACK blocks from PROTOCOL/STATE_MSG
+ * @ga: returned pointer to the Gap ACK blocks if any
+ * @l: the tipc link
+ * @hdr: the PROTOCOL/STATE_MSG header
+ * @uc: desired Gap ACK blocks type, i.e. unicast (= 1) or broadcast (= 0)
+ *
+ * Return: the total Gap ACK blocks size
+ */
+u16 tipc_get_gap_ack_blks(struct tipc_gap_ack_blks **ga, struct tipc_link *l,
+			  struct tipc_msg *hdr, bool uc)
+{
+	struct tipc_gap_ack_blks *p;
+	u16 sz = 0;
+
+	/* Does peer support the Gap ACK blocks feature? */
+	if (l->peer_caps & TIPC_GAP_ACK_BLOCK) {
+		p = (struct tipc_gap_ack_blks *)msg_data(hdr);
+		sz = ntohs(p->len);
+		/* Sanity check */
+		if (sz == tipc_gap_ack_blks_sz(p->ugack_cnt + p->bgack_cnt)) {
+			/* Good, check if the desired type exists */
+			if ((uc && p->ugack_cnt) || (!uc && p->bgack_cnt))
+				goto ok;
+		/* Backward compatible: peer might not support bc, but uc? */
+		} else if (uc && sz == tipc_gap_ack_blks_sz(p->ugack_cnt)) {
+			if (p->ugack_cnt) {
+				p->bgack_cnt = 0;
+				goto ok;
+			}
+		}
 	}
-	return released;
+	/* Other cases: ignore! */
+	p = NULL;
+
+ok:
+	*ga = p;
+	return sz;
 }
 
-/* tipc_build_gap_ack_blks - build Gap ACK blocks
- * @l: tipc link that data have come with gaps in sequence if any
- * @data: data buffer to store the Gap ACK blocks after built
- *
- * returns the actual allocated memory size
- */
-static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data, u16 gap)
+static u8 __tipc_build_gap_ack_blks(struct tipc_gap_ack_blks *ga,
+				    struct tipc_link *l, u8 start_index)
 {
+	struct tipc_gap_ack *gacks = &ga->gacks[start_index];
 	struct sk_buff *skb = skb_peek(&l->deferdq);
-	struct tipc_gap_ack_blks *ga = data;
-	u16 len, expect, seqno = 0;
+	u16 expect, seqno = 0;
 	u8 n = 0;
 
-	if (!skb || !gap)
-		goto exit;
+	if (!skb)
+		return 0;
 
 	expect = buf_seqno(skb);
 	skb_queue_walk(&l->deferdq, skb) {
 		seqno = buf_seqno(skb);
 		if (unlikely(more(seqno, expect))) {
-			ga->gacks[n].ack = htons(expect - 1);
-			ga->gacks[n].gap = htons(seqno - expect);
-			if (++n >= MAX_GAP_ACK_BLKS) {
-				pr_info_ratelimited("Too few Gap ACK blocks!\n");
-				goto exit;
+			gacks[n].ack = htons(expect - 1);
+			gacks[n].gap = htons(seqno - expect);
+			if (++n >= MAX_GAP_ACK_BLKS / 2) {
+				char buf[TIPC_MAX_LINK_NAME];
+
+				pr_info_ratelimited("Gacks on %s: %d, ql: %d!\n",
						    tipc_link_name_ext(l, buf),
						    n,
						    skb_queue_len(&l->deferdq));
+				return n;
 			}
 		} else if (unlikely(less(seqno, expect))) {
 			pr_warn("Unexpected skb in deferdq!\n");
@@ -1451,14 +1428,44 @@ static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data, u16 gap)
 	}
 
 	/* last block */
-	ga->gacks[n].ack = htons(seqno);
-	ga->gacks[n].gap = 0;
+	gacks[n].ack = htons(seqno);
+	gacks[n].gap = 0;
 	n++;
+	return n;
+}
 
-exit:
-	len = tipc_gap_ack_blks_sz(n);
+/* tipc_build_gap_ack_blks - build Gap ACK blocks
+ * @l: tipc unicast link
+ * @hdr: the tipc message buffer to store the Gap ACK blocks after built
+ *
+ * The function builds Gap ACK blocks for both the unicast & broadcast receiver
+ * links of a certain peer, the buffer after built has the network data format
+ * as found at the struct tipc_gap_ack_blks definition.
+ *
+ * returns the actual allocated memory size
+ */
+static u16 tipc_build_gap_ack_blks(struct tipc_link *l, struct tipc_msg *hdr)
+{
+	struct tipc_link *bcl = l->bc_rcvlink;
+	struct tipc_gap_ack_blks *ga;
+	u16 len;
+
+	ga = (struct tipc_gap_ack_blks *)msg_data(hdr);
+
+	/* Start with broadcast link first */
+	tipc_bcast_lock(bcl->net);
+	msg_set_bcast_ack(hdr, bcl->rcv_nxt - 1);
+	msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl));
+	ga->bgack_cnt = __tipc_build_gap_ack_blks(ga, bcl, 0);
+	tipc_bcast_unlock(bcl->net);
+
+	/* Now for unicast link, but an explicit NACK only (???) */
+	ga->ugack_cnt = (msg_seq_gap(hdr)) ?
+			__tipc_build_gap_ack_blks(ga, l, ga->bgack_cnt) : 0;
+
+	/* Total len */
+	len = tipc_gap_ack_blks_sz(ga->bgack_cnt + ga->ugack_cnt);
 	ga->len = htons(len);
-	ga->gack_cnt = n;
 	return len;
 }
 
 /* tipc_link_advance_transmq - advance TIPC link transmq queue by releasing
  *			       acked packets, also doing retransmissions if
  *			       gaps found
  * @l: tipc link with transmq queue to be advanced
+ * @r: tipc link "receiver" i.e. in case of broadcast (= "l" if unicast)
  * @acked: seqno of last packet acked by peer without any gaps before
 * @gap: # of gap packets
 * @ga: buffer pointer to Gap ACK blocks from peer
 * @xmitq: queue for accumulating the retransmitted packets if any
+ * @retransmitted: returned boolean value if a retransmission is really issued
+ * @rc: returned code e.g. TIPC_LINK_DOWN_EVT if a repeated retransmit failures
+ *	happens (- unlikely case)
 *
- * In case of a repeated retransmit failures, the call will return shortly
- * with a returned code (e.g. TIPC_LINK_DOWN_EVT)
+ * Return: the number of packets released from the link transmq
 */
-static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
+static int tipc_link_advance_transmq(struct tipc_link *l, struct tipc_link *r,
+				     u16 acked, u16 gap,
 				     struct tipc_gap_ack_blks *ga,
-				     struct sk_buff_head *xmitq)
+				     struct sk_buff_head *xmitq,
+				     bool *retransmitted, int *rc)
 {
+	struct tipc_gap_ack_blks *last_ga = r->last_ga, *this_ga = NULL;
+	struct tipc_gap_ack *gacks = NULL;
 	struct sk_buff *skb, *_skb, *tmp;
 	struct tipc_msg *hdr;
+	u32 qlen = skb_queue_len(&l->transmq);
+	u16 nacked = acked, ngap = gap, gack_cnt = 0;
 	u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
-	bool retransmitted = false;
 	u16 ack = l->rcv_nxt - 1;
-	bool passed = false;
-	u16 released = 0;
 	u16 seqno, n = 0;
-	int rc = 0;
+	u16 end = r->acked, start = end, offset = r->last_gap;
+	u16 si = (last_ga) ? last_ga->start_index : 0;
+	bool is_uc = !link_is_bc_sndlink(l);
+	bool bc_has_acked = false;
+
+	/* Determine Gap ACK blocks if any for the particular link */
+	if (ga && is_uc) {
+		/* Get the Gap ACKs, uc part */
+		gack_cnt = ga->ugack_cnt;
+		gacks = &ga->gacks[ga->bgack_cnt];
+	} else if (ga) {
+		/* Copy the Gap ACKs, bc part, for later renewal if needed */
+		this_ga = kmemdup(ga, tipc_gap_ack_blks_sz(ga->bgack_cnt),
+				  GFP_ATOMIC);
+		if (likely(this_ga)) {
+			this_ga->start_index = 0;
+			/* Start with the bc Gap ACKs */
+			gack_cnt = this_ga->bgack_cnt;
+			gacks = &this_ga->gacks[0];
+		} else {
+			/* Hmm, we can get in trouble..., simply ignore it */
+			pr_warn_ratelimited("Ignoring bc Gap ACKs, no memory\n");
+		}
+	}
 
+	/* Advance the link transmq */
 	skb_queue_walk_safe(&l->transmq, skb, tmp) {
 		seqno = buf_seqno(skb);
 
 next_gap_ack:
-		if (less_eq(seqno, acked)) {
+		if (less_eq(seqno, nacked)) {
+			if (is_uc)
+				goto release;
+			/* Skip packets peer has already acked */
+			if (!more(seqno, r->acked))
+				continue;
+			/* Get the next of last Gap ACK blocks */
+			while (more(seqno, end)) {
+				if (!last_ga || si >= last_ga->bgack_cnt)
+					break;
+				start = end + offset + 1;
+				end = ntohs(last_ga->gacks[si].ack);
+				offset = ntohs(last_ga->gacks[si].gap);
+				si++;
+				WARN_ONCE(more(start, end) ||
+					  (!offset &&
+					   si < last_ga->bgack_cnt) ||
+					  si > MAX_GAP_ACK_BLKS,
+					  "Corrupted Gap ACK: %d %d %d %d %d\n",
+					  start, end, offset, si,
+					  last_ga->bgack_cnt);
+			}
+			/* Check against the last Gap ACK block */
+			if (in_range(seqno, start, end))
+				continue;
+			/* Update/release the packet peer is acking */
+			bc_has_acked = true;
+			if (--TIPC_SKB_CB(skb)->ackers)
+				continue;
+release:
 			/* release skb */
 			__skb_unlink(skb, &l->transmq);
 			kfree_skb(skb);
-			released++;
-		} else if (less_eq(seqno, acked + gap)) {
-			/* First, check if repeated retrans failures occurs? */
-			if (!passed && link_retransmit_failure(l, l, &rc))
-				return rc;
-			passed = true;
-
+		} else if (less_eq(seqno, nacked + ngap)) {
+			/* First gap: check if repeated retrans failures? */
+			if (unlikely(seqno == acked + 1 &&
+				     link_retransmit_failure(l, r, rc))) {
+				/* Ignore this bc Gap ACKs if any */
+				kfree(this_ga);
+				this_ga = NULL;
+				break;
+			}
			/* retransmit skb if unrestricted*/
 			if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr))
 				continue;
-			TIPC_SKB_CB(skb)->nxt_retr = TIPC_UC_RETR_TIME;
+			TIPC_SKB_CB(skb)->nxt_retr = (is_uc) ?
+					TIPC_UC_RETR_TIME : TIPC_BC_RETR_LIM;
 			_skb = pskb_copy(skb, GFP_ATOMIC);
 			if (!_skb)
 				continue;
@@ -1516,25 +1585,51 @@ static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
 			_skb->priority = TC_PRIO_CONTROL;
 			__skb_queue_tail(xmitq, _skb);
 			l->stats.retransmitted++;
-			retransmitted = true;
+			*retransmitted = true;
 			/* Increase actual retrans counter & mark first time */
 			if (!TIPC_SKB_CB(skb)->retr_cnt++)
 				TIPC_SKB_CB(skb)->retr_stamp = jiffies;
 		} else {
 			/* retry with Gap ACK blocks if any */
-			if (!ga || n >= ga->gack_cnt)
+			if (n >= gack_cnt)
 				break;
-			acked = ntohs(ga->gacks[n].ack);
-			gap = ntohs(ga->gacks[n].gap);
+			nacked = ntohs(gacks[n].ack);
+			ngap = ntohs(gacks[n].gap);
 			n++;
 			goto next_gap_ack;
 		}
 	}
-	if (released || retransmitted)
-		tipc_link_update_cwin(l, released, retransmitted);
-	if (released)
-		tipc_link_advance_backlog(l, xmitq);
-	return 0;
+
+	/* Renew last Gap ACK blocks for bc if needed */
+	if (bc_has_acked) {
+		if (this_ga) {
+			kfree(last_ga);
+			r->last_ga = this_ga;
+			r->last_gap = gap;
+		} else if (last_ga) {
+			if (less(acked, start)) {
+				si--;
+				offset = start - acked - 1;
+			} else if (less(acked, end)) {
+				acked = end;
+			}
+			if (si < last_ga->bgack_cnt) {
+				last_ga->start_index = si;
+				r->last_gap = offset;
+			} else {
+				kfree(last_ga);
+				r->last_ga = NULL;
+				r->last_gap = 0;
+			}
+		} else {
+			r->last_gap = 0;
+		}
+		r->acked = acked;
+	} else {
+		kfree(this_ga);
+	}
+
+	return qlen - skb_queue_len(&l->transmq);
 }
 
 /* tipc_link_build_state_msg: prepare link state message for transmission
@@ -1651,7 +1746,8 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
 			kfree_skb(skb);
 			break;
 		}
-		released += tipc_link_release_pkts(l, msg_ack(hdr));
+		released += tipc_link_advance_transmq(l, l, msg_ack(hdr), 0,
+						      NULL, NULL, NULL, NULL);
 
 		/* Defer delivery if sequence gap */
 		if (unlikely(seqno != rcv_nxt)) {
@@ -1739,7 +1835,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
 		msg_set_probe(hdr, probe);
 		msg_set_is_keepalive(hdr, probe || probe_reply);
 		if (l->peer_caps & TIPC_GAP_ACK_BLOCK)
-			glen = tipc_build_gap_ack_blks(l, data, rcvgap);
+			glen = tipc_build_gap_ack_blks(l, hdr);
 		tipc_mon_prep(l->net, data + glen, &dlen, mstate, l->bearer_id);
 		msg_set_size(hdr, INT_H_SIZE + glen + dlen);
 		skb_trim(skb, INT_H_SIZE + glen + dlen);
@@ -2027,20 +2123,19 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
 {
 	struct tipc_msg *hdr = buf_msg(skb);
 	struct tipc_gap_ack_blks *ga = NULL;
-	u16 rcvgap = 0;
-	u16 ack = msg_ack(hdr);
-	u16 gap = msg_seq_gap(hdr);
+	bool reply = msg_probe(hdr), retransmitted = false;
+	u16 dlen = msg_data_sz(hdr), glen = 0;
 	u16 peers_snd_nxt = msg_next_sent(hdr);
 	u16 peers_tol = msg_link_tolerance(hdr);
 	u16 peers_prio = msg_linkprio(hdr);
+	u16 gap = msg_seq_gap(hdr);
+	u16 ack = msg_ack(hdr);
 	u16 rcv_nxt = l->rcv_nxt;
-	u16 dlen = msg_data_sz(hdr);
+	u16 rcvgap = 0;
 	int mtyp = msg_type(hdr);
-	bool reply = msg_probe(hdr);
-	u16 glen = 0;
-	void *data;
+	int rc = 0, released;
 	char *if_name;
-	int rc = 0;
+	void *data;
 
 	trace_tipc_proto_rcv(skb, false, l->name);
 	if (tipc_link_is_blocked(l) || !xmitq)
@@ -2137,13 +2232,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
 		}
 
 		/* Receive Gap ACK blocks from peer if any */
-		if (l->peer_caps & TIPC_GAP_ACK_BLOCK) {
-			ga = (struct tipc_gap_ack_blks *)data;
-			glen = ntohs(ga->len);
-			/* sanity check: if failed, ignore Gap ACK blocks */
-			if (glen != tipc_gap_ack_blks_sz(ga->gack_cnt))
-				ga = NULL;
-		}
+		glen = tipc_get_gap_ack_blks(&ga, l, hdr, true);
 
 		tipc_mon_rcv(l->net, data + glen, dlen - glen, l->addr,
 			     &l->mon_state, l->bearer_id);
@@ -2158,9 +2247,14 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
 			tipc_link_build_proto_msg(l, STATE_MSG, 0, reply,
 						  rcvgap, 0, 0, xmitq);
 
-		rc |= tipc_link_advance_transmq(l, ack, gap, ga, xmitq);
+		released = tipc_link_advance_transmq(l, l, ack, gap, ga, xmitq,
+						     &retransmitted, &rc);
 		if (gap)
 			l->stats.recv_nacks++;
+		if (released || retransmitted)
+			tipc_link_update_cwin(l, released, retransmitted);
+		if (released)
+			tipc_link_advance_backlog(l, xmitq);
 		if (unlikely(!skb_queue_empty(&l->wakeupq)))
 			link_prepare_wakeup(l);
 	}
@@ -2246,10 +2340,7 @@ void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr)
 int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
 			  struct sk_buff_head *xmitq)
 {
-	struct tipc_link *snd_l = l->bc_sndlink;
 	u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
-	u16 from = msg_bcast_ack(hdr) + 1;
-	u16 to = from + msg_bc_gap(hdr) - 1;
 	int rc = 0;
 
 	if (!link_is_up(l))
@@ -2271,8 +2362,6 @@ int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
 	if (more(peers_snd_nxt, l->rcv_nxt + l->window))
 		return rc;
 
-	rc = tipc_link_bc_retrans(snd_l, l, from, to, xmitq);
-
 	l->snd_nxt = peers_snd_nxt;
 	if (link_bc_rcv_gap(l))
 		rc |= TIPC_LINK_SND_STATE;
@@ -2307,38 +2396,27 @@ int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
 	return 0;
 }
 
-void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
-			  struct sk_buff_head *xmitq)
+int tipc_link_bc_ack_rcv(struct tipc_link *r, u16 acked, u16 gap,
+			 struct tipc_gap_ack_blks *ga,
+			 struct sk_buff_head *xmitq)
 {
-	struct sk_buff *skb, *tmp;
-	struct tipc_link *snd_l = l->bc_sndlink;
+	struct tipc_link *l = r->bc_sndlink;
+	bool unused = false;
+	int rc = 0;
 
-	if (!link_is_up(l) || !l->bc_peer_is_up)
-		return;
+	if (!link_is_up(r) || !r->bc_peer_is_up)
+		return 0;
 
-	if (!more(acked, l->acked))
-		return;
+	if (less(acked, r->acked) || (acked == r->acked && !gap && !ga))
+		return 0;
 
-	trace_tipc_link_bc_ack(l, l->acked, acked, &snd_l->transmq);
-	/* Skip over packets peer has already acked */
-	skb_queue_walk(&snd_l->transmq, skb) {
-		if (more(buf_seqno(skb), l->acked))
-			break;
-	}
+	tipc_link_advance_transmq(l, r, acked, gap, ga, xmitq, &unused, &rc);
 
-	/* Update/release the packets peer is acking now */
-	skb_queue_walk_from_safe(&snd_l->transmq, skb, tmp) {
-		if (more(buf_seqno(skb), acked))
-			break;
-		if (!--TIPC_SKB_CB(skb)->ackers) {
-			__skb_unlink(skb, &snd_l->transmq);
-			kfree_skb(skb);
-		}
-	}
-	l->acked = acked;
-	tipc_link_advance_backlog(snd_l, xmitq);
-	if (unlikely(!skb_queue_empty(&snd_l->wakeupq)))
-		link_prepare_wakeup(snd_l);
+	tipc_link_advance_backlog(l, xmitq);
+	if (unlikely(!skb_queue_empty(&l->wakeupq)))
+		link_prepare_wakeup(l);
+
+	return rc;
 }
 
 /* tipc_link_bc_nack_rcv(): receive broadcast nack message
@@ -2366,8 +2444,7 @@ int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
 		return 0;
 
 	if (dnode == tipc_own_addr(l->net)) {
-		tipc_link_bc_ack_rcv(l, acked, xmitq);
-		rc = tipc_link_bc_retrans(l->bc_sndlink, l, from, to, xmitq);
+		rc = tipc_link_bc_ack_rcv(l, acked, to - acked, NULL, xmitq);
 		l->stats.recv_nacks++;
 		return rc;
 	}
diff --git a/net/tipc/link.h b/net/tipc/link.h
index d3c1c3fc1659..0a0fa7350722 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -143,8 +143,11 @@ int tipc_link_bc_peers(struct tipc_link *l);
 void tipc_link_set_mtu(struct tipc_link *l, int mtu);
 int tipc_link_mtu(struct tipc_link *l);
 int tipc_link_mss(struct tipc_link *l);
-void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
-			  struct sk_buff_head *xmitq);
+u16 tipc_get_gap_ack_blks(struct tipc_gap_ack_blks **ga, struct tipc_link *l,
+			  struct tipc_msg *hdr, bool uc);
+int tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, u16 gap,
+			 struct tipc_gap_ack_blks *ga,
+			 struct sk_buff_head *xmitq);
 void tipc_link_build_bc_sync_msg(struct tipc_link *l,
 				 struct sk_buff_head *xmitq);
 void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 871feadbbc19..ca5f8689a33b 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -160,20 +160,39 @@ struct tipc_gap_ack {
 
 /* struct tipc_gap_ack_blks
 * @len: actual length of the record
- * @gack_cnt: number of Gap ACK blocks in the record
+ * @ugack_cnt: number of Gap ACK blocks for unicast (following the broadcast
+ *             ones)
+ * @start_index: starting index for "valid" broadcast Gap ACK blocks
+ * @bgack_cnt: number of Gap ACK blocks for broadcast in the record
 * @gacks: array of Gap ACK blocks
+ *
+ *  31                       16 15                        0
+ * +-------------+-------------+-------------+-------------+
+ * |  bgack_cnt  |  ugack_cnt  |            len            |
+ * +-------------+-------------+-------------+-------------+  -
+ * |            gap            |            ack            |   |
+ * +-------------+-------------+-------------+-------------+    > bc gacks
+ * :                           :                           :   |
+ * +-------------+-------------+-------------+-------------+  -
+ * |            gap            |            ack            |   |
+ * +-------------+-------------+-------------+-------------+    > uc gacks
+ * :                           :                           :   |
+ * +-------------+-------------+-------------+-------------+  -
 */
 struct tipc_gap_ack_blks {
 	__be16 len;
-	u8 gack_cnt;
-	u8 reserved;
+	union {
+		u8 ugack_cnt;
+		u8 start_index;
+	};
+	u8 bgack_cnt;
 	struct tipc_gap_ack gacks[];
 };
 
 #define tipc_gap_ack_blks_sz(n) (sizeof(struct tipc_gap_ack_blks) + \
 				 sizeof(struct tipc_gap_ack) * (n))
 
-#define MAX_GAP_ACK_BLKS	32
+#define MAX_GAP_ACK_BLKS	128
 #define MAX_GAP_ACK_BLKS_SZ	tipc_gap_ack_blks_sz(MAX_GAP_ACK_BLKS)
 
 static inline struct tipc_msg *buf_msg(struct sk_buff *skb)
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 803a3a6d0f50..6a49b3eeaae9 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -2071,10 +2071,16 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
 	le = &n->links[bearer_id];
 
 	/* Ensure broadcast reception is in synch with peer's send state */
-	if (unlikely(usr == LINK_PROTOCOL))
+	if (unlikely(usr == LINK_PROTOCOL)) {
+		if (unlikely(skb_linearize(skb))) {
+			tipc_node_put(n);
+			goto discard;
+		}
+		hdr = buf_msg(skb);
 		tipc_node_bc_sync_rcv(n, hdr, bearer_id, &xmitq);
-	else if (unlikely(tipc_link_acked(n->bc_entry.link) != bc_ack))
+	} else if (unlikely(tipc_link_acked(n->bc_entry.link) != bc_ack)) {
 		tipc_bcast_ack_rcv(net, n->bc_entry.link, hdr);
+	}
 
 	/* Receive packet directly if conditions permit */
 	tipc_node_read_lock(n);
--
2.13.7

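The size bound mentioned in the commit message (128 blocks giving a 516-byte record) follows directly from the record layout. A minimal sketch mirroring the struct and macro from the patch, with plain stdint types instead of the kernel's __be16/u8, and the union of ugack_cnt/start_index simplified to a single field:

#include <stdint.h>
#include <stdio.h>

struct tipc_gap_ack {
	uint16_t ack;
	uint16_t gap;
};

struct tipc_gap_ack_blks {
	uint16_t len;
	uint8_t  ugack_cnt;	/* unicast blocks, placed after the bc ones */
	uint8_t  bgack_cnt;	/* broadcast blocks, come first */
	struct tipc_gap_ack gacks[];	/* flexible array, adds no size */
};

#define tipc_gap_ack_blks_sz(n) (sizeof(struct tipc_gap_ack_blks) + \
				 sizeof(struct tipc_gap_ack) * (n))
#define MAX_GAP_ACK_BLKS 128

int main(void)
{
	/* 4-byte header + 128 * 4-byte blocks = 516 bytes total */
	printf("max record size: %zu bytes\n",
	       (size_t)tipc_gap_ack_blks_sz(MAX_GAP_ACK_BLKS));
	return 0;
}

Since both parts share the one record, each of the broadcast and unicast types gets at most MAX_GAP_ACK_BLKS / 2 = 64 blocks, matching the check in __tipc_build_gap_ack_blks().
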
From: Tuong L. <tuo...@de...> - 2020-05-26 09:39:10
This commit enables dumping the statistics of a broadcast-receiver link like the traditional 'broadcast-link' one (which is for the broadcast sender). The link dumping can be triggered via netlink (e.g. the iproute2/tipc tool) by the link flag 'TIPC_NLA_LINK_BROADCAST' as the indicator. The name of a broadcast-receiver link of a specific peer has the format 'broadcast-link:<peer-id>'. For example:

Link <broadcast-link:1001002>
  Window:50 packets
  RX packets:7841 fragments:2408/440 bundles:0/0
  TX packets:0 fragments:0/0 bundles:0/0
  RX naks:0 defs:124 dups:0
  TX naks:21 acks:0 retrans:0
  Congestion link:0  Send queue max:0 avg:0

In addition, the broadcast-receiver link statistics can be reset in the usual way via netlink by specifying that link name in the command.

Note: 'tipc_link_name_ext()' is removed because the link name can now be retrieved simply via 'l->name'.

Acked-by: Ying Xue <yin...@wi...>
Acked-by: Jon Maloy <jm...@re...>
Signed-off-by: Tuong Lien <tuo...@de...>
---
 net/tipc/bcast.c   |  6 ++---
 net/tipc/bcast.h   |  5 +++--
 net/tipc/link.c    | 65 +++++++++++++++++++++++++++---------------------------
 net/tipc/link.h    |  3 +--
 net/tipc/msg.c     |  9 ++++----
 net/tipc/msg.h     |  2 +-
 net/tipc/netlink.c |  2 +-
 net/tipc/node.c    | 61 +++++++++++++++++++++++++++++++++++++++++++-------
 net/tipc/trace.h   |  4 ++--
 9 files changed, 101 insertions(+), 56 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 50a16f8bebd9..383f87bc1061 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -563,10 +563,8 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
 		tipc_sk_rcv(net, inputq);
 }
 
-int tipc_bclink_reset_stats(struct net *net)
+int tipc_bclink_reset_stats(struct net *net, struct tipc_link *l)
 {
-	struct tipc_link *l = tipc_bc_sndlink(net);
-
 	if (!l)
 		return -ENOPROTOOPT;
 
@@ -694,7 +692,7 @@ int tipc_bcast_init(struct net *net)
 	tn->bcbase = bb;
 	spin_lock_init(&tipc_net(net)->bclock);
 
-	if (!tipc_link_bc_create(net, 0, 0,
+	if (!tipc_link_bc_create(net, 0, 0, NULL,
 				 FB_MTU,
 				 BCLINK_WIN_DEFAULT,
 				 BCLINK_WIN_DEFAULT,
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 97d3cf9d3e4d..4240c95188b1 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -96,9 +96,10 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
 int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
 			struct tipc_msg *hdr,
 			struct sk_buff_head *retrq);
-int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
+int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg,
+			struct tipc_link *bcl);
 int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
-int tipc_bclink_reset_stats(struct net *net);
+int tipc_bclink_reset_stats(struct net *net, struct tipc_link *l);
 
 u32 tipc_bcast_get_broadcast_mode(struct net *net);
 u32 tipc_bcast_get_broadcast_ratio(struct net *net);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index af352391e2ab..ee3b8d0576b8 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -539,7 +539,7 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
 *
 * Returns true if link was created, otherwise false
 */
-bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
+bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer, u8 *peer_id,
 			 int mtu, u32 min_win, u32 max_win, u16 peer_caps,
 			 struct sk_buff_head *inputq,
 			 struct sk_buff_head *namedq,
@@ -554,7 +554,18 @@ bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
 		return false;
 
 	l = *link;
-	strcpy(l->name, tipc_bclink_name);
+	if (peer_id) {
+		char peer_str[NODE_ID_STR_LEN] = {0,};
+
+		tipc_nodeid2string(peer_str, peer_id);
+		if (strlen(peer_str) > 16)
+			sprintf(peer_str, "%x", peer);
+		/* Broadcast receiver link name: "broadcast-link:<peer>" */
+		snprintf(l->name, sizeof(l->name), "%s:%s", tipc_bclink_name,
+			 peer_str);
+	} else {
+		strcpy(l->name, tipc_bclink_name);
+	}
 	trace_tipc_link_reset(l, TIPC_DUMP_ALL, "bclink created!");
 	tipc_link_reset(l);
 	l->state = LINK_RESET;
@@ -1412,11 +1423,8 @@ static u8 __tipc_build_gap_ack_blks(struct tipc_gap_ack_blks *ga,
 			gacks[n].ack = htons(expect - 1);
 			gacks[n].gap = htons(seqno - expect);
 			if (++n >= MAX_GAP_ACK_BLKS / 2) {
-				char buf[TIPC_MAX_LINK_NAME];
-
 				pr_info_ratelimited("Gacks on %s: %d, ql: %d!\n",
-						    tipc_link_name_ext(l, buf),
-						    n,
+						    l->name, n,
 						    skb_queue_len(&l->deferdq));
 				return n;
 			}
@@ -1587,6 +1595,8 @@ static int tipc_link_advance_transmq(struct tipc_link *l, struct tipc_link *r,
 			_skb->priority = TC_PRIO_CONTROL;
 			__skb_queue_tail(xmitq, _skb);
 			l->stats.retransmitted++;
+			if (!is_uc)
+				r->stats.retransmitted++;
 			*retransmitted = true;
 			/* Increase actual retrans counter & mark first time */
 			if (!TIPC_SKB_CB(skb)->retr_cnt++)
@@ -1753,7 +1763,8 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
 
 		/* Defer delivery if sequence gap */
 		if (unlikely(seqno != rcv_nxt)) {
-			__tipc_skb_queue_sorted(defq, seqno, skb);
+			if (!__tipc_skb_queue_sorted(defq, seqno, skb))
+				l->stats.duplicates++;
 			rc |= tipc_link_build_nack_msg(l, xmitq);
 			break;
 		}
@@ -1787,15 +1798,15 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
 				      int tolerance, int priority,
 				      struct sk_buff_head *xmitq)
 {
+	struct tipc_mon_state *mstate = &l->mon_state;
+	struct sk_buff_head *dfq = &l->deferdq;
 	struct tipc_link *bcl = l->bc_rcvlink;
-	struct sk_buff *skb;
 	struct tipc_msg *hdr;
-	struct sk_buff_head *dfq = &l->deferdq;
+	struct sk_buff *skb;
 	bool node_up = link_is_up(bcl);
-	struct tipc_mon_state *mstate = &l->mon_state;
+	u16 glen = 0, bc_rcvgap = 0;
 	int dlen = 0;
 	void *data;
-	u16 glen = 0;
 
 	/* Don't send protocol message during reset or link failover */
 	if (tipc_link_is_blocked(l))
@@ -1833,7 +1844,8 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
 		if (l->peer_caps & TIPC_LINK_PROTO_SEQNO)
 			msg_set_seqno(hdr, l->snd_nxt_state++);
 		msg_set_seq_gap(hdr, rcvgap);
-		msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl));
+		bc_rcvgap = link_bc_rcv_gap(bcl);
+		msg_set_bc_gap(hdr, bc_rcvgap);
 		msg_set_probe(hdr, probe);
 		msg_set_is_keepalive(hdr, probe || probe_reply);
 		if (l->peer_caps & TIPC_GAP_ACK_BLOCK)
@@ -1858,6 +1870,8 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
 			l->stats.sent_probes++;
 		if (rcvgap)
 			l->stats.sent_nacks++;
+		if (bc_rcvgap)
+			bcl->stats.sent_nacks++;
 		skb->priority = TC_PRIO_CONTROL;
 		__skb_queue_tail(xmitq, skb);
 		trace_tipc_proto_build(skb, false, l->name);
@@ -2358,8 +2372,6 @@ int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
 	if (!l->bc_peer_is_up)
 		return rc;
 
-	l->stats.recv_nacks++;
-
 	/* Ignore if peers_snd_nxt goes beyond receive window */
 	if (more(peers_snd_nxt, l->rcv_nxt + l->window))
 		return rc;
@@ -2410,6 +2422,11 @@ int tipc_link_bc_ack_rcv(struct tipc_link *r, u16 acked, u16 gap,
 	if (!link_is_up(r) || !r->bc_peer_is_up)
 		return 0;
 
+	if (gap) {
+		l->stats.recv_nacks++;
+		r->stats.recv_nacks++;
+	}
+
 	if (less(acked, r->acked) || (acked == r->acked && !gap && !ga))
 		return 0;
 
@@ -2721,16 +2738,15 @@ static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
 	return -EMSGSIZE;
 }
 
-int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
+int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg,
+			struct tipc_link *bcl)
 {
 	int err;
 	void *hdr;
 	struct nlattr *attrs;
 	struct nlattr *prop;
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	u32 bc_mode = tipc_bcast_get_broadcast_mode(net);
 	u32 bc_ratio = tipc_bcast_get_broadcast_ratio(net);
-	struct tipc_link *bcl = tn->bcl;
 
 	if (!bcl)
 		return 0;
@@ -2817,21 +2833,6 @@ void tipc_link_set_abort_limit(struct tipc_link *l, u32 limit)
 	l->abort_limit = limit;
 }
 
-char *tipc_link_name_ext(struct tipc_link *l, char *buf)
-{
-	if (!l)
-		scnprintf(buf, TIPC_MAX_LINK_NAME, "null");
-	else if (link_is_bc_sndlink(l))
-		scnprintf(buf, TIPC_MAX_LINK_NAME, "broadcast-sender");
-	else if (link_is_bc_rcvlink(l))
-		scnprintf(buf, TIPC_MAX_LINK_NAME,
-			  "broadcast-receiver, peer %x", l->addr);
-	else
-		memcpy(buf, l->name, TIPC_MAX_LINK_NAME);
-
-	return buf;
-}
-
 /**
 * tipc_link_dump - dump TIPC link data
 * @l: tipc link to be dumped
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 4d0768cf91d5..fc07232c9a12 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -80,7 +80,7 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
 		      struct sk_buff_head *inputq,
 		      struct sk_buff_head *namedq,
 		      struct tipc_link **link);
-bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
+bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer, u8 *peer_id,
 			 int mtu, u32 min_win, u32 max_win, u16 peer_caps,
 			 struct sk_buff_head *inputq,
 			 struct sk_buff_head *namedq,
@@ -111,7 +111,6 @@ u16 tipc_link_rcv_nxt(struct tipc_link *l);
 u16 tipc_link_acked(struct tipc_link *l);
 u32 tipc_link_id(struct tipc_link *l);
 char *tipc_link_name(struct tipc_link *l);
-char *tipc_link_name_ext(struct tipc_link *l, char *buf);
 u32 tipc_link_state(struct tipc_link *l);
 char tipc_link_plane(struct tipc_link *l);
 int tipc_link_prio(struct tipc_link *l);
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 4d0e0bdd997b..c69fb99163fc 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -825,19 +825,19 @@ bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
 * @seqno: sequence number of buffer to add
 * @skb: buffer to add
 */
-void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
+bool __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
 			     struct sk_buff *skb)
 {
 	struct sk_buff *_skb, *tmp;
 
 	if (skb_queue_empty(list) || less(seqno, buf_seqno(skb_peek(list)))) {
 		__skb_queue_head(list, skb);
-		return;
+		return true;
 	}
 
 	if (more(seqno, buf_seqno(skb_peek_tail(list)))) {
 		__skb_queue_tail(list, skb);
-		return;
+		return true;
 	}
 
 	skb_queue_walk_safe(list, _skb, tmp) {
@@ -846,9 +846,10 @@ void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
 		if (seqno == buf_seqno(_skb))
 			break;
 		__skb_queue_before(list, _skb, skb);
-		return;
+		return true;
 	}
 	kfree_skb(skb);
+	return false;
 }
 
 void tipc_skb_reject(struct net *net, int err, struct sk_buff *skb,
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index ca5f8689a33b..cd4281779468 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -1145,7 +1145,7 @@ bool tipc_msg_assemble(struct sk_buff_head *list);
 bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
 bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
 			struct sk_buff_head *cpy);
-void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
+bool __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
 			     struct sk_buff *skb);
 bool tipc_msg_skb_clone(struct sk_buff_head *msg, struct sk_buff_head *cpy);
diff --git a/net/tipc/netlink.c b/net/tipc/netlink.c
index bb9862410e68..c4aee6247d55 100644
--- a/net/tipc/netlink.c
+++ b/net/tipc/netlink.c
@@ -188,7 +188,7 @@ static const struct genl_ops tipc_genl_v2_ops[] = {
 	},
 	{
 		.cmd	= TIPC_NL_LINK_GET,
-		.validate = GENL_DONT_VALIDATE_STRICT | GENL_DONT_VALIDATE_DUMP,
+		.validate = GENL_DONT_VALIDATE_STRICT,
 		.doit   = tipc_nl_node_get_link,
 		.dumpit	= tipc_nl_node_dump_link,
 	},
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 548207fdec15..0312fb181d94 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1138,7 +1138,7 @@ void tipc_node_check_dest(struct net *net, u32 addr,
 	if (unlikely(!n->bc_entry.link)) {
 		snd_l = tipc_bc_sndlink(net);
 		if (!tipc_link_bc_create(net, tipc_own_addr(net),
-					 addr, U16_MAX,
+					 addr, peer_id, U16_MAX,
 					 tipc_link_min_win(snd_l),
 					 tipc_link_max_win(snd_l),
 					 n->capabilities,
@@ -2435,7 +2435,7 @@ int tipc_nl_node_get_link(struct sk_buff *skb, struct genl_info *info)
 		return -ENOMEM;
 
 	if (strcmp(name, tipc_bclink_name) == 0) {
-		err = tipc_nl_add_bc_link(net, &msg);
+		err = tipc_nl_add_bc_link(net, &msg, tipc_net(net)->bcl);
 		if (err)
 			goto err_free;
 	} else {
@@ -2479,6 +2479,7 @@ int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info)
 	struct tipc_node *node;
 	struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
 	struct net *net = sock_net(skb->sk);
+	struct tipc_net *tn = tipc_net(net);
 	struct tipc_link_entry *le;
 
 	if (!info->attrs[TIPC_NLA_LINK])
@@ -2495,11 +2496,26 @@ int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info)
 
 	link_name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
 
-	if (strcmp(link_name, tipc_bclink_name) == 0) {
-		err = tipc_bclink_reset_stats(net);
+	err = -EINVAL;
+	if (!strcmp(link_name, tipc_bclink_name)) {
+		err = tipc_bclink_reset_stats(net, tipc_bc_sndlink(net));
 		if (err)
 			return err;
 		return 0;
+	} else if (strstr(link_name, tipc_bclink_name)) {
+		rcu_read_lock();
+		list_for_each_entry_rcu(node, &tn->node_list, list) {
+			tipc_node_read_lock(node);
+			link = node->bc_entry.link;
+			if (link && !strcmp(link_name, tipc_link_name(link))) {
+				err = tipc_bclink_reset_stats(net, link);
+				tipc_node_read_unlock(node);
+				break;
+			}
+			tipc_node_read_unlock(node);
+		}
+		rcu_read_unlock();
+		return err;
 	}
 
 	node = tipc_node_find_by_name(net, link_name, &bearer_id);
@@ -2523,7 +2539,8 @@ int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info)
 
 /* Caller should hold node lock */
 static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
-				    struct tipc_node *node, u32 *prev_link)
+				    struct tipc_node *node, u32 *prev_link,
+				    bool bc_link)
 {
 	u32 i;
 	int err;
@@ -2539,6 +2556,14 @@ static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
 		if (err)
 			return err;
 	}
+
+	if (bc_link) {
+		*prev_link = i;
+		err = tipc_nl_add_bc_link(net, msg, node->bc_entry.link);
+		if (err)
+			return err;
+	}
+
 	*prev_link = 0;
 
 	return 0;
@@ -2547,17 +2572,36 @@ static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
 int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
 {
 	struct net *net = sock_net(skb->sk);
+	struct nlattr **attrs = genl_dumpit_info(cb)->attrs;
+	struct nlattr *link[TIPC_NLA_LINK_MAX + 1];
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	struct tipc_node *node;
 	struct tipc_nl_msg msg;
 	u32 prev_node = cb->args[0];
 	u32 prev_link = cb->args[1];
 	int done = cb->args[2];
+	bool bc_link = cb->args[3];
 	int err;
 
 	if (done)
 		return 0;
 
+	if (!prev_node) {
+		/* Check if broadcast-receiver links
dumping is needed */ + if (attrs && attrs[TIPC_NLA_LINK]) { + err = nla_parse_nested_deprecated(link, + TIPC_NLA_LINK_MAX, + attrs[TIPC_NLA_LINK], + tipc_nl_link_policy, + NULL); + if (unlikely(err)) + return err; + if (unlikely(!link[TIPC_NLA_LINK_BROADCAST])) + return -EINVAL; + bc_link = true; + } + } + msg.skb = skb; msg.portid = NETLINK_CB(cb->skb).portid; msg.seq = cb->nlh->nlmsg_seq; @@ -2581,7 +2625,7 @@ int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb) list) { tipc_node_read_lock(node); err = __tipc_nl_add_node_links(net, &msg, node, - &prev_link); + &prev_link, bc_link); tipc_node_read_unlock(node); if (err) goto out; @@ -2589,14 +2633,14 @@ int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb) prev_node = node->addr; } } else { - err = tipc_nl_add_bc_link(net, &msg); + err = tipc_nl_add_bc_link(net, &msg, tn->bcl); if (err) goto out; list_for_each_entry_rcu(node, &tn->node_list, list) { tipc_node_read_lock(node); err = __tipc_nl_add_node_links(net, &msg, node, - &prev_link); + &prev_link, bc_link); tipc_node_read_unlock(node); if (err) goto out; @@ -2611,6 +2655,7 @@ int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb) cb->args[0] = prev_node; cb->args[1] = prev_link; cb->args[2] = done; + cb->args[3] = bc_link; return skb->len; } diff --git a/net/tipc/trace.h b/net/tipc/trace.h index e7535ab75255..04af83f0500c 100644 --- a/net/tipc/trace.h +++ b/net/tipc/trace.h @@ -255,7 +255,7 @@ DECLARE_EVENT_CLASS(tipc_link_class, TP_fast_assign( __assign_str(header, header); - tipc_link_name_ext(l, __entry->name); + memcpy(__entry->name, tipc_link_name(l), TIPC_MAX_LINK_NAME); tipc_link_dump(l, dqueues, __get_str(buf)); ), @@ -295,7 +295,7 @@ DECLARE_EVENT_CLASS(tipc_link_transmq_class, ), TP_fast_assign( - tipc_link_name_ext(r, __entry->name); + memcpy(__entry->name, tipc_link_name(r), TIPC_MAX_LINK_NAME); __entry->from = f; __entry->to = t; __entry->len = skb_queue_len(tq); -- 2.13.7 |
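For reference, with an iproute2/tipc build that can set the new 'TIPC_NLA_LINK_BROADCAST' flag, the feature can be exercised from the shell. A minimal sketch, assuming a tipc tool with support for this patch and a peer whose hashed node number is 1001002 (exact command spellings may differ by tool version):

  # Show link statistics; a tool with broadcast-receiver support can
  # also list the per-peer 'broadcast-link:<peer-id>' links.
  tipc link stat show

  # Reset the statistics of one broadcast-receiver link by giving the
  # new link name, in the usual way.
  tipc link stat reset link "broadcast-link:1001002"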
From: Tuong L. <tuo...@de...> - 2020-05-26 09:39:06
In some environments, broadcast traffic is suppressed at a high rate (i.e. a kind of bandwidth limit setting). When this is applied, TIPC broadcast can still run successfully. However, when it comes to a high load, some packets will be dropped first and TIPC tries to retransmit them, but the packet retransmission is intentionally broadcast too, making things worse and not helpful at all.

This commit enables broadcast retransmission via unicast, which retransmits packets only to the specific peer that has actually reported a gap, i.e. without broadcasting to all nodes in the cluster. This prevents the retransmissions from being suppressed as well, reduces some overheads on the other peers due to duplicates, and finally improves the overall TIPC broadcast performance.

Note: the functionality can be turned on/off via the sysctl file:

echo 1 > /proc/sys/net/tipc/bc_retruni
echo 0 > /proc/sys/net/tipc/bc_retruni

Default is '0', i.e. the broadcast retransmission still works as usual.

Acked-by: Ying Xue <yin...@wi...>
Acked-by: Jon Maloy <jm...@re...>
Signed-off-by: Tuong Lien <tuo...@de...>
---
 net/tipc/bcast.c  | 11 ++++++++---
 net/tipc/bcast.h  |  4 +++-
 net/tipc/link.c   | 10 ++++++----
 net/tipc/link.h   |  3 ++-
 net/tipc/node.c   |  2 +-
 net/tipc/sysctl.c |  9 ++++++++-
 6 files changed, 28 insertions(+), 11 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 3ce690a96ee9..50a16f8bebd9 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -46,6 +46,7 @@
 #define BCLINK_WIN_MIN 32 /* bcast minimum link window size */

 const char tipc_bclink_name[] = "broadcast-link";
+unsigned long sysctl_tipc_bc_retruni __read_mostly;

 /**
 * struct tipc_bc_base - base structure for keeping broadcast send state
@@ -474,7 +475,7 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
 	__skb_queue_head_init(&xmitq);

 	tipc_bcast_lock(net);
-	tipc_link_bc_ack_rcv(l, acked, 0, NULL, &xmitq);
+	tipc_link_bc_ack_rcv(l, acked, 0, NULL, &xmitq, NULL);
 	tipc_bcast_unlock(net);

 	tipc_bcbase_xmit(net, &xmitq);
@@ -489,7 +490,8 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
 * RCU is locked, no other locks set
 */
 int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
-			struct tipc_msg *hdr)
+			struct tipc_msg *hdr,
+			struct sk_buff_head *retrq)
 {
 	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
 	struct tipc_gap_ack_blks *ga;
@@ -503,8 +505,11 @@ int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
 		tipc_link_bc_init_rcv(l, hdr);
 	} else if (!msg_bc_ack_invalid(hdr)) {
 		tipc_get_gap_ack_blks(&ga, l, hdr, false);
+		if (!sysctl_tipc_bc_retruni)
+			retrq = &xmitq;
 		rc = tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr),
-					  msg_bc_gap(hdr), ga, &xmitq);
+					  msg_bc_gap(hdr), ga, &xmitq,
+					  retrq);
 		rc |= tipc_link_bc_sync_rcv(l, hdr, &xmitq);
 	}
 	tipc_bcast_unlock(net);
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 9e847d9617d3..97d3cf9d3e4d 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -45,6 +45,7 @@ struct tipc_nl_msg;
 struct tipc_nlist;
 struct tipc_nitem;
 extern const char tipc_bclink_name[];
+extern unsigned long sysctl_tipc_bc_retruni;

 #define TIPC_METHOD_EXPIRE msecs_to_jiffies(5000)

@@ -93,7 +94,8 @@ int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
 void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
 			struct tipc_msg *hdr);
 int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
-			struct tipc_msg *hdr);
+			struct tipc_msg *hdr,
+			struct sk_buff_head *retrq);
 int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
 int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
 int tipc_bclink_reset_stats(struct net *net);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 288c5670cfa5..af352391e2ab 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -375,7 +375,7 @@ void tipc_link_remove_bc_peer(struct tipc_link *snd_l,
 	snd_l->ackers--;
 	rcv_l->bc_peer_is_up = true;
 	rcv_l->state = LINK_ESTABLISHED;
-	tipc_link_bc_ack_rcv(rcv_l, ack, 0, NULL, xmitq);
+	tipc_link_bc_ack_rcv(rcv_l, ack, 0, NULL, xmitq, NULL);
 	trace_tipc_link_reset(rcv_l, TIPC_DUMP_ALL, "bclink removed!");
 	tipc_link_reset(rcv_l);
 	rcv_l->state = LINK_RESET;
@@ -2400,7 +2400,8 @@ int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,

 int tipc_link_bc_ack_rcv(struct tipc_link *r, u16 acked, u16 gap,
 			 struct tipc_gap_ack_blks *ga,
-			 struct sk_buff_head *xmitq)
+			 struct sk_buff_head *xmitq,
+			 struct sk_buff_head *retrq)
 {
 	struct tipc_link *l = r->bc_sndlink;
 	bool unused = false;
@@ -2413,7 +2414,7 @@ int tipc_link_bc_ack_rcv(struct tipc_link *r, u16 acked, u16 gap,
 		return 0;

 	trace_tipc_link_bc_ack(r, acked, gap, &l->transmq);
-	tipc_link_advance_transmq(l, r, acked, gap, ga, xmitq, &unused, &rc);
+	tipc_link_advance_transmq(l, r, acked, gap, ga, retrq, &unused, &rc);

 	tipc_link_advance_backlog(l, xmitq);
 	if (unlikely(!skb_queue_empty(&l->wakeupq)))
@@ -2447,7 +2448,8 @@ int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
 		return 0;

 	if (dnode == tipc_own_addr(l->net)) {
-		rc = tipc_link_bc_ack_rcv(l, acked, to - acked, NULL, xmitq);
+		rc = tipc_link_bc_ack_rcv(l, acked, to - acked, NULL, xmitq,
+					  xmitq);
 		l->stats.recv_nacks++;
 		return rc;
 	}
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 0a0fa7350722..4d0768cf91d5 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -147,7 +147,8 @@ u16 tipc_get_gap_ack_blks(struct tipc_gap_ack_blks **ga, struct tipc_link *l,
 			  struct tipc_msg *hdr, bool uc);
 int tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, u16 gap,
 			 struct tipc_gap_ack_blks *ga,
-			 struct sk_buff_head *xmitq);
+			 struct sk_buff_head *xmitq,
+			 struct sk_buff_head *retrq);
 void tipc_link_build_bc_sync_msg(struct tipc_link *l,
 				 struct sk_buff_head *xmitq);
 void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 6a49b3eeaae9..548207fdec15 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1772,7 +1772,7 @@ static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr,
 	struct tipc_link *ucl;
 	int rc;

-	rc = tipc_bcast_sync_rcv(n->net, n->bc_entry.link, hdr);
+	rc = tipc_bcast_sync_rcv(n->net, n->bc_entry.link, hdr, xmitq);

 	if (rc & TIPC_LINK_DOWN_EVT) {
 		tipc_node_reset_links(n);
diff --git a/net/tipc/sysctl.c b/net/tipc/sysctl.c
index 58ab3d6dcdce..97a6264a2993 100644
--- a/net/tipc/sysctl.c
+++ b/net/tipc/sysctl.c
@@ -36,7 +36,7 @@
 #include "core.h"
 #include "trace.h"
 #include "crypto.h"
-
+#include "bcast.h"
 #include <linux/sysctl.h>

 static struct ctl_table_header *tipc_ctl_hdr;
@@ -75,6 +75,13 @@ static struct ctl_table tipc_table[] = {
 		.extra1		= SYSCTL_ONE,
 	},
 #endif
+	{
+		.procname	= "bc_retruni",
+		.data		= &sysctl_tipc_bc_retruni,
+		.maxlen		= sizeof(sysctl_tipc_bc_retruni),
+		.mode		= 0644,
+		.proc_handler	= proc_doulongvec_minmax,
+	},
 	{}
 };

--
2.13.7
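As a quick usage sketch, assuming a kernel with this patch (and noting that, since the table above is registered under net/tipc, the knob should also be reachable by the sysctl key 'net.tipc.bc_retruni'):

  # Switch broadcast retransmission to unicast mode.
  echo 1 > /proc/sys/net/tipc/bc_retruni
  # ...or equivalently, via sysctl(8):
  sysctl -w net.tipc.bc_retruni=1

  # Observe the effect on the broadcast link's retransmission
  # counters under load.
  tipc link stat show

  # Restore the default broadcast retransmission behaviour.
  echo 0 > /proc/sys/net/tipc/bc_retruni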
From: Tuong L. <tuo...@de...> - 2020-05-26 09:39:04
In the previous commit ("tipc: add Gap ACK blocks support for broadcast link"), we removed the following link trace events due to the code changes:

- tipc_link_bc_ack
- tipc_link_retrans

This commit adds them back, along with some minor changes to adapt to the new code.

Acked-by: Ying Xue <yin...@wi...>
Acked-by: Jon Maloy <jm...@re...>
Signed-off-by: Tuong Lien <tuo...@de...>
---
 net/tipc/link.c  |  3 +++
 net/tipc/trace.h | 13 ++++++++-----
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index d29b9c531171..288c5670cfa5 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1504,6 +1504,8 @@ static int tipc_link_advance_transmq(struct tipc_link *l, struct tipc_link *r,
 	bool is_uc = !link_is_bc_sndlink(l);
 	bool bc_has_acked = false;

+	trace_tipc_link_retrans(r, acked + 1, acked + gap, &l->transmq);
+
 	/* Determine Gap ACK blocks if any for the particular link */
 	if (ga && is_uc) {
 		/* Get the Gap ACKs, uc part */
@@ -2410,6 +2412,7 @@ int tipc_link_bc_ack_rcv(struct tipc_link *r, u16 acked, u16 gap,
 	if (less(acked, r->acked) || (acked == r->acked && !gap && !ga))
 		return 0;

+	trace_tipc_link_bc_ack(r, acked, gap, &l->transmq);
 	tipc_link_advance_transmq(l, r, acked, gap, ga, xmitq, &unused, &rc);

 	tipc_link_advance_backlog(l, xmitq);
diff --git a/net/tipc/trace.h b/net/tipc/trace.h
index 4d8e00483afc..e7535ab75255 100644
--- a/net/tipc/trace.h
+++ b/net/tipc/trace.h
@@ -299,8 +299,10 @@ DECLARE_EVENT_CLASS(tipc_link_transmq_class,
 		__entry->from = f;
 		__entry->to = t;
 		__entry->len = skb_queue_len(tq);
-		__entry->fseqno = msg_seqno(buf_msg(skb_peek(tq)));
-		__entry->lseqno = msg_seqno(buf_msg(skb_peek_tail(tq)));
+		__entry->fseqno = __entry->len ?
+				  msg_seqno(buf_msg(skb_peek(tq))) : 0;
+		__entry->lseqno = __entry->len ?
+				  msg_seqno(buf_msg(skb_peek_tail(tq))) : 0;
 	),

 	TP_printk("<%s> retrans req: [%u-%u] transmq: %u [%u-%u]\n",
@@ -308,15 +310,16 @@ DECLARE_EVENT_CLASS(tipc_link_transmq_class,
 		  __entry->len, __entry->fseqno, __entry->lseqno)
 );

-DEFINE_EVENT(tipc_link_transmq_class, tipc_link_retrans,
+DEFINE_EVENT_CONDITION(tipc_link_transmq_class, tipc_link_retrans,
 	TP_PROTO(struct tipc_link *r, u16 f, u16 t, struct sk_buff_head *tq),
-	TP_ARGS(r, f, t, tq)
+	TP_ARGS(r, f, t, tq),
+	TP_CONDITION(less_eq(f, t))
 );

 DEFINE_EVENT_PRINT(tipc_link_transmq_class, tipc_link_bc_ack,
 	TP_PROTO(struct tipc_link *r, u16 f, u16 t, struct sk_buff_head *tq),
 	TP_ARGS(r, f, t, tq),
-	TP_printk("<%s> acked: [%u-%u] transmq: %u [%u-%u]\n",
+	TP_printk("<%s> acked: %u gap: %u transmq: %u [%u-%u]\n",
 		  __entry->name, __entry->from, __entry->to,
 		  __entry->len, __entry->fseqno, __entry->lseqno)
 );
--
2.13.7
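Once restored, the two events can be enabled at runtime through tracefs like any other TIPC tracepoint. A minimal sketch, assuming tracefs is mounted at /sys/kernel/tracing (on many systems it is also reachable under /sys/kernel/debug/tracing):

  # Enable the two restored tracepoints.
  echo 1 > /sys/kernel/tracing/events/tipc/tipc_link_retrans/enable
  echo 1 > /sys/kernel/tracing/events/tipc/tipc_link_bc_ack/enable

  # Read the live trace; note that tipc_link_retrans now only fires
  # when the requested range is sane, i.e. when less_eq(f, t) holds,
  # thanks to the TP_CONDITION() added above.
  cat /sys/kernel/tracing/trace_pipe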
From: Tuong L. <tuo...@de...> - 2020-05-26 09:39:02
This series adds some improvements to TIPC. The first patch improves TIPC broadcast performance with the 'Gap ACK blocks' mechanism, similar to what unicast already has, while the next patches add tracing & statistics support for broadcast links, plus an alternative that carries broadcast retransmissions via unicast, which might be useful in some cases. Finally, with the last patch, the Nagle algorithm can now automatically 'adjust' itself depending on the specific network conditions a stream connection runs over.

Tuong Lien (5):
  tipc: introduce Gap ACK blocks for broadcast link
  tipc: add back link trace events
  tipc: enable broadcast retrans via unicast
  tipc: add support for broadcast rcv stats dumping
  tipc: add test for Nagle algorithm effectiveness

 net/tipc/bcast.c   |  22 ++-
 net/tipc/bcast.h   |   9 +-
 net/tipc/link.c    | 487 +++++++++++++++++++++++++++----------------------
 net/tipc/link.h    |  11 +-
 net/tipc/msg.c     |  12 +-
 net/tipc/msg.h     |  43 ++++-
 net/tipc/netlink.c |   2 +-
 net/tipc/node.c    |  73 ++++++--
 net/tipc/socket.c  |  64 +++++--
 net/tipc/sysctl.c  |   9 +-
 net/tipc/trace.h   |  17 +-
 11 files changed, 486 insertions(+), 263 deletions(-)

--
2.13.7
From: Ying X. <yin...@wi...> - 2020-05-24 12:32:01
On 5/21/20 11:03 PM, Jon Maloy wrote:
> https://github.com/willemt/raft.git, that was only 3,500 lines of code,
> although with uncertain quality status of course.
> I suspect that a large part of those is code that are dealing with
> neighbor monitoring, failure detection etc, i.e, services that the
> topology server already has for free. It would be very interesting to
> see if it would be possible to reduce this code amount to an acceptable
> volume for inclusion in the TIPC module.

Good finding Jon! Let me try to study this one.
From: Jon M. <jm...@re...> - 2020-05-22 21:37:43
On 5/22/20 4:10 PM, Eric Dumazet wrote:
>
> On 5/22/20 12:47 PM, Jon Maloy wrote:
>>
>> On 5/22/20 11:57 AM, Eric Dumazet wrote:
>>> On 5/22/20 8:01 AM, Jon Maloy wrote:
>>>> On 5/22/20 2:18 AM, Xin Long wrote:
>>>>> On Fri, May 22, 2020 at 1:55 PM Eric Dumazet <edu...@go...> wrote:
>>>>>> Resend to the list in non HTML form
>>>>>>
>>>>>> On Thu, May 21, 2020 at 10:53 PM Eric Dumazet <edu...@go...> wrote:
>>>>>>> On Thu, May 21, 2020 at 10:50 PM Xin Long <luc...@gm...> wrote:
>>>>>>>> On Fri, May 22, 2020 at 2:30 AM Eric Dumazet <edu...@go...> wrote:
>>>>>>>>> dst_cache_get() documents it must be used with BH disabled.
>>>>>>>> Interesting, I thought under rcu_read_lock() is enough, which calls
>>>>>>>> preempt_disable().
>>>>>>> rcu_read_lock() does not disable BH, never.
>>>>>>>
>>>>>>> And rcu_read_lock() does not necessarily disable preemption.
>>>>> Then I need to think again if it's really worth using dst_cache here.
>>>>>
>>>>> Also add tipc-discussion and Jon to CC list.
>>>> The suggested solution will affect all bearers, not only UDP, so it is not a good.
>>>> Is there anything preventing us from disabling preemtion inside the scope of the rcu lock?
>>>>
>>>> ///jon
>>>>
>>> BH is disabled any way few nano seconds later, disabling it a bit earlier wont make any difference.
>> The point is that if we only disable inside tipc_udp_xmit() (the function pointer call) the change will only affect the UDP bearer, where dst_cache is used.
>> The corresponding calls for the Ethernet and Infiniband bearers don't use dst_cache, and don't need this disabling. So it does makes a difference.
>>
> I honestly do not understand your concern, this makes no sense to me.
>
> I have disabled BH _right_ before the dst_cache_get(cache) call, so has no effect if the dst_cache is not used, this should be obvious.

Forget my comment. I thought we were discussing Tetsuo Handa's original patch, and missed that you had posted your own. I have no problems with this one.

///jon

> If some other paths do not use dst_cache, how can my patch have any effect on them ?
>
> What alternative are you suggesting ?
From: Jon M. <jm...@re...> - 2020-05-22 19:47:29
On 5/22/20 11:57 AM, Eric Dumazet wrote:
> On 5/22/20 8:01 AM, Jon Maloy wrote:
>> On 5/22/20 2:18 AM, Xin Long wrote:
>>> On Fri, May 22, 2020 at 1:55 PM Eric Dumazet <edu...@go...> wrote:
>>>> Resend to the list in non HTML form
>>>>
>>>> On Thu, May 21, 2020 at 10:53 PM Eric Dumazet <edu...@go...> wrote:
>>>>>
>>>>> On Thu, May 21, 2020 at 10:50 PM Xin Long <luc...@gm...> wrote:
>>>>>> On Fri, May 22, 2020 at 2:30 AM Eric Dumazet <edu...@go...> wrote:
>>>>>>> dst_cache_get() documents it must be used with BH disabled.
>>>>>> Interesting, I thought under rcu_read_lock() is enough, which calls
>>>>>> preempt_disable().
>>>>> rcu_read_lock() does not disable BH, never.
>>>>>
>>>>> And rcu_read_lock() does not necessarily disable preemption.
>>> Then I need to think again if it's really worth using dst_cache here.
>>>
>>> Also add tipc-discussion and Jon to CC list.
>> The suggested solution will affect all bearers, not only UDP, so it is not a good.
>> Is there anything preventing us from disabling preemtion inside the scope of the rcu lock?
>>
>> ///jon
>>
> BH is disabled any way few nano seconds later, disabling it a bit earlier wont make any difference.

The point is that if we only disable inside tipc_udp_xmit() (the function pointer call) the change will only affect the UDP bearer, where dst_cache is used. The corresponding calls for the Ethernet and Infiniband bearers don't use dst_cache, and don't need this disabling. So it does make a difference.

///jon

> Also, if you intend to make dst_cache BH reentrant, you will have to make that for net-next, not net tree.
>
> Please carefully read include/net/dst_cache.h
>
> It is very clear about BH requirements.
From: Jon M. <jm...@re...> - 2020-05-22 15:01:39
On 5/22/20 2:18 AM, Xin Long wrote: > On Fri, May 22, 2020 at 1:55 PM Eric Dumazet <edu...@go...> wrote: >> Resend to the list in non HTML form >> >> >> On Thu, May 21, 2020 at 10:53 PM Eric Dumazet <edu...@go...> wrote: >>> >>> >>> On Thu, May 21, 2020 at 10:50 PM Xin Long <luc...@gm...> wrote: >>>> On Fri, May 22, 2020 at 2:30 AM Eric Dumazet <edu...@go...> wrote: >>>>> dst_cache_get() documents it must be used with BH disabled. >>>> Interesting, I thought under rcu_read_lock() is enough, which calls >>>> preempt_disable(). >>> >>> rcu_read_lock() does not disable BH, never. >>> >>> And rcu_read_lock() does not necessarily disable preemption. > Then I need to think again if it's really worth using dst_cache here. > > Also add tipc-discussion and Jon to CC list. The suggested solution will affect all bearers, not only UDP, so it is not a good. Is there anything preventing us from disabling preemtion inside the scope of the rcu lock? ///jon > > Thanks. > >>> >>>> have you checked other places where dst_cache_get() are used? >>> >>> >>> Yes, other paths are fine. >>> >>>> >>>>> sysbot reported : >>>>> >>>>> BUG: using smp_processor_id() in preemptible [00000000] code: /21697 >>>>> caller is dst_cache_get+0x3a/0xb0 net/core/dst_cache.c:68 >>>>> CPU: 0 PID: 21697 Comm: Not tainted 5.7.0-rc6-syzkaller #0 >>>>> Hardware name: Google Google Compute Engine/Google Compute Engine, BIOS Google 01/01/2011 >>>>> Call Trace: >>>>> __dump_stack lib/dump_stack.c:77 [inline] >>>>> dump_stack+0x188/0x20d lib/dump_stack.c:118 >>>>> check_preemption_disabled lib/smp_processor_id.c:47 [inline] >>>>> debug_smp_processor_id.cold+0x88/0x9b lib/smp_processor_id.c:57 >>>>> dst_cache_get+0x3a/0xb0 net/core/dst_cache.c:68 >>>>> tipc_udp_xmit.isra.0+0xb9/0xad0 net/tipc/udp_media.c:164 >>>>> tipc_udp_send_msg+0x3e6/0x490 net/tipc/udp_media.c:244 >>>>> tipc_bearer_xmit_skb+0x1de/0x3f0 net/tipc/bearer.c:526 >>>>> tipc_enable_bearer+0xb2f/0xd60 net/tipc/bearer.c:331 >>>>> __tipc_nl_bearer_enable+0x2bf/0x390 net/tipc/bearer.c:995 >>>>> tipc_nl_bearer_enable+0x1e/0x30 net/tipc/bearer.c:1003 >>>>> genl_family_rcv_msg_doit net/netlink/genetlink.c:673 [inline] >>>>> genl_family_rcv_msg net/netlink/genetlink.c:718 [inline] >>>>> genl_rcv_msg+0x627/0xdf0 net/netlink/genetlink.c:735 >>>>> netlink_rcv_skb+0x15a/0x410 net/netlink/af_netlink.c:2469 >>>>> genl_rcv+0x24/0x40 net/netlink/genetlink.c:746 >>>>> netlink_unicast_kernel net/netlink/af_netlink.c:1303 [inline] >>>>> netlink_unicast+0x537/0x740 net/netlink/af_netlink.c:1329 >>>>> netlink_sendmsg+0x882/0xe10 net/netlink/af_netlink.c:1918 >>>>> sock_sendmsg_nosec net/socket.c:652 [inline] >>>>> sock_sendmsg+0xcf/0x120 net/socket.c:672 >>>>> ____sys_sendmsg+0x6bf/0x7e0 net/socket.c:2362 >>>>> ___sys_sendmsg+0x100/0x170 net/socket.c:2416 >>>>> __sys_sendmsg+0xec/0x1b0 net/socket.c:2449 >>>>> do_syscall_64+0xf6/0x7d0 arch/x86/entry/common.c:295 >>>>> entry_SYSCALL_64_after_hwframe+0x49/0xb3 >>>>> RIP: 0033:0x45ca29 >>>>> >>>>> Fixes: e9c1a793210f ("tipc: add dst_cache support for udp media") >>>>> Cc: Xin Long <luc...@gm...> >>>>> Cc: Jon Maloy <jon...@er...> >>>>> Signed-off-by: Eric Dumazet <edu...@go...> >>>>> Reported-by: syzbot <syz...@go...> >>>>> --- >>>>> net/tipc/udp_media.c | 6 +++++- >>>>> 1 file changed, 5 insertions(+), 1 deletion(-) >>>>> >>>>> diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c >>>>> index d6620ad535461a4d04ed5ba90569ce8b7df9f994..28a283f26a8dff24d613e6ed57e5e69d894dae66 100644 >>>>> --- a/net/tipc/udp_media.c >>>>> +++ b/net/tipc/udp_media.c 
>>>>> @@ -161,9 +161,11 @@ static int tipc_udp_xmit(struct net *net, struct sk_buff *skb, >>>>> struct udp_bearer *ub, struct udp_media_addr *src, >>>>> struct udp_media_addr *dst, struct dst_cache *cache) >>>>> { >>>>> - struct dst_entry *ndst = dst_cache_get(cache); >>>>> + struct dst_entry *ndst; >>>>> int ttl, err = 0; >>>>> >>>>> + local_bh_disable(); >>>>> + ndst = dst_cache_get(cache); >>>>> if (dst->proto == htons(ETH_P_IP)) { >>>>> struct rtable *rt = (struct rtable *)ndst; >>>>> >>>>> @@ -210,9 +212,11 @@ static int tipc_udp_xmit(struct net *net, struct sk_buff *skb, >>>>> src->port, dst->port, false); >>>>> #endif >>>>> } >>>>> + local_bh_enable(); >>>>> return err; >>>>> >>>>> tx_error: >>>>> + local_bh_enable(); >>>>> kfree_skb(skb); >>>>> return err; >>>>> } >>>>> -- >>>>> 2.27.0.rc0.183.gde8f92d652-goog >>>>> |
From: Xin L. <luc...@gm...> - 2020-05-22 06:11:59
On Fri, May 22, 2020 at 1:55 PM Eric Dumazet <edu...@go...> wrote: > > Resend to the list in non HTML form > > > On Thu, May 21, 2020 at 10:53 PM Eric Dumazet <edu...@go...> wrote: > > > > > > > > On Thu, May 21, 2020 at 10:50 PM Xin Long <luc...@gm...> wrote: > >> > >> On Fri, May 22, 2020 at 2:30 AM Eric Dumazet <edu...@go...> wrote: > >> > > >> > dst_cache_get() documents it must be used with BH disabled. > >> Interesting, I thought under rcu_read_lock() is enough, which calls > >> preempt_disable(). > > > > > > rcu_read_lock() does not disable BH, never. > > > > And rcu_read_lock() does not necessarily disable preemption. Then I need to think again if it's really worth using dst_cache here. Also add tipc-discussion and Jon to CC list. Thanks. > > > > > >> > >> have you checked other places where dst_cache_get() are used? > > > > > > > > Yes, other paths are fine. > > > >> > >> > >> > > >> > sysbot reported : > >> > > >> > BUG: using smp_processor_id() in preemptible [00000000] code: /21697 > >> > caller is dst_cache_get+0x3a/0xb0 net/core/dst_cache.c:68 > >> > CPU: 0 PID: 21697 Comm: Not tainted 5.7.0-rc6-syzkaller #0 > >> > Hardware name: Google Google Compute Engine/Google Compute Engine, BIOS Google 01/01/2011 > >> > Call Trace: > >> > __dump_stack lib/dump_stack.c:77 [inline] > >> > dump_stack+0x188/0x20d lib/dump_stack.c:118 > >> > check_preemption_disabled lib/smp_processor_id.c:47 [inline] > >> > debug_smp_processor_id.cold+0x88/0x9b lib/smp_processor_id.c:57 > >> > dst_cache_get+0x3a/0xb0 net/core/dst_cache.c:68 > >> > tipc_udp_xmit.isra.0+0xb9/0xad0 net/tipc/udp_media.c:164 > >> > tipc_udp_send_msg+0x3e6/0x490 net/tipc/udp_media.c:244 > >> > tipc_bearer_xmit_skb+0x1de/0x3f0 net/tipc/bearer.c:526 > >> > tipc_enable_bearer+0xb2f/0xd60 net/tipc/bearer.c:331 > >> > __tipc_nl_bearer_enable+0x2bf/0x390 net/tipc/bearer.c:995 > >> > tipc_nl_bearer_enable+0x1e/0x30 net/tipc/bearer.c:1003 > >> > genl_family_rcv_msg_doit net/netlink/genetlink.c:673 [inline] > >> > genl_family_rcv_msg net/netlink/genetlink.c:718 [inline] > >> > genl_rcv_msg+0x627/0xdf0 net/netlink/genetlink.c:735 > >> > netlink_rcv_skb+0x15a/0x410 net/netlink/af_netlink.c:2469 > >> > genl_rcv+0x24/0x40 net/netlink/genetlink.c:746 > >> > netlink_unicast_kernel net/netlink/af_netlink.c:1303 [inline] > >> > netlink_unicast+0x537/0x740 net/netlink/af_netlink.c:1329 > >> > netlink_sendmsg+0x882/0xe10 net/netlink/af_netlink.c:1918 > >> > sock_sendmsg_nosec net/socket.c:652 [inline] > >> > sock_sendmsg+0xcf/0x120 net/socket.c:672 > >> > ____sys_sendmsg+0x6bf/0x7e0 net/socket.c:2362 > >> > ___sys_sendmsg+0x100/0x170 net/socket.c:2416 > >> > __sys_sendmsg+0xec/0x1b0 net/socket.c:2449 > >> > do_syscall_64+0xf6/0x7d0 arch/x86/entry/common.c:295 > >> > entry_SYSCALL_64_after_hwframe+0x49/0xb3 > >> > RIP: 0033:0x45ca29 > >> > > >> > Fixes: e9c1a793210f ("tipc: add dst_cache support for udp media") > >> > Cc: Xin Long <luc...@gm...> > >> > Cc: Jon Maloy <jon...@er...> > >> > Signed-off-by: Eric Dumazet <edu...@go...> > >> > Reported-by: syzbot <syz...@go...> > >> > --- > >> > net/tipc/udp_media.c | 6 +++++- > >> > 1 file changed, 5 insertions(+), 1 deletion(-) > >> > > >> > diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c > >> > index d6620ad535461a4d04ed5ba90569ce8b7df9f994..28a283f26a8dff24d613e6ed57e5e69d894dae66 100644 > >> > --- a/net/tipc/udp_media.c > >> > +++ b/net/tipc/udp_media.c > >> > @@ -161,9 +161,11 @@ static int tipc_udp_xmit(struct net *net, struct sk_buff *skb, > >> > struct udp_bearer *ub, struct 
udp_media_addr *src, > >> > struct udp_media_addr *dst, struct dst_cache *cache) > >> > { > >> > - struct dst_entry *ndst = dst_cache_get(cache); > >> > + struct dst_entry *ndst; > >> > int ttl, err = 0; > >> > > >> > + local_bh_disable(); > >> > + ndst = dst_cache_get(cache); > >> > if (dst->proto == htons(ETH_P_IP)) { > >> > struct rtable *rt = (struct rtable *)ndst; > >> > > >> > @@ -210,9 +212,11 @@ static int tipc_udp_xmit(struct net *net, struct sk_buff *skb, > >> > src->port, dst->port, false); > >> > #endif > >> > } > >> > + local_bh_enable(); > >> > return err; > >> > > >> > tx_error: > >> > + local_bh_enable(); > >> > kfree_skb(skb); > >> > return err; > >> > } > >> > -- > >> > 2.27.0.rc0.183.gde8f92d652-goog > >> > |
From: Jon M. <jm...@re...> - 2020-05-21 15:04:05
Hi Ying, Thinking more about this, isn't the perfect solution 64-bit addresses, *plus* a topology server that delivers events according to the consensus model, so that all users receive a consistent view of the cluster and application topology? Then, we could even go further, and add virtual synchrony on top of that, but that would have to be a selectable property, I think. I had a quick look at Canonical's RAFT (https://github.com/canonical/raft.git) implementation. It is 18,000 lines of code, i.e., larger than the whole of TIPC (14,000 lines), so it is a big chunk. I also found another one at https://github.com/willemt/raft.git, that was only 3,500 lines of code, although with uncertain quality status of course. I suspect that a large part of those is code that are dealing with neighbor monitoring, failure detection etc, i.e, services that the topology server already has for free. It would be very interesting to see if it would be possible to reduce this code amount to an acceptable volume for inclusion in the TIPC module. ///jon On 5/12/20 4:38 PM, Jon Maloy wrote: > Hi Ying, > I have for several years claimed that TIPC and RAFT would be the > perfect combination. > But what I had in mind was a more general-purpose user-land RAFT > consensus service on top of TIPC, where TIPC provides properties that > makes RAFT easier to implement and gives it shorter > discovery/convergence times during topology changes. > > By adding RAFT as a service *provided by* TIPC, I imagine you mean > something similar to (or even extension of) the topology server we > have now. This has not occurred to me, but it might not be a bad idea. > It all boils down to how complex it would be, and how much more code > we would need to add to TIPC, vs. the benefit we get. Your colleague > Allan Stevens used to say that TIPC needs a new "unique selling point" > to reach a wider adoption than it has now, and this might indeed be > it. Maybe a prototype to be done by somebody at WindRiver, DEK or RH? > > What I don't quite understand is that you present this as an > alternative to the 128-bit address scheme. We need that anyway as I > see it. Uniqueness for service types is only part of the reason for > that proposal. It is even about practicality of use, e.g., letting > users use strings, database keys, disk partitions etc. directly as > service instances and hence avoiding any lookup/translation steps. I > did not imagine that the TIPC itself generates any UUIDs or service > types/instance values; that part is still left to the user. > > I think the risk of service type collisions inside a cluster, if > generated properly by the user, is so low that it alone would not be > enough to justify such a large extension to TIPC. But if he could > have 100% guaranteed uniqueness as a side effect of such a new service > it would of course in nice. I assume you have more than just TIPC > address uniqueness in mind with this proposal? > > Regards > ///jon > > > > On 5/12/20 5:45 AM, Xue, Ying wrote: >> Hi Jon, >> >> Sorry for late response. >> >> Before I reply to your comments below, I want to discuss a more >> general question: >> >> When you posted the idea that we use UUID to resolve service name >> conflict issue, in the recent days I was always wondering whether we >> could implement Raft consensus algorithm (https://raft.github.io/) in >> internal TIPC module. 
In my opinion, there are different advantages >> and disadvantages respectively: >> >> UUID: >> Advantages: >> - Its generation algorithm is straightforward and Linux kernel has >> the interfaces available to generate UUID numbers. >> Disadvantages: >> - Protocol backwards compatibility >> - UUID generation algorithms cannot 100% guarantee that UUID >> numbers are not repeated particularly in distribution environment >> although the probability of UUID repeated occurrence is very little. >> >> Raft: >> Advantages: >> - Can 100% guarantee that service name doesn't conflict each other >> in in distribution environment >> - No protocol backwards compatibility issue >> Disadvantages: >> - Compared to the changes brought by UUID, the changes might not be >> very big, but it's quite complex particularly when we try to >> implement its algorithm in kernel space. >> >> I love to hear your opinion. >> >> Thanks., >> Ying >> >> -----Original Message----- >> From: Jon Maloy [mailto:jm...@re...] >> Sent: Tuesday, May 5, 2020 8:15 AM >> To: tip...@li... >> Cc: tun...@de...; hoa...@de...; >> tuo...@de...; ma...@do...; xi...@re...; Xue, >> Ying; par...@gm... >> Subject: Re: [RFC PATCH] tipc: define TIPC version 3 address types >> >> Hi all, >> I was pondering a little more about this possible feature. >> First of all, I realized that the following test >> >> bool tipc_msg_validate(struct sk_buff **_skb) >> { >> [...] >> if (unlikely(msg_version(hdr) != TIPC_VERSION)) >> return false; >> [...] >> } >> makes it very hard to update the version number in a >> backwards compatible way. Even discovery messages >> will be rejected by v2 nodes, and we don't get around >> that unless we do discovery with v2 messages, or send >> out a duplicate set (v2 +v3) of discovery messages. >> And, we can actually achieve exactly what we want >> with just setting another capability bit. >> So, we set bit #12 to mean "TIPC_EXTENDED", to also to >> mean "all previous capabilities are valid if this bit is set, >> no need to test for it" >> That way, we can zero out these bits and start reusing them >> for new capabilities when we need to. >> >> AF_TIPC3 now becomes AF_TIPCE, tipc_addr becomes >> tipce_addr etc. >> >> The binding table needs to be updated the following way: >> >> union publ_item { >> struct { >> __be32 type; >> __be32 lower; >> __be32 upper; >> } legacy; >> struct { >> u8 type[16]; >> u8 lower[16]; >> u8 upper[16]; >> } extended; >> }; >> >> struct publication { >> u8 extended; >> u8 scope; /* This can only take values [0:3] */ >> u8 spare[2]; >> union publ_item publ; >> u8 node[16]; >> u32 port; >> u32 key; >> struct list_head binding_node; >> struct list_head binding_sock; >> struct list_head local_publ; >> struct list_head all_publ; >> struct rcu_head rcu; >> }; >> >> struct distr_item { >> union publ_item; >> __be32 port; >> __be32 key; >> }; >> >> The NAME_DISTR protocol must be extended with a field >> indicating if it contains legacy publication(s) or extended >> publication(s). >> 'Extended' nodes receive separate bulks for legacy and >> extended publications, since it is hard to mix them in the >> same message. >> Legacy nodes only receive legacy publications, so in this >> case the distributor just send a bulk for those. >> >> The topology subscriber must be updated in a similar >> manner, but we can assume that the same socket cannot >> issue two types of subscriptions and receive two types >> of events; it has to be on or the other. This should >> simplify the task somewhat. 
>> >> User message header format needs to be changed for >> Service Address (Port Name) messages: >> - Type occupies word [8:B], i.e. bytes [32:47] >> - Instance occupies word [C:F], i.e. bytes [48:64] >> >> This is where it gets tricky. The 'header size' field is only 4 >> bits and counts 32-bit words. This means that current >> max header size that can be indicated is 60 bytes. >> A simple way might be to just extend the field with one of >> the tree unused bits [16:18] in word 1 as msb. That would >> be backwards compatible since those bits are currently 0, >> and no special tricks are needed. >> Regarding TIPC_MCAST_MSG we need yet another 16 bytes, >> [65:80] if we want to preserve the current semantics on >> [lower,upper]. However, I am highly uncertain if that feature >> is ever used and needed. We may be good by just keeping >> one 'instance' field just as in NAMED messages. >> >> The group cast protocol could be left for later, once we understand >> the consequences better than now, but semantically it should >> work just like now, except with a longer header and type/instance >> fields. >> >> It would also be nice if the 16 byte node identity replaces the current >> 4 byte node address/number in all interaction with user land, inclusive >> the presentation of the neighbor monitoring status in monitor.c. >> That can possibly also be left for later. >> >> Finally, would it be possible to mark a socket at 'legacy' or 'extended' >> without adding a new AF_TIPCE value? If this can be done in a >> not-too-ugly way it might be worth considering. >> >> ///jon >> >> >> >> >> On 4/27/20 9:53 PM, jm...@re... wrote: >>> From: Jon Maloy <jm...@re...> >>> >>> TIPC would be more attractive in a modern user environment such >>> as Kubernetes if it could provide a larger address range. >>> >>> Advantages: >>> - Users could directly use UUIDs, strings or other values as service >>> instances types and instances. >>> - No more risk of collisions between randomly selected service types >>> >>> The effect on the TIPC implementation and protocol would be >>> significant, >>> but this is still worth considering. >>> --- >>> include/linux/socket.h | 5 ++- >>> include/uapi/linux/tipc3.h | 79 >>> ++++++++++++++++++++++++++++++++++++++ >>> 2 files changed, 82 insertions(+), 2 deletions(-) >>> create mode 100644 include/uapi/linux/tipc3.h >>> >>> diff --git a/include/linux/socket.h b/include/linux/socket.h >>> index 54338fac45cb..ff2268ceedaf 100644 >>> --- a/include/linux/socket.h >>> +++ b/include/linux/socket.h >>> @@ -209,8 +209,8 @@ struct ucred { >>> * reuses AF_INET address family >>> */ >>> #define AF_XDP 44 /* XDP sockets */ >>> - >>> -#define AF_MAX 45 /* For now.. */ >>> +#define AF_TIPC3 45 /* TIPC version 3 sockets */ >>> +#define AF_MAX 46 /* For now.. */ >>> /* Protocol families, same as address families. */ >>> #define PF_UNSPEC AF_UNSPEC >>> @@ -260,6 +260,7 @@ struct ucred { >>> #define PF_QIPCRTR AF_QIPCRTR >>> #define PF_SMC AF_SMC >>> #define PF_XDP AF_XDP >>> +#define PF_TIPC3 AF_TIPC3 >>> #define PF_MAX AF_MAX >>> /* Maximum queue length specifiable by listen. 
*/ >>> diff --git a/include/uapi/linux/tipc3.h b/include/uapi/linux/tipc3.h >>> new file mode 100644 >>> index 000000000000..0d385bc41b66 >>> --- /dev/null >>> +++ b/include/uapi/linux/tipc3.h >>> @@ -0,0 +1,79 @@ >>> +/* SPDX-License-Identifier: ((GPL-2.0 WITH Linux-syscall-note) OR >>> BSD-3-Clause) */ >>> +/* >>> + * include/uapi/linux/tipc3.h: Header for TIPC v3 socket interface >>> + * >>> + * Copyright (c) 2020 Red Hat Inc >>> + * All rights reserved. >>> + * >>> + * Redistribution and use in source and binary forms, with or without >>> + * modification, are permitted provided that the following >>> conditions are met: >>> + * >>> + * 1. Redistributions of source code must retain the above copyright >>> + * notice, this list of conditions and the following disclaimer. >>> + * 2. Redistributions in binary form must reproduce the above >>> copyright >>> + * notice, this list of conditions and the following disclaimer >>> in the >>> + * documentation and/or other materials provided with the >>> distribution. >>> + * 3. Neither the names of the copyright holders nor the names of its >>> + * contributors may be used to endorse or promote products >>> derived from >>> + * this software without specific prior written permission. >>> + * >>> + * Alternatively, this software may be distributed under the terms >>> of the >>> + * GNU General Public License ("GPL") version 2 as published by the >>> Free >>> + * Software Foundation. >>> + * >>> + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND >>> CONTRIBUTORS "AS IS" >>> + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT >>> LIMITED TO, THE >>> + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A >>> PARTICULAR PURPOSE >>> + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR >>> CONTRIBUTORS BE >>> + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR >>> + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, >>> PROCUREMENT OF >>> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR >>> BUSINESS >>> + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, >>> WHETHER IN >>> + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR >>> OTHERWISE) >>> + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF >>> ADVISED OF THE >>> + * POSSIBILITY OF SUCH DAMAGE. >>> + */ >>> + >>> +#ifndef _LINUX_TIPC3_H_ >>> +#define _LINUX_TIPC3_H_ >>> + >>> +#include <linux/types.h> >>> +#include <linux/sockios.h> >>> +#include <linux/tipc.h> >>> + >>> +struct tipc3_addr { >>> + __u8[16] type; /* zero if socket address */ >>> + __u8[16] instance; /* port if socket address */ >>> + __u8[16] node; /* zero if whole cluster */ >>> +}; >>> + >>> +struct tipc3_subscr { >>> + __u8[16] type; >>> + __u8[16] lower; >>> + __u8[16] upper; >>> + __u8[16] node; >>> + __u32 timeout; /* subscription duration (in ms) */ >>> + __u32 filter; /* bitmask of filter options */ >>> + __u8 usr_handle[16]; /* available for subscriber use */ >>> +}; >>> + >>> +struct tipc3_event { >>> + __u8[16] lower; /* matching range */ >>> + __u8[16] upper; /* " " */ >>> + struct tipc3_addr socket; /* associated socket */ >>> + struct tipc2_subscr sub; /* associated subscription */ >>> + __u32 event; /* event type */ >>> +}; >>> + >>> +struct sockaddr_tipc3 { >>> + unsigned short family; >>> + bool mcast; >>> + struct tipc3_addr addr; >>> +}; >>> + >>> +struct tipc3_group_req { >>> + struct tipc3_addr addr; >>> + __u32 flags; >>> +}; >>> + >>> +#endif > |
From: maloy <ma...@do...> - 2020-05-17 02:54:09
Hi Hoang,After more pondering, I realize that my suggestion below was not a good solution either. In particular, we can not have a transition rcast-to-mcast (bulk-to-publ/withdraw) without sending sync to all destinations, something we want to avoid.I think our problem is that we (well, me...) are trying to solve two different problems at the same time, using a mechanism that was only made for of them. So, what if we separate the problems again?1) We send all bulk updates to all nodes, irrespective of whether they support broadcast binding table update or not, as mandatory rcast. No bcast/rcast sync needs to be involved, and I don't think it is when we use 'mandatory'. tipc_node_bcast() can be eliminated.2) When we need to send out publish/withdraw, we just continue to send those as mandatory rcast to the nodes not supporting binding table bcast. I believe our current tipc_mcast_xmit(), given a list of legacy nodes, should do the trick even here.3) To those nodes supporting broadcast update we do send all publish/withdraw as mandatory broadcast.Now it remains to solve the synchronization between 1) and 3) type messages. This we have discussed before, and we know the solution is a "NOT_LAST" bit.This requires a slightly smarter tipc_namedistr_rcv() function which can sort between non_seq and unicast messages, but also refuses to dequeue any messages from the namedq until the 'not_last' bit is zero in an arrived bulk message. But no new synchronization queue should be needed.I think we now have a viable solution that solves all our problems without too much new code. Regards///jonSent from my Samsung Galaxy smartphone. -------- Original message --------From: Jon Maloy <jm...@re...> Date: 2020-05-12 17:21 (GMT-05:00) To: tip...@li..., tip...@de..., Xin Long <lx...@re...>, Ying Xue <yin...@wi...>, Shuang Li <sh...@re...> Subject: [tipc-discussion] Fwd: Re: FW: [PATCH 2/2] tipc: update a binding service via broadcast cc to tipc-discussion and others.///jon-------- Forwarded Message --------Subject: Re: FW: [PATCH 2/2] tipc: update a binding service via broadcastDate: Tue, 12 May 2020 14:38:29 -0400From: Jon Maloy <jm...@re...>To: Hoang Huu Le <hoa...@de...>, ma...@do... <ma...@do...>Hi Hoang,I had to re-read the discussion we had back in November to refresh my memory about this.To me v2 looks like a better base for further development than the most recent version,so I am sorry to have wasted your time here. I am still convinced that the mcast/rcast synchprotocol we developed can solve almost all our problems if we understand to use it right.What about the following:Sending side:- Even the initial bulk is sent via tipc_mxast_xmit(), but with only one destination node and as "mandatory rcast". The broadcast link may still choose to send it as bcast in case rcast has been manually disabled by the user, but the surplus messages will be dropped on the non-destination nodes. (In case of legacy nodes this is certain, in the case of newer nodes we must make sure that it happens by setting the 'destnode' field in the header and always check for this in tipc_named_rcv().) I think this is a weird and rare case where we can accept the extra overhead of broadcast. Under all normal conditions rcast will be used here.- Regular PUBLISH/WITHDRAW messages are sent as "mandatory bcast", or if some nodes don't not support TIPC_MCAST_RBCTL as "mandatory rcast". The broadcast link may still choose to send it as rcast, but this is nothing we need to worry about as long as the syncronization mechanism is active. 
Under all normal conditions bcast will be selected when possible, since all
nodes are destinations.

Receiving side:

- We need a deferred queue, but last time I missed that we don't need one
  per node; it is sufficient to have a global one, as you were suggesting.
  So to me the code is ok here.
- As mentioned earlier, tipc_named_rcv() only accepts messages where
  destnode is "self" (bulk) or zero (publish/withdraw).

The point with all this is that all messages will now be subject to the
synchronization mechanism, so the bulk messages are guaranteed to be
delivered in order and ahead of individual publications/withdraws, and even
topology changes will almost never lead to a change of sending method and a
need for synchronization.

We must make one change to the broadcast protocol though: even "mandatory"
messages must be made subject to the synchronization mechanism, something
that doesn't seem to be the case now. (Or maybe we don't need to set
"mandatory" at all? This needs to be checked.)

The advantage of this method is simplicity:

- We don't need any separate synchronization mechanism for NAME_DISTR
  messages at all; we just use what is already there.

The disadvantage is that there will be a lot of SYN messages sent if we are
not careful:

- Every time a new node comes up, the other nodes will send it a bcast SYN
  before they send the bulk, because they believe they have just switched
  from bcast (normal mode) to rcast (bulk mode). This one is in reality
  unnecessary, since we can be sure that the new node has not been sent any
  previous bcast that needs to be synched with.
- At the first publication/withdraw sent after the bulk, the neighbor nodes
  will send an rcast SYN to all other nodes, because they just "switched"
  back from rcast to bcast. In this case we really do need to send an rcast
  SYN to the new node that came up, since this is the only one where there
  is a risk of a race. This message in reality serves as an "end-of-bulk"
  message, but is handled like any other SYN by the receiving
  tipc_mcast_filter_rcv().

So, to avoid this, the broadcast protocol needs to be able to recognize
NAME_DISTR messages and treat them slightly differently from the others, or
it needs to be told what to do by the name_distr.c code via the API. Maybe
a couple of new fields in struct tipc_mcast_method?

What do you think?

Regards
///jon

On 5/12/20 6:22 AM, Hoang Huu Le wrote:
> Just forward the patch I mentioned.
>
> -----Original Message-----
> From: Hoang Le <hoa...@de...>
> Sent: Tuesday, November 19, 2019 5:01 PM
> To: jon...@er...; ma...@do...; tip...@de...
> Subject: [PATCH 2/2] tipc: update a binding service via broadcast
>
> Currently, updating the binding table (adding a service binding to the
> name table / withdrawing a service binding) is done over replicast.
> However, if we are scaling up clusters to > 100 nodes/containers, this
> method is less efficient, because it loops through the nodes in a
> cluster one by one.
>
> It is worth using broadcast to update a binding service. The binding
> table is then updated in all nodes in one shot.
>
> The mechanism is backward compatible, as the sync message is silently
> dropped.
>
> v2: resolve synchronization problem when switching from unicast to
> broadcast
>
> Signed-off-by: Hoang Le <hoa...@de...>
> ---
>  net/tipc/bcast.c      |  3 ++-
>  net/tipc/link.c       |  6 ++++++
>  net/tipc/name_table.c | 33 ++++++++++++++++++++++++++++++---
>  net/tipc/name_table.h |  4 ++++
>  net/tipc/node.c       | 32 ++++++++++++++++++++++++++++++++
>  net/tipc/node.h       |  2 ++
>  6 files changed, 76 insertions(+), 4 deletions(-)
>
> diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
> index e06f05d55534..44ed481fec47 100644
> --- a/net/tipc/bcast.c
> +++ b/net/tipc/bcast.c
> @@ -324,7 +324,8 @@ static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
>  	hdr = buf_msg(skb);
>  	if (msg_user(hdr) == MSG_FRAGMENTER)
>  		hdr = msg_inner_hdr(hdr);
> -	if (msg_type(hdr) != TIPC_MCAST_MSG)
> +	if (msg_user(hdr) != NAME_DISTRIBUTOR &&
> +	    msg_type(hdr) != TIPC_MCAST_MSG)
>  		return 0;
>
>  	/* Allocate dummy message */
> diff --git a/net/tipc/link.c b/net/tipc/link.c
> index fb72031228c9..a2e9a64d5a0f 100644
> --- a/net/tipc/link.c
> +++ b/net/tipc/link.c
> @@ -1190,6 +1190,8 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
>  			    struct sk_buff_head *inputq)
>  {
>  	struct sk_buff_head *mc_inputq = l->bc_rcvlink->inputq;
> +	struct name_table *nt = tipc_name_table(l->net);
> +	struct sk_buff_head *defnq = &nt->defer_namedq;
>  	struct tipc_msg *hdr = buf_msg(skb);
>
>  	switch (msg_user(hdr)) {
> @@ -1211,6 +1213,10 @@ static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
>  	case NAME_DISTRIBUTOR:
>  		l->bc_rcvlink->state = LINK_ESTABLISHED;
>  		skb_queue_tail(l->namedq, skb);
> +
> +		spin_lock_bh(&defnq->lock);
> +		tipc_mcast_filter_msg(l->net, defnq, l->namedq);
> +		spin_unlock_bh(&defnq->lock);
>  		return true;
>  	case MSG_BUNDLER:
>  	case TUNNEL_PROTOCOL:
> diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
> index 66a65c2cdb23..593dcd11357f 100644
> --- a/net/tipc/name_table.c
> +++ b/net/tipc/name_table.c
> @@ -615,9 +615,11 @@ struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
>  	struct tipc_net *tn = tipc_net(net);
>  	struct publication *p = NULL;
>  	struct sk_buff *skb = NULL;
> +	bool rcast;
>
>  	spin_lock_bh(&tn->nametbl_lock);
> +	rcast = nt->rcast;
>
>  	if (nt->local_publ_count >= TIPC_MAX_PUBL) {
>  		pr_warn("Bind failed, max limit %u reached\n", TIPC_MAX_PUBL);
>  		goto exit;
> @@ -632,8 +634,18 @@ struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
>  exit:
>  	spin_unlock_bh(&tn->nametbl_lock);
>
> -	if (skb)
> -		tipc_node_broadcast(net, skb);
> +	if (skb) {
> +		/* Use broadcast if all nodes support broadcast NAME_DISTR */
> +		if (tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) {
> +			tipc_node_broadcast_named_publish(net, skb, &rcast);
> +			spin_lock_bh(&tn->nametbl_lock);
> +			nt->rcast = rcast;
> +			spin_unlock_bh(&tn->nametbl_lock);
> +		} else {
> +			/* Otherwise, be backwards compatible */
> +			tipc_node_broadcast(net, skb);
> +		}
> +	}
>  	return p;
>  }
>
> @@ -648,8 +660,10 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower,
>  	u32 self = tipc_own_addr(net);
>  	struct sk_buff *skb = NULL;
>  	struct publication *p;
> +	bool rcast;
>
>  	spin_lock_bh(&tn->nametbl_lock);
> +	rcast = nt->rcast;
>
>  	p = tipc_nametbl_remove_publ(net, type, lower, upper, self, key);
>  	if (p) {
> @@ -664,7 +678,16 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower,
>  	spin_unlock_bh(&tn->nametbl_lock);
>
>  	if (skb) {
> -		tipc_node_broadcast(net, skb);
> +		/* Use broadcast if all nodes support broadcast NAME_DISTR */
> +		if (tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) {
> +			tipc_node_broadcast_named_publish(net, skb, &rcast);
> +			spin_lock_bh(&tn->nametbl_lock);
> +			nt->rcast = rcast;
> +			spin_unlock_bh(&tn->nametbl_lock);
> +		} else {
> +			/* Otherwise, be backwards compatible */
> +			tipc_node_broadcast(net, skb);
> +		}
>  		return 1;
>  	}
>  	return 0;
> @@ -746,6 +769,9 @@ int tipc_nametbl_init(struct net *net)
>  	INIT_LIST_HEAD(&nt->cluster_scope);
>  	rwlock_init(&nt->cluster_scope_lock);
>  	tn->nametbl = nt;
> +	/* 'bulk' updated messages via unicast */
> +	nt->rcast = true;
> +	skb_queue_head_init(&nt->defer_namedq);
>  	spin_lock_init(&tn->nametbl_lock);
>  	return 0;
>  }
> @@ -784,6 +810,7 @@ void tipc_nametbl_stop(struct net *net)
>  	 * publications, then release the name table
>  	 */
>  	spin_lock_bh(&tn->nametbl_lock);
> +	skb_queue_purge(&nt->defer_namedq);
>  	for (i = 0; i < TIPC_NAMETBL_SIZE; i++) {
>  		if (hlist_empty(&nt->services[i]))
>  			continue;
> diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
> index f79066334cc8..b8cdf2a29d48 100644
> --- a/net/tipc/name_table.h
> +++ b/net/tipc/name_table.h
> @@ -95,6 +95,8 @@ struct publication {
>   *  - used by name_distr to send bulk updates to new nodes
>   *  - used by name_distr during re-init of name table
>   * @local_publ_count: number of publications issued by this node
> + * @defer_namedq: temporary queue for 'synching' updates
> + * @rcast: previous method used to publish/withdraw a service
>   */
>  struct name_table {
>  	struct hlist_head services[TIPC_NAMETBL_SIZE];
> @@ -102,6 +104,8 @@ struct name_table {
>  	struct list_head cluster_scope;
>  	rwlock_t cluster_scope_lock;
>  	u32 local_publ_count;
> +	struct sk_buff_head defer_namedq;
> +	bool rcast;
>  };
>
>  int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
> diff --git a/net/tipc/node.c b/net/tipc/node.c
> index aaf595613e6e..b058647fa78b 100644
> --- a/net/tipc/node.c
> +++ b/net/tipc/node.c
> @@ -2981,3 +2981,35 @@ void tipc_node_pre_cleanup_net(struct net *exit_net)
>  	}
>  	rcu_read_unlock();
>  }
> +
> +int tipc_node_broadcast_named_publish(struct net *net, struct sk_buff *skb,
> +				      bool *rcast)
> +{
> +	struct tipc_mc_method method = {.rcast = *rcast};
> +	struct sk_buff_head xmitq;
> +	struct tipc_nlist dests;
> +	struct tipc_node *n;
> +	u16 cong_link_cnt;
> +	int rc = 0;
> +
> +	__skb_queue_head_init(&xmitq);
> +	__skb_queue_tail(&xmitq, skb);
> +
> +	tipc_nlist_init(&dests, tipc_own_addr(net));
> +	rcu_read_lock();
> +	list_for_each_entry_rcu(n, tipc_nodes(net), list) {
> +		if (in_own_node(net, n->addr))
> +			continue;
> +		if (!node_is_up(n))
> +			continue;
> +		tipc_nlist_add(&dests, n->addr);
> +	}
> +	rcu_read_unlock();
> +
> +	rc = tipc_mcast_xmit(net, &xmitq, &method, &dests, &cong_link_cnt);
> +	*rcast = method.rcast;
> +
> +	tipc_nlist_purge(&dests);
> +	__skb_queue_purge(&xmitq);
> +	return rc;
> +}
> diff --git a/net/tipc/node.h b/net/tipc/node.h
> index a6803b449a2c..d7d19f9932b1 100644
> --- a/net/tipc/node.h
> +++ b/net/tipc/node.h
> @@ -124,4 +124,6 @@ int tipc_nl_node_set_key(struct sk_buff *skb, struct genl_info *info);
>  int tipc_nl_node_flush_key(struct sk_buff *skb, struct genl_info *info);
>  #endif
>  void tipc_node_pre_cleanup_net(struct net *exit_net);
> +int tipc_node_broadcast_named_publish(struct net *net, struct sk_buff *skb,
> +				      bool *rcast);
>  #endif

_______________________________________________
tipc-discussion mailing list
tip...@li...
https://lists.sourceforge.net/lists/listinfo/tipc-discussion
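To make the closing question above concrete: the struct is spelled
tipc_mc_method in the code (net/tipc/bcast.h), and one way to let
name_distr.c steer the mcast layer could look like the sketch below. This
is an illustration only, not part of any posted patch; the 'no_sync' field
and its name are assumptions, while the first three fields follow the
existing definition.

	/* mcast / bcast transmit method, as passed to tipc_mcast_xmit() */
	struct tipc_mc_method {
		bool rcast;		/* replicast if true, bcast otherwise */
		bool mandatory;		/* method was forced by the user */
		unsigned long expires;	/* re-evaluate method after this time */
		bool no_sync;		/* HYPOTHETICAL: set by name_distr.c
					 * when the caller guarantees ordering
					 * itself, telling
					 * tipc_mcast_send_sync() to skip the
					 * SYN exchange
					 */
	};

With such a field, tipc_mcast_send_sync() could return early for the
NAME_DISTR bulk messages instead of emitting the extra SYNs described
above.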
From: David M. <da...@da...> - 2020-05-13 19:33:45
From: Tuong Lien <tuo...@de...>
Date: Wed, 13 May 2020 19:33:15 +0700

> This series adds patches to fix some issues in TIPC streaming & service
> subscription.

Series applied.
From: Tuong L. <tuo...@de...> - 2020-05-13 12:33:48
When a service subscription is expired or canceled by a user, it needs to
be deleted from the subscription list, so that new subscriptions can be
registered (max = 65535 per net). However, there are two issues in the
code that can cause such an unused subscription to persist:

1) 'tipc_conn_delete_sub()' has a loop on the subscription list, but it
breaks out as soon as the first subscription differs from the one
specified, so the matching subscription may never be reached and will not
be deleted.

2) In case a subscription is canceled, the code that removes the
'TIPC_SUB_CANCEL' flag from the subscription filter does not work if it is
a local subscription (i.e. the little endian isn't involved). So, there
will be no match when looking for the subscription to delete later.

The subscription(s) will be removed eventually when the user terminates
its topology connection, but that could be a long time later. Meanwhile,
the number of available subscriptions may be exhausted.

This commit fixes the two issues above, so that a subscription can be
deleted correctly when needed.

Acked-by: Ying Xue <yin...@wi...>
Acked-by: Jon Maloy <jm...@re...>
Signed-off-by: Tuong Lien <tuo...@de...>
---
 net/tipc/subscr.h | 10 ++++++++++
 net/tipc/topsrv.c |  9 +++++----
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
index aa015c233898..6ebbec1bedd1 100644
--- a/net/tipc/subscr.h
+++ b/net/tipc/subscr.h
@@ -96,6 +96,16 @@ void tipc_sub_get(struct tipc_subscription *subscription);
 	(swap_ ? swab32(val__) : val__);				\
 })
 
+/* tipc_sub_write - write val_ to field_ of struct sub_ in user endian format
+ */
+#define tipc_sub_write(sub_, field_, val_)				\
+	({								\
+		struct tipc_subscr *sub__ = sub_;			\
+		u32 val__ = val_;					\
+		int swap_ = !((sub__)->filter & TIPC_FILTER_MASK);	\
+		(sub__)->field_ = swap_ ? swab32(val__) : val__;	\
+	})
+
 /* tipc_evt_write - write val_ to field_ of struct evt_ in user endian format
  */
 #define tipc_evt_write(evt_, field_, val_)				\
diff --git a/net/tipc/topsrv.c b/net/tipc/topsrv.c
index 931c426673c0..446af7bbd13e 100644
--- a/net/tipc/topsrv.c
+++ b/net/tipc/topsrv.c
@@ -237,8 +237,8 @@ static void tipc_conn_delete_sub(struct tipc_conn *con, struct tipc_subscr *s)
 		if (!s || !memcmp(s, &sub->evt.s, sizeof(*s))) {
 			tipc_sub_unsubscribe(sub);
 			atomic_dec(&tn->subscription_count);
-		} else if (s) {
-			break;
+			if (s)
+				break;
 		}
 	}
 	spin_unlock_bh(&con->sub_lock);
@@ -362,9 +362,10 @@ static int tipc_conn_rcv_sub(struct tipc_topsrv *srv,
 {
 	struct tipc_net *tn = tipc_net(srv->net);
 	struct tipc_subscription *sub;
+	u32 s_filter = tipc_sub_read(s, filter);
 
-	if (tipc_sub_read(s, filter) & TIPC_SUB_CANCEL) {
-		s->filter &= __constant_ntohl(~TIPC_SUB_CANCEL);
+	if (s_filter & TIPC_SUB_CANCEL) {
+		tipc_sub_write(s, filter, s_filter & ~TIPC_SUB_CANCEL);
 		tipc_conn_delete_sub(con, s);
 		return 0;
 	}
-- 
2.13.7
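For context, here is a hedged user-space sketch of the cancel request that
exercises the code path fixed above. The function name and error handling
are illustrative; the struct and the TIPC_SUB_CANCEL flag are from the
uapi <linux/tipc.h>. A subscription is canceled by resending the original
struct tipc_subscr with TIPC_SUB_CANCEL or'ed into its filter field, in
the same byte order as the original request.

	#include <string.h>
	#include <unistd.h>
	#include <linux/tipc.h>

	/* Cancel a previously issued subscription on topology socket 'sd'.
	 * 'orig' must be byte-identical to the original request; only the
	 * TIPC_SUB_CANCEL bit is added (use htonl() on the bit if the
	 * original filter was sent in network byte order).
	 */
	static int tipc_topsrv_cancel(int sd, const struct tipc_subscr *orig)
	{
		struct tipc_subscr cancel;

		memcpy(&cancel, orig, sizeof(cancel));
		cancel.filter |= TIPC_SUB_CANCEL;
		return (write(sd, &cancel, sizeof(cancel)) == sizeof(cancel))
			? 0 : -1;
	}

The kernel matches the cancel request against stored subscriptions with a
whole-struct memcmp() after clearing the CANCEL bit, which is exactly the
clearing that issue 2) above repairs.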
From: Tuong L. <tuo...@de...> - 2020-05-13 12:33:47
Upon receipt of a service subscription request from a user via a topology
connection, one 'sub' object is allocated in the kernel, so that an event
of the service, if any, can be sent to the user accordingly. Also, in case
of any failure, the connection will be shut down and all the pertaining
'sub' objects will be freed.

However, there is a race condition as follows, resulting in a memory leak:

      receive-work            connection              send-work
           |                      |                       |
     sub-1 |<-------//------------|                       |
     sub-2 |<-------//------------|                       |
           |                      |<----------------------| evt for sub-x
     sub-3 |<-------//------------|                       |
           :                      :                       :
           :                      :                       :
           |             /--------|                       |
           |             |        * peer closed           |
           |             |        |                       |
           |             |        |<------X---------------| evt for sub-y
           |             |        |<======================|
     sub-n |<-----------/         X shutdown              |
       -> orphan                  |                       |

That is, the 'receive-work' may get the last subscription request while
the 'send-work' is shutting down the connection due to a peer close.

We have a 'lock' on the connection, so the two actions cannot be carried
out simultaneously. If the last subscription is allocated, e.g. 'sub-n',
before the 'send-work' closes the connection, there will be no issue at
all: the 'sub' objects will be freed. Otherwise, the last subscription
will become an orphan, since the connection was closed and all references
were released.

This commit fixes the issue by simply adding one test: if the connection
remains in 'connected' state right after we obtain the connection lock, a
subscription object can be created as usual; otherwise, we ignore it.

Acked-by: Ying Xue <yin...@wi...>
Acked-by: Jon Maloy <jm...@re...>
Reported-by: Thang Ngo <tha...@de...>
Signed-off-by: Tuong Lien <tuo...@de...>
---
 net/tipc/topsrv.c | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/net/tipc/topsrv.c b/net/tipc/topsrv.c
index 73dbed0c4b6b..931c426673c0 100644
--- a/net/tipc/topsrv.c
+++ b/net/tipc/topsrv.c
@@ -400,7 +400,9 @@ static int tipc_conn_rcv_from_sock(struct tipc_conn *con)
 		return -EWOULDBLOCK;
 	if (ret == sizeof(s)) {
 		read_lock_bh(&sk->sk_callback_lock);
-		ret = tipc_conn_rcv_sub(srv, con, &s);
+		/* RACE: the connection can be closed in the meantime */
+		if (likely(connected(con)))
+			ret = tipc_conn_rcv_sub(srv, con, &s);
 		read_unlock_bh(&sk->sk_callback_lock);
 		if (!ret)
 			return 0;
-- 
2.13.7
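To see why the one-line re-check is sufficient, the locking pattern can be
restated as a sketch. The receive side is from the patch above; the close
side is paraphrased from net/tipc/topsrv.c and simplified, so treat its
details as assumptions rather than a quote.

	/* receive-work: only admit a new subscription while the
	 * connection is still flagged as connected.
	 */
	read_lock_bh(&sk->sk_callback_lock);
	if (likely(connected(con)))
		ret = tipc_conn_rcv_sub(srv, con, &s);
	read_unlock_bh(&sk->sk_callback_lock);

	/* close path (paraphrased): clears the CONNECTED flag and flushes
	 * all subs while holding the same lock for writing, so no new
	 * 'sub' can be created after the flush, and none can leak.
	 */
	write_lock_bh(&sk->sk_callback_lock);
	if (test_and_clear_bit(CF_CONNECTED, &con->flags))
		tipc_conn_delete_sub(con, NULL);
	write_unlock_bh(&sk->sk_callback_lock);

Since both paths serialize on sk->sk_callback_lock, "connection closed"
and "new subscription created" become mutually exclusive, closing the
window shown in the diagram.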
From: Tuong L. <tuo...@de...> - 2020-05-13 12:33:45
Currently, when a connection is in Nagle mode, we set the 'ack_required'
bit in the last sending buffer and wait for the corresponding ACK prior to
pushing more data. However, on the receiving side, the ACK is issued only
when the application actually reads the whole data. Even if part of the
last buffer has been received, we will not send the ACK as required. This
can cause an unnecessary delay, since the receiver does not always fetch
the message as fast as the sender, resulting in a large latency in user
message sending: [one RTT + the receiver's processing time].

This commit makes the Nagle ACK be sent as soon as possible, i.e. when a
message with the 'ack_required' bit arrives in the receiving side's stack,
even before it is processed or put in the socket receive queue. This way,
we can limit the streaming latency to one RTT, as intended by Nagle mode.

Acked-by: Ying Xue <yin...@wi...>
Acked-by: Jon Maloy <jm...@re...>
Signed-off-by: Tuong Lien <tuo...@de...>
---
 net/tipc/socket.c | 42 +++++++++++++++++++++++++++++++-----------
 1 file changed, 31 insertions(+), 11 deletions(-)

diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 87466607097f..e370ad0edd76 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1739,22 +1739,21 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct sk_buff *skb,
 	return 0;
 }
 
-static void tipc_sk_send_ack(struct tipc_sock *tsk)
+static struct sk_buff *tipc_sk_build_ack(struct tipc_sock *tsk)
 {
 	struct sock *sk = &tsk->sk;
-	struct net *net = sock_net(sk);
 	struct sk_buff *skb = NULL;
 	struct tipc_msg *msg;
 	u32 peer_port = tsk_peer_port(tsk);
 	u32 dnode = tsk_peer_node(tsk);
 
 	if (!tipc_sk_connected(sk))
-		return;
+		return NULL;
 	skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
 			      dnode, tsk_own_node(tsk), peer_port,
 			      tsk->portid, TIPC_OK);
 	if (!skb)
-		return;
+		return NULL;
 	msg = buf_msg(skb);
 	msg_set_conn_ack(msg, tsk->rcv_unacked);
 	tsk->rcv_unacked = 0;
@@ -1764,7 +1763,19 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk)
 		tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
 		msg_set_adv_win(msg, tsk->rcv_win);
 	}
-	tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
+	return skb;
+}
+
+static void tipc_sk_send_ack(struct tipc_sock *tsk)
+{
+	struct sk_buff *skb;
+
+	skb = tipc_sk_build_ack(tsk);
+	if (!skb)
+		return;
+
+	tipc_node_xmit_skb(sock_net(&tsk->sk), skb, tsk_peer_node(tsk),
+			   msg_link_selector(buf_msg(skb)));
 }
 
 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
@@ -1938,7 +1949,6 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
 	bool peek = flags & MSG_PEEK;
 	int offset, required, copy, copied = 0;
 	int hlen, dlen, err, rc;
-	bool ack = false;
 	long timeout;
 
 	/* Catch invalid receive attempts */
@@ -1983,7 +1993,6 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
 
 		/* Copy data if msg ok, otherwise return error/partial data */
 		if (likely(!err)) {
-			ack = msg_ack_required(hdr);
 			offset = skb_cb->bytes_read;
 			copy = min_t(int, dlen - offset, buflen - copied);
 			rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
@@ -2011,7 +2020,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
 
 		/* Send connection flow control advertisement when applicable */
 		tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
-		if (ack || tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
+		if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
 			tipc_sk_send_ack(tsk);
 
 		/* Exit if all requested data or FIN/error received */
@@ -2105,9 +2114,11 @@ static void tipc_sk_proto_rcv(struct sock *sk,
  * tipc_sk_filter_connect - check incoming message for a connection-based socket
  * @tsk: TIPC socket
  * @skb: pointer to message buffer.
+ * @xmitq: for Nagle ACK if any
  * Returns true if message should be added to receive queue, false otherwise
  */
-static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
+static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb,
+				   struct sk_buff_head *xmitq)
 {
 	struct sock *sk = &tsk->sk;
 	struct net *net = sock_net(sk);
@@ -2171,8 +2182,17 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
 		if (!skb_queue_empty(&sk->sk_write_queue))
 			tipc_sk_push_backlog(tsk);
 		/* Accept only connection-based messages sent by peer */
-		if (likely(con_msg && !err && pport == oport && pnode == onode))
+		if (likely(con_msg && !err && pport == oport &&
+			   pnode == onode)) {
+			if (msg_ack_required(hdr)) {
+				struct sk_buff *skb;
+
+				skb = tipc_sk_build_ack(tsk);
+				if (skb)
+					__skb_queue_tail(xmitq, skb);
+			}
 			return true;
+		}
 		if (!tsk_peer_msg(tsk, hdr))
 			return false;
 		if (!err)
@@ -2267,7 +2287,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
 	while ((skb = __skb_dequeue(&inputq))) {
 		hdr = buf_msg(skb);
 		limit = rcvbuf_limit(sk, skb);
-		if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) ||
+		if ((sk_conn && !tipc_sk_filter_connect(tsk, skb, xmitq)) ||
 		    (!sk_conn && msg_connected(hdr)) ||
 		    (!grp && msg_in_group(hdr)))
 			err = TIPC_ERR_NO_PORT;
-- 
2.13.7
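As a side note, the latency discussed above only arises while the
connection is in Nagle mode. A hedged user-space sketch (not part of the
patch; TIPC_NODELAY is assumed to be available, i.e. headers from a kernel
with the smart-Nagle feature, and the fallback SOL_TIPC value is from
linux/socket.h): an application that always prefers minimum latency can
opt out of Nagle entirely.

	#include <sys/socket.h>
	#include <linux/tipc.h>

	#ifndef SOL_TIPC
	#define SOL_TIPC 271		/* from linux/socket.h */
	#endif

	static int open_low_latency_stream(void)
	{
		int sd = socket(AF_TIPC, SOCK_STREAM, 0);
		int nodelay = 1;

		/* TIPC_NODELAY disables smart Nagle: every send is pushed
		 * immediately, trading bundling efficiency for latency.
		 */
		setsockopt(sd, SOL_TIPC, TIPC_NODELAY, &nodelay,
			   sizeof(nodelay));
		return sd;
	}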
From: Tuong L. <tuo...@de...> - 2020-05-13 12:33:43
This series adds patches to fix some issues in TIPC streaming & service
subscription.

Tuong Lien (3):
  tipc: fix large latency in smart Nagle streaming
  tipc: fix memory leak in service subscripting
  tipc: fix failed service subscription deletion

 net/tipc/socket.c | 42 +++++++++++++++++++++++++++++++-----------
 net/tipc/subscr.h | 10 ++++++++++
 net/tipc/topsrv.c | 13 ++++++++-----
 3 files changed, 49 insertions(+), 16 deletions(-)

-- 
2.13.7
From: Hung Ta <hu...@op...> - 2020-05-13 12:32:06
Hi Ying,

On Mon, May 11, 2020 at 8:52 PM Ying Xue <yin...@wi...> wrote:
> Please validate whether it appears with the latest kernel.
>
> If you cannot, please provide more detailed info:
>
> 1. Capture TIPC packets with the tcpdump tool;

Attached.

> 2. Provide the dmesg log, but be sure each line of the dmesg log is
> prefixed by a timestamp.

Attached.

> 3. Print a timestamp as well when you receive TIPC_WITHDRAW on user land.

Test scenario:
1. Set up two nodes for the cluster and bring them up.
2. Reboot node 1 at 11:12:20.

At node 2, the WITHDRAW (death) event did not arrive until 11:12:30 (10
seconds later):

Wed May 13 11:12:30.339 2020 [clTipcNotification.c:395] (scI1.1040 :
AMF.TIPC.NOTIF.01502 : INFO) Got node [death] notification (2) for node [1]

I'm testing on the latest Ubuntu release (20.04, 64-bit, kernel
5.4.0-29-generic).

Thanks.

> On 5/8/20 11:01 AM, Hung Ta wrote:
> > Hi TIPC experts,
> >
> > I'm using the TIPC library in my project, which needs to be aware when
> > a node leaves the cluster.
> >
> > So, I use a socket of type AF_TIPC and SOCK_SEQPACKET and connect it
> > to TIPC_TOP_SRV.
> >
> > I tried to get several nodes up and then make one of them leave, and I
> > can see the TIPC_WITHDRAW event, but the issue is that it comes very
> > late; in particular, about 10 seconds after the node left the cluster.
> > I'm using Ubuntu 16.04, kernel 4.4.0-87-generic.
> > The same issue also occurs in Ubuntu 14.04.
> >
> > Why is it so late?
> > Appreciate your help.
> >
> > Thanks and regards,
> > Hung
> >
> > _______________________________________________
> > tipc-discussion mailing list
> > tip...@li...
> > https://lists.sourceforge.net/lists/listinfo/tipc-discussion
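For reference, a minimal sketch of the kind of subscriber described above
(not the poster's actual code; the type-0 node-availability convention and
the field values are assumptions based on common TIPC topology-service
usage, and error handling is omitted for brevity). Each TIPC node
publishes service type 0 for its own address, so a subscription covering
{0, 0, ~0} yields TIPC_PUBLISHED / TIPC_WITHDRAWN events as nodes join and
leave the cluster.

	#include <stdio.h>
	#include <string.h>
	#include <unistd.h>
	#include <sys/socket.h>
	#include <linux/tipc.h>

	int main(void)
	{
		struct sockaddr_tipc topsrv;
		struct tipc_subscr sub;
		struct tipc_event evt;
		int sd = socket(AF_TIPC, SOCK_SEQPACKET, 0);

		/* Connect to the built-in topology server */
		memset(&topsrv, 0, sizeof(topsrv));
		topsrv.family = AF_TIPC;
		topsrv.addrtype = TIPC_ADDR_NAME;
		topsrv.addr.name.name.type = TIPC_TOP_SRV;
		topsrv.addr.name.name.instance = TIPC_TOP_SRV;
		connect(sd, (struct sockaddr *)&topsrv, sizeof(topsrv));

		/* Subscribe to node availability (service type 0) */
		memset(&sub, 0, sizeof(sub));
		sub.seq.type = 0;
		sub.seq.lower = 0;
		sub.seq.upper = ~0;
		sub.timeout = TIPC_WAIT_FOREVER;
		sub.filter = TIPC_SUB_PORTS;	/* host order is accepted;
						 * the kernel auto-detects */
		write(sd, &sub, sizeof(sub));

		while (read(sd, &evt, sizeof(evt)) == sizeof(evt))
			printf("%s: node 0x%x\n",
			       evt.event == TIPC_WITHDRAWN ? "down" : "up",
			       evt.found_lower);
		return 0;
	}

Timestamping each event as it is read (point 3 above) would show how long
after the reboot the TIPC_WITHDRAWN event actually arrives.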