From: Hoang H. Le <hoa...@de...> - 2020-06-09 09:35:55
|
-----Original Message----- From: Jon Maloy <jm...@re...> Sent: Monday, June 8, 2020 8:33 PM To: Hoang Huu Le <hoa...@de...>; ma...@do...; yin...@wi...; tip...@li... Subject: Re: [next-net v6] tipc: update a binding service via broadcast On 6/6/20 11:10 PM, Hoang Huu Le wrote: > -----Original Message----- > From: Jon Maloy <jm...@re...> > Sent: Friday, June 5, 2020 8:03 PM > To: Hoang Huu Le <hoa...@de...>; ma...@do...; yin...@wi...; tip...@li... > Subject: Re: [next-net v6] tipc: update a binding service via broadcast > > > > On 6/5/20 3:52 AM, Hoang Huu Le wrote: >> Currently, updating binding table (add service binding to >> name table/withdraw a service binding) is being sent over replicast. >> However, if we are scaling up clusters to > 100 nodes/containers this >> method is less efficient because of looping through nodes in a cluster one >> by one. [...] >> + if (*open && (*rcv_nxt == seqno)) { >> + (*rcv_nxt)++; >> + __skb_unlink(skb, namedq); >> + return skb; >> + } >> + >> + if (less(seqno, *rcv_nxt)) { >> + __skb_unlink(skb, namedq); >> + kfree_skb(skb); >> + continue; > Still not needed. This queue should be flushed in > tipc_node_lost_contact(), which I now see we don't do. > [Hoang] Yes, that's right. I will verify and send it out. > > This has to be fixed too. > ///jon I hate to admit it, but we might actually need this test after all. Imagine that somebody has done 'publish' just after the broadcast link came up (in tipc_bcast_add_peer()), but before tipc_named_node_up() is called. The context of those two calls is not atomic, so I think it is possible that this publication might end up both in the bcast_link backlog queue and in the bulk distribution. This publication message will have a named_seqno that is lower than the agreed synch point, and should be dropped at reception. Given the crucial role of the binding table for the overall TIPC functionality I think it is better to be safe than sorry here, and keep this test. 
[Hoang] Finally, I'm able to reproduce the problem, the same as the scene described above: <code> 357 if (less(seqno, *rcv_nxt)) { 358 pr_info("DROP[%x->%x]: %s blk %d lblk %d nxt %d legacy %d seqno %u bc %u hsz %u dsz %u qlen %u\n", 359 msg_orignode(hdr), tipc_own_addr(net), 360 msg_type(hdr) == PUBLICATION ? "PUBL":"DRAW", 361 msg_is_bulk(hdr), msg_is_last_bulk(hdr), 362 *rcv_nxt, msg_is_legacy(hdr), 363 msg_named_seqno(hdr), msg_non_seq(hdr), 364 msg_hdr_sz(hdr), msg_data_sz(hdr), 365 skb_queue_len(namedq)); 366 367 __skb_unlink(skb, namedq); 368 kfree_skb(skb); 369 continue; 370 } </code> ----------- [12528.036895] tipc: Established link <1001024:eth0-1001001:brtipc> on network plane A [12528.043857] tipc: Established link <1001002:brtipc-1001001:brtipc> on network plane A [12528.136462] tipc: DROP[1001001->1001002]: DRAW blk 0 lblk 0 nxt 3895 legacy 0 seqno 3878 bc 0 hsz 40 dsz 20 qlen 23 [12528.140864] tipc: DROP[1001001->1001002]: DRAW blk 0 lblk 0 nxt 3895 legacy 0 seqno 3879 bc 0 hsz 40 dsz 20 qlen 22 [...] [12528.210959] tipc: DROP[1001001->1001002]: DRAW blk 0 lblk 0 nxt 3895 legacy 0 seqno 3893 bc 0 hsz 40 dsz 20 qlen 8 [12528.218903] tipc: DROP[1001001->1001002]: DRAW blk 0 lblk 0 nxt 3895 legacy 0 seqno 3894 bc 0 hsz 40 dsz 20 qlen 7 [12528.227214] tipc: DROP[1001001->1001024]: DRAW blk 0 lblk 0 nxt 3895 legacy 0 seqno 3878 bc 0 hsz 40 dsz 20 qlen 23 [12528.231285] tipc: DROP[1001001->1001024]: DRAW blk 0 lblk 0 nxt 3895 legacy 0 seqno 3879 bc 0 hsz 40 dsz 20 qlen 22 [...] [12528.277445] tipc: DROP[1001001->1001024]: DRAW blk 0 lblk 0 nxt 3895 legacy 0 seqno 3893 bc 0 hsz 40 dsz 20 qlen 8 [12528.280847] tipc: DROP[1001001->1001024]: DRAW blk 0 lblk 0 nxt 3895 legacy 0 seqno 3894 bc 0 hsz 40 dsz 20 qlen 7 --- I will re-post the patch including the test as well. 
///jon >> + } >> + } >> + return NULL; >> +} >> + >> /** >> * tipc_named_rcv - process name table update messages sent by another node >> */ >> -void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq) >> +void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq, >> + u16 *rcv_nxt, bool *open) >> { >> - struct tipc_net *tn = net_generic(net, tipc_net_id); >> - struct tipc_msg *msg; >> + struct tipc_net *tn = tipc_net(net); >> struct distr_item *item; >> - uint count; >> - u32 node; >> + struct tipc_msg *hdr; >> struct sk_buff *skb; >> - int mtype; >> + u32 count, node = 0; >> >> spin_lock_bh(&tn->nametbl_lock); >> - for (skb = skb_dequeue(inputq); skb; skb = skb_dequeue(inputq)) { >> - skb_linearize(skb); >> - msg = buf_msg(skb); >> - mtype = msg_type(msg); >> - item = (struct distr_item *)msg_data(msg); >> - count = msg_data_sz(msg) / ITEM_SIZE; >> - node = msg_orignode(msg); >> + while ((skb = tipc_named_dequeue(namedq, rcv_nxt, open))) { >> + hdr = buf_msg(skb); >> + node = msg_orignode(hdr); >> + item = (struct distr_item *)msg_data(hdr); >> + count = msg_data_sz(hdr) / ITEM_SIZE; >> while (count--) { >> - tipc_update_nametbl(net, item, node, mtype); >> + tipc_update_nametbl(net, item, node, msg_type(hdr)); >> item++; >> } >> kfree_skb(skb); >> @@ -345,6 +402,6 @@ void tipc_named_reinit(struct net *net) >> publ->node = self; >> list_for_each_entry_rcu(publ, &nt->cluster_scope, binding_node) >> publ->node = self; >> - >> + nt->rc_dests = 0; >> spin_unlock_bh(&tn->nametbl_lock); >> } >> diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h >> index 63fc73e0fa6c..092323158f06 100644 >> --- a/net/tipc/name_distr.h >> +++ b/net/tipc/name_distr.h >> @@ -67,11 +67,14 @@ struct distr_item { >> __be32 key; >> }; >> >> +void tipc_named_bcast(struct net *net, struct sk_buff *skb); >> struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ); >> struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ); >> 
-void tipc_named_node_up(struct net *net, u32 dnode); >> -void tipc_named_rcv(struct net *net, struct sk_buff_head *msg_queue); >> +void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities); >> +void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq, >> + u16 *rcv_nxt, bool *open); >> void tipc_named_reinit(struct net *net); >> -void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr); >> +void tipc_publ_notify(struct net *net, struct list_head *nsub_list, >> + u32 addr, u16 capabilities); >> >> #endif >> diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c >> index 359b2bc888cf..2ac33d32edc2 100644 >> --- a/net/tipc/name_table.c >> +++ b/net/tipc/name_table.c >> @@ -729,6 +729,7 @@ struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower, >> struct tipc_net *tn = tipc_net(net); >> struct publication *p = NULL; >> struct sk_buff *skb = NULL; >> + u32 rc_dests; >> >> spin_lock_bh(&tn->nametbl_lock); >> >> @@ -743,12 +744,14 @@ struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower, >> nt->local_publ_count++; >> skb = tipc_named_publish(net, p); >> } >> + rc_dests = nt->rc_dests; >> exit: >> spin_unlock_bh(&tn->nametbl_lock); >> >> if (skb) >> - tipc_node_broadcast(net, skb); >> + tipc_node_broadcast(net, skb, rc_dests); >> return p; >> + >> } >> >> /** >> @@ -762,6 +765,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, >> u32 self = tipc_own_addr(net); >> struct sk_buff *skb = NULL; >> struct publication *p; >> + u32 rc_dests; >> >> spin_lock_bh(&tn->nametbl_lock); >> >> @@ -775,10 +779,11 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, >> pr_err("Failed to remove local publication {%u,%u,%u}/%u\n", >> type, lower, upper, key); >> } >> + rc_dests = nt->rc_dests; >> spin_unlock_bh(&tn->nametbl_lock); >> >> if (skb) { >> - tipc_node_broadcast(net, skb); >> + tipc_node_broadcast(net, skb, rc_dests); >> return 1; >> } >> return 0; >> diff 
--git a/net/tipc/name_table.h b/net/tipc/name_table.h >> index 728bc7016c38..8064e1986e2c 100644 >> --- a/net/tipc/name_table.h >> +++ b/net/tipc/name_table.h >> @@ -106,6 +106,8 @@ struct name_table { >> struct list_head cluster_scope; >> rwlock_t cluster_scope_lock; >> u32 local_publ_count; >> + u32 rc_dests; >> + u32 snd_nxt; >> }; >> >> int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb); >> diff --git a/net/tipc/node.c b/net/tipc/node.c >> index 803a3a6d0f50..ad8d7bce1f98 100644 >> --- a/net/tipc/node.c >> +++ b/net/tipc/node.c >> @@ -75,6 +75,8 @@ struct tipc_bclink_entry { >> struct sk_buff_head arrvq; >> struct sk_buff_head inputq2; >> struct sk_buff_head namedq; >> + u16 named_rcv_nxt; >> + bool named_open; >> }; >> >> /** >> @@ -396,10 +398,10 @@ static void tipc_node_write_unlock(struct tipc_node *n) >> write_unlock_bh(&n->lock); >> >> if (flags & TIPC_NOTIFY_NODE_DOWN) >> - tipc_publ_notify(net, publ_list, addr); >> + tipc_publ_notify(net, publ_list, addr, n->capabilities); >> >> if (flags & TIPC_NOTIFY_NODE_UP) >> - tipc_named_node_up(net, addr); >> + tipc_named_node_up(net, addr, n->capabilities); >> >> if (flags & TIPC_NOTIFY_LINK_UP) { >> tipc_mon_peer_up(net, addr, bearer_id); >> @@ -1729,12 +1731,23 @@ int tipc_node_distr_xmit(struct net *net, struct sk_buff_head *xmitq) >> return 0; >> } >> >> -void tipc_node_broadcast(struct net *net, struct sk_buff *skb) >> +void tipc_node_broadcast(struct net *net, struct sk_buff *skb, int rc_dests) >> { >> + struct sk_buff_head xmitq; >> struct sk_buff *txskb; >> struct tipc_node *n; >> + u16 dummy; >> u32 dst; >> >> + /* Use broadcast if all nodes support it */ >> + if (!rc_dests && tipc_bcast_get_mode(net) != BCLINK_MODE_RCAST) { >> + __skb_queue_head_init(&xmitq); >> + __skb_queue_tail(&xmitq, skb); >> + tipc_bcast_xmit(net, &xmitq, &dummy); >> + return; >> + } >> + >> + /* Otherwise use legacy replicast method */ >> rcu_read_lock(); >> list_for_each_entry_rcu(n, tipc_nodes(net), 
list) { >> dst = n->addr; >> @@ -1749,7 +1762,6 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb) >> tipc_node_xmit_skb(net, txskb, dst, 0); >> } >> rcu_read_unlock(); >> - >> kfree_skb(skb); >> } >> >> @@ -1844,7 +1856,9 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id >> >> /* Handle NAME_DISTRIBUTOR messages sent from 1.7 nodes */ >> if (!skb_queue_empty(&n->bc_entry.namedq)) >> - tipc_named_rcv(net, &n->bc_entry.namedq); >> + tipc_named_rcv(net, &n->bc_entry.namedq, >> + &n->bc_entry.named_rcv_nxt, >> + &n->bc_entry.named_open); >> >> /* If reassembly or retransmission failure => reset all links to peer */ >> if (rc & TIPC_LINK_DOWN_EVT) >> @@ -2109,7 +2123,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) >> tipc_node_link_down(n, bearer_id, false); >> >> if (unlikely(!skb_queue_empty(&n->bc_entry.namedq))) >> - tipc_named_rcv(net, &n->bc_entry.namedq); >> + tipc_named_rcv(net, &n->bc_entry.namedq, >> + &n->bc_entry.named_rcv_nxt, >> + &n->bc_entry.named_open); >> >> if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1))) >> tipc_node_mcast_rcv(n); >> diff --git a/net/tipc/node.h b/net/tipc/node.h >> index a6803b449a2c..9f6f13f1604f 100644 >> --- a/net/tipc/node.h >> +++ b/net/tipc/node.h >> @@ -55,7 +55,8 @@ enum { >> TIPC_MCAST_RBCTL = (1 << 7), >> TIPC_GAP_ACK_BLOCK = (1 << 8), >> TIPC_TUNNEL_ENHANCED = (1 << 9), >> - TIPC_NAGLE = (1 << 10) >> + TIPC_NAGLE = (1 << 10), >> + TIPC_NAMED_BCAST = (1 << 11) >> }; >> >> #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \ >> @@ -68,7 +69,8 @@ enum { >> TIPC_MCAST_RBCTL | \ >> TIPC_GAP_ACK_BLOCK | \ >> TIPC_TUNNEL_ENHANCED | \ >> - TIPC_NAGLE) >> + TIPC_NAGLE | \ >> + TIPC_NAMED_BCAST) >> >> #define INVALID_BEARER_ID -1 >> >> @@ -101,7 +103,7 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest, >> u32 selector); >> void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr); >> void 
tipc_node_unsubscribe(struct net *net, struct list_head *subscr, u32 addr); >> -void tipc_node_broadcast(struct net *net, struct sk_buff *skb); >> +void tipc_node_broadcast(struct net *net, struct sk_buff *skb, int rc_dests); >> int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port); >> void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port); >> int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel, bool connected); |