|
From: Jon M. <jm...@re...> - 2020-06-08 13:33:51
|
On 6/6/20 11:10 PM, Hoang Huu Le wrote:
> -----Original Message-----
> From: Jon Maloy <jm...@re...>
> Sent: Friday, June 5, 2020 8:03 PM
> To: Hoang Huu Le <hoa...@de...>; ma...@do...; yin...@wi...; tip...@li...
> Subject: Re: [next-net v6] tipc: update a binding service via broadcast
>
>
>
> On 6/5/20 3:52 AM, Hoang Huu Le wrote:
>> Currently, updating binding table (add service binding to
>> name table/withdraw a service binding) is being sent over replicast.
>> However, if we are scaling up clusters to > 100 nodes/containers this
> >> method is less efficient because of looping through nodes in a cluster one
>> by one.
[...]
>> + if (*open && (*rcv_nxt == seqno)) {
>> + (*rcv_nxt)++;
>> + __skb_unlink(skb, namedq);
>> + return skb;
>> + }
>> +
>> + if (less(seqno, *rcv_nxt)) {
>> + __skb_unlink(skb, namedq);
>> + kfree_skb(skb);
>> + continue;
> Still not needed. This queue should be flushed in
> tipc_node_lost_contact(), which I now see we don't do.
> [Hoang] Yes, that's right. I will verify and send it out.
>
> This has to be fixed too.
> ///jon
I hate to admit it, but we might actually need this test after all.
Imagine that somebody has done 'publish' just after the broadcast link
came up (in tipc_bcast_add_peer()) , but before tipc_named_node_up() is
called. The context of those two calls is not atomic, so I think it is
possible that this publication might end up both in the bcast_link
backlog queue and in the bulk distribution.
This publication message will have a named_seqno that is lower than the
agreed synch point, and should be dropped at reception.
Given the crucial role of the binding table for the overall TIPC
functionality I think it is better to be safe than sorry here, and keep
this test.
///jon
>> + }
>> + }
>> + return NULL;
>> +}
>> +
>> /**
>> * tipc_named_rcv - process name table update messages sent by another node
>> */
>> -void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq)
>> +void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
>> + u16 *rcv_nxt, bool *open)
>> {
>> - struct tipc_net *tn = net_generic(net, tipc_net_id);
>> - struct tipc_msg *msg;
>> + struct tipc_net *tn = tipc_net(net);
>> struct distr_item *item;
>> - uint count;
>> - u32 node;
>> + struct tipc_msg *hdr;
>> struct sk_buff *skb;
>> - int mtype;
>> + u32 count, node = 0;
>>
>> spin_lock_bh(&tn->nametbl_lock);
>> - for (skb = skb_dequeue(inputq); skb; skb = skb_dequeue(inputq)) {
>> - skb_linearize(skb);
>> - msg = buf_msg(skb);
>> - mtype = msg_type(msg);
>> - item = (struct distr_item *)msg_data(msg);
>> - count = msg_data_sz(msg) / ITEM_SIZE;
>> - node = msg_orignode(msg);
>> + while ((skb = tipc_named_dequeue(namedq, rcv_nxt, open))) {
>> + hdr = buf_msg(skb);
>> + node = msg_orignode(hdr);
>> + item = (struct distr_item *)msg_data(hdr);
>> + count = msg_data_sz(hdr) / ITEM_SIZE;
>> while (count--) {
>> - tipc_update_nametbl(net, item, node, mtype);
>> + tipc_update_nametbl(net, item, node, msg_type(hdr));
>> item++;
>> }
>> kfree_skb(skb);
>> @@ -345,6 +402,6 @@ void tipc_named_reinit(struct net *net)
>> publ->node = self;
>> list_for_each_entry_rcu(publ, &nt->cluster_scope, binding_node)
>> publ->node = self;
>> -
>> + nt->rc_dests = 0;
>> spin_unlock_bh(&tn->nametbl_lock);
>> }
>> diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h
>> index 63fc73e0fa6c..092323158f06 100644
>> --- a/net/tipc/name_distr.h
>> +++ b/net/tipc/name_distr.h
>> @@ -67,11 +67,14 @@ struct distr_item {
>> __be32 key;
>> };
>>
>> +void tipc_named_bcast(struct net *net, struct sk_buff *skb);
>> struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ);
>> struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ);
>> -void tipc_named_node_up(struct net *net, u32 dnode);
>> -void tipc_named_rcv(struct net *net, struct sk_buff_head *msg_queue);
>> +void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities);
>> +void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
>> + u16 *rcv_nxt, bool *open);
>> void tipc_named_reinit(struct net *net);
>> -void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr);
>> +void tipc_publ_notify(struct net *net, struct list_head *nsub_list,
>> + u32 addr, u16 capabilities);
>>
>> #endif
>> diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
>> index 359b2bc888cf..2ac33d32edc2 100644
>> --- a/net/tipc/name_table.c
>> +++ b/net/tipc/name_table.c
>> @@ -729,6 +729,7 @@ struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
>> struct tipc_net *tn = tipc_net(net);
>> struct publication *p = NULL;
>> struct sk_buff *skb = NULL;
>> + u32 rc_dests;
>>
>> spin_lock_bh(&tn->nametbl_lock);
>>
>> @@ -743,12 +744,14 @@ struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
>> nt->local_publ_count++;
>> skb = tipc_named_publish(net, p);
>> }
>> + rc_dests = nt->rc_dests;
>> exit:
>> spin_unlock_bh(&tn->nametbl_lock);
>>
>> if (skb)
>> - tipc_node_broadcast(net, skb);
>> + tipc_node_broadcast(net, skb, rc_dests);
>> return p;
>> +
>> }
>>
>> /**
>> @@ -762,6 +765,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower,
>> u32 self = tipc_own_addr(net);
>> struct sk_buff *skb = NULL;
>> struct publication *p;
>> + u32 rc_dests;
>>
>> spin_lock_bh(&tn->nametbl_lock);
>>
>> @@ -775,10 +779,11 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower,
>> pr_err("Failed to remove local publication {%u,%u,%u}/%u\n",
>> type, lower, upper, key);
>> }
>> + rc_dests = nt->rc_dests;
>> spin_unlock_bh(&tn->nametbl_lock);
>>
>> if (skb) {
>> - tipc_node_broadcast(net, skb);
>> + tipc_node_broadcast(net, skb, rc_dests);
>> return 1;
>> }
>> return 0;
>> diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
>> index 728bc7016c38..8064e1986e2c 100644
>> --- a/net/tipc/name_table.h
>> +++ b/net/tipc/name_table.h
>> @@ -106,6 +106,8 @@ struct name_table {
>> struct list_head cluster_scope;
>> rwlock_t cluster_scope_lock;
>> u32 local_publ_count;
>> + u32 rc_dests;
>> + u32 snd_nxt;
>> };
>>
>> int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
>> diff --git a/net/tipc/node.c b/net/tipc/node.c
>> index 803a3a6d0f50..ad8d7bce1f98 100644
>> --- a/net/tipc/node.c
>> +++ b/net/tipc/node.c
>> @@ -75,6 +75,8 @@ struct tipc_bclink_entry {
>> struct sk_buff_head arrvq;
>> struct sk_buff_head inputq2;
>> struct sk_buff_head namedq;
>> + u16 named_rcv_nxt;
>> + bool named_open;
>> };
>>
>> /**
>> @@ -396,10 +398,10 @@ static void tipc_node_write_unlock(struct tipc_node *n)
>> write_unlock_bh(&n->lock);
>>
>> if (flags & TIPC_NOTIFY_NODE_DOWN)
>> - tipc_publ_notify(net, publ_list, addr);
>> + tipc_publ_notify(net, publ_list, addr, n->capabilities);
>>
>> if (flags & TIPC_NOTIFY_NODE_UP)
>> - tipc_named_node_up(net, addr);
>> + tipc_named_node_up(net, addr, n->capabilities);
>>
>> if (flags & TIPC_NOTIFY_LINK_UP) {
>> tipc_mon_peer_up(net, addr, bearer_id);
>> @@ -1729,12 +1731,23 @@ int tipc_node_distr_xmit(struct net *net, struct sk_buff_head *xmitq)
>> return 0;
>> }
>>
>> -void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
>> +void tipc_node_broadcast(struct net *net, struct sk_buff *skb, int rc_dests)
>> {
>> + struct sk_buff_head xmitq;
>> struct sk_buff *txskb;
>> struct tipc_node *n;
>> + u16 dummy;
>> u32 dst;
>>
>> + /* Use broadcast if all nodes support it */
>> + if (!rc_dests && tipc_bcast_get_mode(net) != BCLINK_MODE_RCAST) {
>> + __skb_queue_head_init(&xmitq);
>> + __skb_queue_tail(&xmitq, skb);
>> + tipc_bcast_xmit(net, &xmitq, &dummy);
>> + return;
>> + }
>> +
>> + /* Otherwise use legacy replicast method */
>> rcu_read_lock();
>> list_for_each_entry_rcu(n, tipc_nodes(net), list) {
>> dst = n->addr;
>> @@ -1749,7 +1762,6 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
>> tipc_node_xmit_skb(net, txskb, dst, 0);
>> }
>> rcu_read_unlock();
>> -
>> kfree_skb(skb);
>> }
>>
>> @@ -1844,7 +1856,9 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
>>
>> /* Handle NAME_DISTRIBUTOR messages sent from 1.7 nodes */
>> if (!skb_queue_empty(&n->bc_entry.namedq))
>> - tipc_named_rcv(net, &n->bc_entry.namedq);
>> + tipc_named_rcv(net, &n->bc_entry.namedq,
>> + &n->bc_entry.named_rcv_nxt,
>> + &n->bc_entry.named_open);
>>
>> /* If reassembly or retransmission failure => reset all links to peer */
>> if (rc & TIPC_LINK_DOWN_EVT)
>> @@ -2109,7 +2123,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
>> tipc_node_link_down(n, bearer_id, false);
>>
>> if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
>> - tipc_named_rcv(net, &n->bc_entry.namedq);
>> + tipc_named_rcv(net, &n->bc_entry.namedq,
>> + &n->bc_entry.named_rcv_nxt,
>> + &n->bc_entry.named_open);
>>
>> if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1)))
>> tipc_node_mcast_rcv(n);
>> diff --git a/net/tipc/node.h b/net/tipc/node.h
>> index a6803b449a2c..9f6f13f1604f 100644
>> --- a/net/tipc/node.h
>> +++ b/net/tipc/node.h
>> @@ -55,7 +55,8 @@ enum {
>> TIPC_MCAST_RBCTL = (1 << 7),
>> TIPC_GAP_ACK_BLOCK = (1 << 8),
>> TIPC_TUNNEL_ENHANCED = (1 << 9),
>> - TIPC_NAGLE = (1 << 10)
>> + TIPC_NAGLE = (1 << 10),
>> + TIPC_NAMED_BCAST = (1 << 11)
>> };
>>
>> #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \
>> @@ -68,7 +69,8 @@ enum {
>> TIPC_MCAST_RBCTL | \
>> TIPC_GAP_ACK_BLOCK | \
>> TIPC_TUNNEL_ENHANCED | \
>> - TIPC_NAGLE)
>> + TIPC_NAGLE | \
>> + TIPC_NAMED_BCAST)
>>
>> #define INVALID_BEARER_ID -1
>>
>> @@ -101,7 +103,7 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
>> u32 selector);
>> void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr);
>> void tipc_node_unsubscribe(struct net *net, struct list_head *subscr, u32 addr);
>> -void tipc_node_broadcast(struct net *net, struct sk_buff *skb);
>> +void tipc_node_broadcast(struct net *net, struct sk_buff *skb, int rc_dests);
>> int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
>> void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
>> int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel, bool connected);
|