From: Hoang Le <hoa...@de...> - 2019-01-29 09:22:35
|
Currently, a multicast stream may start out using replicast, because there
are few destinations, and then it should ideally switch to L2 broadcast or
IGMP multicast when the number of destinations grows beyond a certain
limit. The opposite should happen when the number decreases below the
limit.

To eliminate the risk of message reordering caused by method change, a
sending socket must stick to a previously selected method until it enters
an idle period of 5 seconds. This means there is a 5-second pause in the
traffic from the sender socket.

In this fix, we allow such a switch between replicast and broadcast
without a 5-second pause in the traffic.

The solution is to send a dummy message, consisting of only the header and
with the SYN bit set, via broadcast/replicast, while the data message,
also with the SYN bit set, is sent via replicast/broadcast (i.e., the
respective other method). Then, at the receiving side, any messages that
follow the first SYN-bit message (data or dummy) are held in a deferred
queue until the other message of the pair (dummy or data) has arrived.

For compatibility reasons we have to introduce a new capability flag,
TIPC_MCAST_RBCTL, to handle this new feature, because the dummy message
that is sent out would make poll() return an empty message on old nodes.

Signed-off-by: Jon Maloy <jon...@er...>
Signed-off-by: Hoang Le <hoa...@de...>
---
 net/tipc/bcast.c  | 116 +++++++++++++++++++++++++++++++++++++++++++++-
 net/tipc/bcast.h  |   5 ++
 net/tipc/core.c   |   2 +
 net/tipc/core.h   |   3 ++
 net/tipc/msg.h    |  10 ++++
 net/tipc/node.c   |  10 ++++
 net/tipc/node.h   |   6 ++-
 net/tipc/socket.c |  10 ++++
 8 files changed, 159 insertions(+), 3 deletions(-)

diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index d8026543bf4c..e3a85227d4aa 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -295,11 +295,15 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
 		    struct tipc_mc_method *method, struct tipc_nlist *dests,
 		    u16 *cong_link_cnt)
 {
-	struct sk_buff_head inputq, localq;
+	struct sk_buff_head inputq, localq, tmpq;
+	bool rcast = method->rcast;
+	struct sk_buff *skb, *_skb;
+	struct tipc_msg *hdr, *_hdr;
 	int rc = 0;
 
 	skb_queue_head_init(&inputq);
 	skb_queue_head_init(&localq);
+	skb_queue_head_init(&tmpq);
 
 	/* Clone packets before they are consumed by next call */
 	if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
@@ -309,6 +313,53 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
 	/* Send according to determined transmit method */
 	if (dests->remote) {
 		tipc_bcast_select_xmit_method(net, dests->remote, method);
+
+		if (tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) {
+			skb = skb_peek(pkts);
+			hdr = buf_msg(skb);
+
+			if (msg_user(hdr) == MSG_FRAGMENTER)
+				hdr = msg_get_wrapped(hdr);
+			if (msg_type(hdr) != TIPC_MCAST_MSG)
+				goto xmit;
+
+			msg_set_syn(hdr, 0);
+			msg_set_is_rcast(hdr, method->rcast);
+
+			/* switch mode */
+			if (rcast != method->rcast) {
+				/* Build a copy of the message header */
+				_skb = tipc_buf_acquire(MCAST_H_SIZE,
+							GFP_KERNEL);
+				if (!_skb) {
+					rc = -ENOMEM;
+					goto exit;
+				}
+				skb_orphan(_skb);
+				skb_copy_to_linear_data(_skb, hdr,
+							MCAST_H_SIZE);
+
+				/* Build dummy header */
+				_hdr = buf_msg(_skb);
+				msg_set_size(_hdr, MCAST_H_SIZE);
+				__skb_queue_tail(&tmpq, _skb);
+
+				msg_set_syn(hdr, 1);
+				msg_set_syn(_hdr, 1);
+				msg_set_is_rcast(_hdr, rcast);
+				/* Prepare for 'synching' */
+				if (rcast)
+					tipc_rcast_xmit(net, &tmpq, dests,
+							cong_link_cnt);
+				else
+					tipc_bcast_xmit(net, &tmpq,
+							cong_link_cnt);
+
+				/* This queue should normally be empty by now */
+				__skb_queue_purge(&tmpq);
+			}
+		}
+xmit:
 		if (method->rcast)
 			rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
 		else
@@ -576,3 +627,66 @@ void tipc_nlist_purge(struct tipc_nlist *nl)
 	nl->remote = 0;
 	nl->local = false;
 }
+
+void tipc_mcast_filter_msg(struct sk_buff_head *defq,
+			   struct sk_buff_head *inputq)
+{
+	struct sk_buff *skb, *_skb;
+	struct tipc_msg *hdr, *_hdr;
+	u32 node, port, _node, _port;
+	bool match = false;
+
+	skb = __skb_dequeue(inputq);
+	if (!skb)
+		return;
+
+	hdr = buf_msg(skb);
+	node = msg_orignode(hdr);
+	port = msg_origport(hdr);
+
+	/* Find the peer port if it exists in the defer queue */
+	while ((_skb = skb_peek(defq))) {
+		_hdr = buf_msg(_skb);
+		_node = msg_orignode(_hdr);
+		_port = msg_origport(_hdr);
+
+		if (_node != node)
+			continue;
+		if (_port != port)
+			continue;
+
+		if (!match) {
+			if (msg_is_syn(hdr) &&
+			    msg_is_rcast(hdr) != msg_is_rcast(_hdr)) {
+				__skb_dequeue(defq);
+				if (msg_data_sz(hdr)) {
+					__skb_queue_tail(inputq, skb);
+					kfree_skb(_skb);
+				} else {
+					__skb_queue_tail(inputq, _skb);
+					kfree_skb(skb);
+				}
+				match = true;
+			} else {
+				break;
+			}
+		} else {
+			if (msg_is_syn(_hdr))
+				return;
+			/* Dequeue to receive buffer */
+			__skb_dequeue(defq);
+			__skb_queue_tail(inputq, _skb);
+		}
+	}
+
+	if (match)
+		return;
+
+	if (msg_is_syn(hdr)) {
+		/* Enqueue and defer to next synching */
+		__skb_queue_tail(defq, skb);
+	} else {
+		/* Enqueue directly */
+		__skb_queue_tail(inputq, skb);
+	}
+}
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 751530ab0c49..165d88a503e4 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -63,11 +63,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node);
 /* Cookie to be used between socket and broadcast layer
  * @rcast: replicast (instead of broadcast) was used at previous xmit
  * @mandatory: broadcast/replicast indication was set by user
+ * @deferredq: defer queue to keep messages in order
  * @expires: re-evaluate non-mandatory transmit method if we are past this
  */
 struct tipc_mc_method {
 	bool rcast;
 	bool mandatory;
+	struct sk_buff_head deferredq;
 	unsigned long expires;
 };
 
@@ -92,6 +94,9 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
 int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
 int tipc_bclink_reset_stats(struct net *net);
 
+void tipc_mcast_filter_msg(struct sk_buff_head *defq,
+			   struct sk_buff_head *inputq);
+
 static inline void tipc_bcast_lock(struct net *net)
 {
 	spin_lock_bh(&tipc_net(net)->bclock);
diff --git a/net/tipc/core.c b/net/tipc/core.c
index 5b38f5164281..27cccd101ef6 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -43,6 +43,7 @@
 #include "net.h"
 #include "socket.h"
 #include "bcast.h"
+#include "node.h"
 
 #include <linux/module.h>
 
@@ -59,6 +60,7 @@ static int __net_init tipc_init_net(struct net *net)
 	tn->node_addr = 0;
 	tn->trial_addr = 0;
 	tn->addr_trial_end = 0;
+	tn->capabilities = TIPC_NODE_CAPABILITIES;
 	memset(tn->node_id, 0, sizeof(tn->node_id));
 	memset(tn->node_id_string, 0, sizeof(tn->node_id_string));
 	tn->mon_threshold = TIPC_DEF_MON_THRESHOLD;
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 8020a6c360ff..7a68e1b6a066 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -122,6 +122,9 @@ struct tipc_net {
 	/* Topology subscription server */
 	struct tipc_topsrv *topsrv;
 	atomic_t subscription_count;
+
+	/* Cluster capabilities */
+	u16 capabilities;
 };
 
 static inline struct tipc_net *tipc_net(struct net *net)
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index a0924956bb61..70ddff2206a0 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -257,6 +257,16 @@ static inline void msg_set_src_droppable(struct tipc_msg *m, u32 d)
 	msg_set_bits(m, 0, 18, 1, d);
 }
 
+static inline bool msg_is_rcast(struct tipc_msg *m)
+{
+	return msg_bits(m, 0, 18, 0x1);
+}
+
+static inline void msg_set_is_rcast(struct tipc_msg *m, bool d)
+{
+	msg_set_bits(m, 0, 18, 0x1, d);
+}
+
 static inline void msg_set_size(struct tipc_msg *m, u32 sz)
 {
 	m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index db2a6c3e0be9..1386e44d965c 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -383,6 +383,11 @@ static struct tipc_node *tipc_node_create(struct net *net, u32 addr,
 			tipc_link_update_caps(l, capabilities);
 		}
 		write_unlock_bh(&n->lock);
+		/* Calculate cluster capabilities */
+		tn->capabilities = TIPC_NODE_CAPABILITIES;
+		list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
+			tn->capabilities &= temp_node->capabilities;
+		}
 		goto exit;
 	}
 	n = kzalloc(sizeof(*n), GFP_ATOMIC);
@@ -433,6 +438,11 @@ static struct tipc_node *tipc_node_create(struct net *net, u32 addr,
 			break;
 	}
 	list_add_tail_rcu(&n->list, &temp_node->list);
+	/* Calculate cluster capabilities */
+	tn->capabilities = TIPC_NODE_CAPABILITIES;
+	list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
+		tn->capabilities &= temp_node->capabilities;
+	}
 	trace_tipc_node_create(n, true, " ");
 exit:
 	spin_unlock_bh(&tn->node_list_lock);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 4f59a30e989a..2404225c5d58 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -51,7 +51,8 @@ enum {
 	TIPC_BLOCK_FLOWCTL    = (1 << 3),
 	TIPC_BCAST_RCAST      = (1 << 4),
 	TIPC_NODE_ID128       = (1 << 5),
-	TIPC_LINK_PROTO_SEQNO = (1 << 6)
+	TIPC_LINK_PROTO_SEQNO = (1 << 6),
+	TIPC_MCAST_RBCTL      = (1 << 7)
 };
 
 #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT          |  \
@@ -60,7 +61,8 @@ enum {
 				TIPC_BCAST_RCAST      |  \
 				TIPC_BLOCK_FLOWCTL    |  \
 				TIPC_NODE_ID128       |  \
-				TIPC_LINK_PROTO_SEQNO)
+				TIPC_LINK_PROTO_SEQNO |  \
+				TIPC_MCAST_RBCTL)
 
 #define INVALID_BEARER_ID -1
 
 void tipc_node_stop(struct net *net);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 8fc5acd4820d..53a8113375a6 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -483,6 +483,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
 		tsk_set_unreturnable(tsk, true);
 		if (sock->type == SOCK_DGRAM)
 			tsk_set_unreliable(tsk, true);
+		__skb_queue_head_init(&tsk->mc_method.deferredq);
 	}
 
 	trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
@@ -580,6 +581,7 @@ static int tipc_release(struct socket *sock)
 	sk->sk_shutdown = SHUTDOWN_MASK;
 	tipc_sk_leave(tsk);
 	tipc_sk_withdraw(tsk, 0, NULL);
+	__skb_queue_purge(&tsk->mc_method.deferredq);
 	sk_stop_timer(sk, &sk->sk_timer);
 	tipc_sk_remove(tsk);
 
@@ -817,6 +819,11 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
 					     &tsk->cong_link_cnt);
 	}
 
+	/* Broadcast link is now free to choose method for next broadcast */
+	if (rc == 0) {
+		method->mandatory = false;
+		method->expires = jiffies;
+	}
 	tipc_nlist_purge(&dsts);
 
 	return rc ? rc : dlen;
@@ -2157,6 +2164,9 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
 	if (unlikely(grp))
 		tipc_group_filter_msg(grp, &inputq, xmitq);
 
+	if (msg_type(hdr) == TIPC_MCAST_MSG)
+		tipc_mcast_filter_msg(&tsk->mc_method.deferredq, &inputq);
+
 	/* Validate and add to receive buffer if there is space */
 	while ((skb = __skb_dequeue(&inputq))) {
 		hdr = buf_msg(skb);
-- 
2.17.1
|
From: Jon M. <jon...@er...> - 2019-01-31 21:05:56
|
Hi Hoang,
Nice job, but still a few things to improve. See below.

> -----Original Message-----
> From: Hoang Le <hoa...@de...>
> Sent: 29-Jan-19 04:22
> To: Jon Maloy <jon...@er...>; ma...@do...;
> yin...@wi...; tip...@li...
> Subject: [net-next] tipc: smooth change between replicast and broadcast
>
[...]
> @@ -309,6 +313,53 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
> 	/* Send according to determined transmit method */
> 	if (dests->remote) {
> 		tipc_bcast_select_xmit_method(net, dests->remote, method);

I would suggest that you move the whole code block below into a separate function:

	msg_set_is_rcast(hdr, method->rcast);
	if (rcast != method->rcast)
		tipc_mcast_send_sync(net, skb_peek(pkts));

This function sets the SYN bit in the packet header, then copies that header into a dummy header, inverts the is_rcast bit in that header, and sends the dummy out via the appropriate method.
Note that this involves a small change: the real message is sent out via the selected method, and the dummy message always via the other method, whichever that is.
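A rough sketch of how such a helper could look, just to illustrate the idea (untested; the name tipc_mcast_send_sync follows the suggestion above, but the wider argument list is an assumption, since dests and cong_link_cnt would have to be passed along as well):

/* tipc_mcast_send_sync - send a dummy SYN message via the 'other' method
 * (sketch, untested; would live in bcast.c next to tipc_mcast_xmit())
 */
static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
				struct tipc_mc_method *method,
				struct tipc_nlist *dests, u16 *cong_link_cnt)
{
	struct tipc_msg *hdr, *_hdr;
	struct sk_buff_head tmpq;
	struct sk_buff *_skb;

	/* Does every node in the cluster support the sync protocol? */
	if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL))
		return 0;

	hdr = buf_msg(skb);
	if (msg_user(hdr) == MSG_FRAGMENTER)
		hdr = msg_get_wrapped(hdr);
	if (msg_type(hdr) != TIPC_MCAST_MSG)
		return 0;

	/* Allocate dummy message */
	_skb = tipc_buf_acquire(MCAST_H_SIZE, GFP_KERNEL);
	if (!_skb)
		return -ENOMEM;

	/* Set the SYN bit in the real message's header... */
	msg_set_syn(hdr, 1);

	/* ...and copy that header into the dummy message */
	skb_copy_to_linear_data(_skb, hdr, MCAST_H_SIZE);
	skb_orphan(_skb);

	/* Invert the is_rcast bit in the dummy header */
	_hdr = buf_msg(_skb);
	msg_set_size(_hdr, MCAST_H_SIZE);
	msg_set_is_rcast(_hdr, !msg_is_rcast(hdr));

	/* Send the dummy via the 'other' method */
	skb_queue_head_init(&tmpq);
	__skb_queue_tail(&tmpq, _skb);
	if (method->rcast)
		tipc_bcast_xmit(net, &tmpq, cong_link_cnt);
	else
		tipc_rcast_xmit(net, &tmpq, dests, cong_link_cnt);

	/* This queue should normally be empty by now */
	__skb_queue_purge(&tmpq);
	return 0;
}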
> +
> +		if (tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) {
> +			skb = skb_peek(pkts);
> +			hdr = buf_msg(skb);
> +
> +			if (msg_user(hdr) == MSG_FRAGMENTER)
> +				hdr = msg_get_wrapped(hdr);
> +			if (msg_type(hdr) != TIPC_MCAST_MSG)
> +				goto xmit;
> +
> +			msg_set_syn(hdr, 0);
> +			msg_set_is_rcast(hdr, method->rcast);
> +
> +			/* switch mode */
> +			if (rcast != method->rcast) {
> +				/* Build a copy of the message header */
> +				_skb = tipc_buf_acquire(MCAST_H_SIZE,
> +							GFP_KERNEL);
> +				if (!_skb) {
> +					rc = -ENOMEM;
> +					goto exit;
> +				}
> +				skb_orphan(_skb);
> +				skb_copy_to_linear_data(_skb, hdr,
> +							MCAST_H_SIZE);
> +
> +				/* Build dummy header */
> +				_hdr = buf_msg(_skb);
> +				msg_set_size(_hdr, MCAST_H_SIZE);
> +				__skb_queue_tail(&tmpq, _skb);
> +
> +				msg_set_syn(hdr, 1);
> +				msg_set_syn(_hdr, 1);
> +				msg_set_is_rcast(_hdr, rcast);
> +				/* Prepare for 'synching' */
> +				if (rcast)
> +					tipc_rcast_xmit(net, &tmpq, dests,
> +							cong_link_cnt);
> +				else
> +					tipc_bcast_xmit(net, &tmpq,
> +							cong_link_cnt);
> +
> +				/* This queue should normally be empty by now */
> +				__skb_queue_purge(&tmpq);
> +			}
> +		}
> +xmit:
> 		if (method->rcast)
> 			rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
> 		else
> @@ -576,3 +627,66 @@ void tipc_nlist_purge(struct tipc_nlist *nl)
> 	nl->remote = 0;
> 	nl->local = false;
> }
> +
> +void tipc_mcast_filter_msg(struct sk_buff_head *defq,
> +			   struct sk_buff_head *inputq)
> +{
> +	struct sk_buff *skb, *_skb;
> +	struct tipc_msg *hdr, *_hdr;
> +	u32 node, port, _node, _port;
> +	bool match = false;

If you put in the following lines here we will save some instruction cycles:

	hdr = buf_msg(skb_peek(inputq));
	if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
		return;

After all, this will be the case in the vast majority of cases. It is safe to just peek the queue and access the header, since inputq can never be empty here.

> +
> +	skb = __skb_dequeue(inputq);
> +	if (!skb)
> +		return;
> +
> +	hdr = buf_msg(skb);
> +	node = msg_orignode(hdr);
> +	port = msg_origport(hdr);
> +
> +	/* Find the peer port if it exists in the defer queue */
> +	while ((_skb = skb_peek(defq))) {
> +		_hdr = buf_msg(_skb);
> +		_node = msg_orignode(_hdr);
> +		_port = msg_origport(_hdr);
> +
> +		if (_node != node)
> +			continue;
> +		if (_port != port)
> +			continue;
> +
> +		if (!match) {
> +			if (msg_is_syn(hdr) &&
> +			    msg_is_rcast(hdr) != msg_is_rcast(_hdr)) {
> +				__skb_dequeue(defq);
> +				if (msg_data_sz(hdr)) {
> +					__skb_queue_tail(inputq, skb);
> +					kfree_skb(_skb);
> +				} else {
> +					__skb_queue_tail(inputq, _skb);
> +					kfree_skb(skb);
> +				}
> +				match = true;
> +			} else {
> +				break;
> +			}
> +		} else {
> +			if (msg_is_syn(_hdr))
> +				return;
> +			/* Dequeue to receive buffer */
> +			__skb_dequeue(defq);
> +			__skb_queue_tail(inputq, _skb);
> +		}
> +	}
> +
> +	if (match)
> +		return;
> +
> +	if (msg_is_syn(hdr)) {
> +		/* Enqueue and defer to next synching */
> +		__skb_queue_tail(defq, skb);
> +	} else {
> +		/* Enqueue directly */
> +		__skb_queue_tail(inputq, skb);
> +	}
> +}

The above function is hard to follow and does not convince me. What if there are messages from many sources, and you find the match in the middle of the queue?
I suggest you break down the logic into smaller tasks, e.g., as follows (the code compiles ok, but is untested):

void tipc_mcast_filter_msg(struct sk_buff_head *defq,
			   struct sk_buff_head *inputq)
{
	struct sk_buff *skb, *_skb, *tmp;
	struct tipc_msg *hdr, *_hdr;
	bool match = false;
	u32 node, port;

	skb = skb_peek(inputq);
	hdr = buf_msg(skb);
	if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
		return;

	__skb_dequeue(inputq);
	node = msg_orignode(hdr);
	port = msg_origport(hdr);

	/* Has the twin SYN message already arrived ? */
	skb_queue_walk(defq, _skb) {
		_hdr = buf_msg(_skb);
		if (msg_orignode(_hdr) != node)
			continue;
		if (msg_origport(_hdr) != port)
			continue;
		match = true;
		break;
	}

	if (!match)
		return __skb_queue_tail(defq, skb);

	if (!msg_is_syn(_hdr)) {
		pr_warn_ratelimited("Non-sync mcast heads deferred queue\n");
		__skb_queue_purge(defq);
		return __skb_queue_tail(inputq, skb);
	}

	/* Non-SYN message from the other link can be delivered right away */
	if (!msg_is_syn(hdr)) {
		if (msg_is_rcast(hdr) != msg_is_rcast(_hdr))
			return __skb_queue_tail(inputq, skb);
		else
			return __skb_queue_tail(defq, skb);
	}

	/* Matching SYN messages => return the one with data, if any */
	__skb_unlink(_skb, defq);
	if (msg_data_sz(hdr)) {
		kfree_skb(_skb);
		__skb_queue_tail(inputq, skb);
	} else {
		kfree_skb(skb);
		__skb_queue_tail(inputq, _skb);
	}

	/* Deliver subsequent non-SYN messages from the same peer */
	skb_queue_walk_safe(defq, _skb, tmp) {
		_hdr = buf_msg(_skb);
		if (msg_orignode(_hdr) != node)
			continue;
		if (msg_origport(_hdr) != port)
			continue;
		if (msg_is_syn(_hdr))
			break;
		__skb_unlink(_skb, defq);
		__skb_queue_tail(inputq, _skb);
	}
}

> diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
> index 751530ab0c49..165d88a503e4 100644
> --- a/net/tipc/bcast.h
> +++ b/net/tipc/bcast.h
> @@ -63,11 +63,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node);
>  /* Cookie to be used between socket and broadcast layer
>   * @rcast: replicast (instead of broadcast) was used at previous xmit
>   * @mandatory: broadcast/replicast indication was set by user
> + * @deferredq: defer queue to keep messages in order
>   * @expires: re-evaluate non-mandatory transmit method if we are past this
>   */

[...]

> @@ -383,6 +383,11 @@ static struct tipc_node *tipc_node_create(struct net *net, u32 addr,
> 			tipc_link_update_caps(l, capabilities);
> 		}
> 		write_unlock_bh(&n->lock);
> +		/* Calculate cluster capabilities */
> +		tn->capabilities = TIPC_NODE_CAPABILITIES;
> +		list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
> +			tn->capabilities &= temp_node->capabilities;
> +		}

Yes, you are right here. During a cluster upgrade a node can come back with new capabilities, which must also be reflected in the cluster capabilities field.
Actually, I think it would be a good idea to add the cluster capabilities as a separate patch. That makes this rather complex patch slightly smaller.

[...]

> @@ -817,6 +819,11 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
> 					     &tsk->cong_link_cnt);
> 	}
> 
> +	/* Broadcast link is now free to choose method for next broadcast */
> +	if (rc == 0) {
> +		method->mandatory = false;
> +		method->expires = jiffies;
> +	}

No, we should leave the socket code as is, so we are sure it works with legacy nodes.
We should instead make tipc_bcast_select_xmit_method() slightly smarter:

static void tipc_bcast_select_xmit_method(struct net *net, int dests,
					  struct tipc_mc_method *method)
{
	.......
	/* Can current method be changed ? */
	method->expires = jiffies + TIPC_METHOD_EXPIRE;
	if (method->mandatory)
		return;
	if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) &&
	    time_before(jiffies, exp))
		return;

	/* Determine method to use now */
	method->rcast = dests <= bb->bc_threshold;
}

I.e., we respect the 'mandatory' setting, because we need that for group cast to work correctly, but we override 'method->expires' if the cluster capabilities say that all nodes support MCAST_RBCTL.
Combined with the patch where we add forced BCAST or REPLICAST (make sure you add this patch on top of that one), I think we have achieved a pretty smart and adaptive multicast subsystem.

BR
///jon

[...]
|
From: Jon M. <jon...@er...> - 2019-02-01 00:10:38
|
Hi Hoang,
The (!match) case of my suggested code was wrong. To possibly save you some unnecessary troubleshooting, I have fixed that and made some other simplifications that became possible.
///jon

-----Original Message-----
From: Jon Maloy
Sent: 31-Jan-19 16:05
To: 'Hoang Le' <hoa...@de...>; ma...@do...; yin...@wi...; tip...@li...
Subject: RE: [net-next] tipc: smooth change between replicast and broadcast

[...]

void tipc_mcast_filter_msg(struct sk_buff_head *defq,
			   struct sk_buff_head *inputq)
{
	struct sk_buff *skb, *_skb, *tmp;
	struct tipc_msg *hdr, *_hdr;
	bool match = false;
	u32 node, port;

	skb = skb_peek(inputq);
	hdr = buf_msg(skb);
	if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
		return;

	node = msg_orignode(hdr);
	port = msg_origport(hdr);

	/* Has the twin SYN message already arrived ? */
	skb_queue_walk(defq, _skb) {
		_hdr = buf_msg(_skb);
		if (msg_orignode(_hdr) != node)
			continue;
		if (msg_origport(_hdr) != port)
			continue;
		match = true;
		break;
	}

	if (!match) {
		if (!msg_is_syn(hdr))
			return;
		__skb_dequeue(inputq);
		__skb_queue_tail(defq, skb);
		return;
	}

	if (!msg_is_syn(_hdr)) {
		pr_warn_ratelimited("Non-sync mcast heads deferred queue\n");
		__skb_queue_purge(defq);
		return;
	}

	/* Deliver non-SYN message from the other link, otherwise queue it */
	if (!msg_is_syn(hdr)) {
		if (msg_is_rcast(hdr) != msg_is_rcast(_hdr))
			return;
		__skb_dequeue(inputq);
		__skb_queue_tail(defq, skb);
		return;
	}

	/* Matching SYN messages => return the one with data, if any */
	__skb_unlink(_skb, defq);
	if (msg_data_sz(hdr)) {
		kfree_skb(_skb);
	} else {
		__skb_dequeue(inputq);
		kfree_skb(skb);
		__skb_queue_tail(inputq, _skb);
	}

	/* Deliver subsequent non-SYN messages from the same peer */
	skb_queue_walk_safe(defq, _skb, tmp) {
		_hdr = buf_msg(_skb);
		if (msg_orignode(_hdr) != node)
			continue;
		if (msg_origport(_hdr) != port)
			continue;
		if (msg_is_syn(_hdr))
			break;
		__skb_unlink(_skb, defq);
		__skb_queue_tail(inputq, _skb);
	}
}

[...]
|
From: Jon M. <jon...@er...> - 2019-02-01 12:39:01
|
Hi Hoang,
I realized one more thing. Since this mechanism is intended only for MCAST messages, we must filter those out at the send side (in tipc_mcast_send_sync()), so that SYN messages are sent only for those. Group messaging, which has its own mechanism, does not need this new method, and it cannot discover message duplicates; it would just deliver them. This must be avoided.
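For example, the same check the patch already does in tipc_mcast_xmit() could sit at the top of the helper (sketch, matching the helper outline from earlier in this thread):

	hdr = buf_msg(skb);
	if (msg_user(hdr) == MSG_FRAGMENTER)
		hdr = msg_get_wrapped(hdr);
	/* Only datagram multicast takes part in the sync handshake */
	if (msg_type(hdr) != TIPC_MCAST_MSG)
		return 0;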
BR
///jon

> -----Original Message-----
> From: Jon Maloy
> Sent: 31-Jan-19 16:05
> To: 'Hoang Le' <hoa...@de...>; ma...@do...;
> yin...@wi...; tip...@li...
> Subject: RE: [net-next] tipc: smooth change between replicast and broadcast
>
[...]
|
From: Hoang L. <hoa...@de...> - 2019-02-14 09:09:08
|
Hi Jon,

Do we need a purge cascade in this case? This happens when a packet has been dropped from one source/peer. Then, filtering out only that source/peer, instead of doing __skb_queue_purge(defq), would be preferable; see the sketch below.

[...]
> 	if (!msg_is_syn(_hdr)) {
> 		pr_warn_ratelimited("Non-sync mcast heads deferred queue\n");
> 		__skb_queue_purge(defq);
> 		return __skb_queue_tail(inputq, skb);
> 	}
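Something along these lines could then replace the full purge (a minimal sketch, untested; the helper name tipc_defq_purge_peer is hypothetical):

/* Drop only the deferred messages originating from the given peer */
static void tipc_defq_purge_peer(struct sk_buff_head *defq, u32 node, u32 port)
{
	struct sk_buff *_skb, *tmp;
	struct tipc_msg *_hdr;

	skb_queue_walk_safe(defq, _skb, tmp) {
		_hdr = buf_msg(_skb);
		if (msg_orignode(_hdr) != node)
			continue;
		if (msg_origport(_hdr) != port)
			continue;
		__skb_unlink(_skb, defq);
		kfree_skb(_skb);
	}
}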
Thanks and regards,
Hoang

-----Original Message-----
From: Jon Maloy <jon...@er...>
Sent: Friday, February 1, 2019 7:39 PM
To: Hoang Huu Le <hoa...@de...>; ma...@do...; yin...@wi...; tip...@li...
Subject: RE: [net-next] tipc: smooth change between replicast and broadcast

[...]
From: Hoang L. <hoa...@de...> - 2019-02-15 04:51:42
|
Hi Jon,

I realized that the case below can never happen in practice, so the check is redundant. I will remove these lines of code as well.

Regards,
Hoang

-----Original Message-----
From: Hoang Le <hoa...@de...>
Sent: Thursday, February 14, 2019 4:09 PM
To: 'Jon Maloy' <jon...@er...>; ma...@do...; yin...@wi...; tip...@li...
Subject: Re: [tipc-discussion] [net-next] tipc: smooth change between replicast and broadcast

Hi Jon,

Do we need a purge cascade in this case? This happens when a packet has been dropped from one source/peer. Then, filtering out only that source/peer, instead of doing __skb_queue_purge(defq), would be preferable.

[...]
> 	if (!msg_is_syn(_hdr)) {
> 		pr_warn_ratelimited("Non-sync mcast heads deferred queue\n");
> 		__skb_queue_purge(defq);
> 		return __skb_queue_tail(inputq, skb);
> 	}

[...]
> > > > Signed-off-by: Jon Maloy <jon...@er...> > > Signed-off-by: Hoang Le <hoa...@de...> > > --- > > net/tipc/bcast.c | 116 > > +++++++++++++++++++++++++++++++++++++++++++++- > > net/tipc/bcast.h | 5 ++ > > net/tipc/core.c | 2 + > > net/tipc/core.h | 3 ++ > > net/tipc/msg.h | 10 ++++ > > net/tipc/node.c | 10 ++++ > > net/tipc/node.h | 6 ++- > > net/tipc/socket.c | 10 ++++ > > 8 files changed, 159 insertions(+), 3 deletions(-) > > > > diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index > > d8026543bf4c..e3a85227d4aa 100644 > > --- a/net/tipc/bcast.c > > +++ b/net/tipc/bcast.c > > @@ -295,11 +295,15 @@ int tipc_mcast_xmit(struct net *net, struct > > sk_buff_head *pkts, > > struct tipc_mc_method *method, struct tipc_nlist *dests, > > u16 *cong_link_cnt) > > { > > - struct sk_buff_head inputq, localq; > > + struct sk_buff_head inputq, localq, tmpq; > > + bool rcast = method->rcast; > > + struct sk_buff *skb, *_skb; > > + struct tipc_msg *hdr, *_hdr; > > int rc = 0; > > > > skb_queue_head_init(&inputq); > > skb_queue_head_init(&localq); > > + skb_queue_head_init(&tmpq); > > > > /* Clone packets before they are consumed by next call */ > > if (dests->local && !tipc_msg_reassemble(pkts, &localq)) { @@ - > > 309,6 +313,53 @@ int tipc_mcast_xmit(struct net *net, struct > > sk_buff_head *pkts, > > /* Send according to determined transmit method */ > > if (dests->remote) { > > tipc_bcast_select_xmit_method(net, dests->remote, > method); > > I would suggest that you move the whole code block below into a separate > function: > msg_set_is_rcast(hdr, method->rcast); > if (rcast != method->rcast) > tipc_mcast_send_sync(net, skb_peek(pkts)); > > This function sets the SYN bit in the packet header, then copies that header > into a dummy header, inverts the is_rcast bit in that header and sends it out > via the appropriate method. > Note that this involves a small change: the real message is sent out via the > selected method, the dummy message always via the other method, > whichever it is. 
> > +
> > +		if (tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) {
> > +			skb = skb_peek(pkts);
> > +			hdr = buf_msg(skb);
> > +
> > +			if (msg_user(hdr) == MSG_FRAGMENTER)
> > +				hdr = msg_get_wrapped(hdr);
> > +			if (msg_type(hdr) != TIPC_MCAST_MSG)
> > +				goto xmit;
> > +
> > +			msg_set_syn(hdr, 0);
> > +			msg_set_is_rcast(hdr, method->rcast);
> > +
> > +			/* switch mode */
> > +			if (rcast != method->rcast) {
> > +				/* Build a copy of the message */
> > +				_skb = tipc_buf_acquire(MCAST_H_SIZE,
> > +							GFP_KERNEL);
> > +				if (!_skb) {
> > +					rc = -ENOMEM;
> > +					goto exit;
> > +				}
> > +				skb_orphan(_skb);
> > +				skb_copy_to_linear_data(_skb, hdr,
> > +							MCAST_H_SIZE);
> > +
> > +				/* Build dummy header */
> > +				_hdr = buf_msg(_skb);
> > +				msg_set_size(_hdr, MCAST_H_SIZE);
> > +				__skb_queue_tail(&tmpq, _skb);
> > +
> > +				msg_set_syn(hdr, 1);
> > +				msg_set_syn(_hdr, 1);
> > +				msg_set_is_rcast(_hdr, rcast);
> > +				/* Prepare for 'synching' */
> > +				if (rcast)
> > +					tipc_rcast_xmit(net, &tmpq, dests,
> > +							cong_link_cnt);
> > +				else
> > +					tipc_bcast_xmit(net, &tmpq,
> > +							cong_link_cnt);
> > +
> > +				/* This queue should normally be empty by now */
> > +				__skb_queue_purge(&tmpq);
> > +			}
> > +		}
> > +xmit:
> >  	if (method->rcast)
> >  		rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
> >  	else
> > @@ -576,3 +627,66 @@ void tipc_nlist_purge(struct tipc_nlist *nl)
> >  	nl->remote = 0;
> >  	nl->local = false;
> >  }
> > +
> > +void tipc_mcast_filter_msg(struct sk_buff_head *defq,
> > +			   struct sk_buff_head *inputq)
> > +{
> > +	struct sk_buff *skb, *_skb;
> > +	struct tipc_msg *hdr, *_hdr;
> > +	u32 node, port, _node, _port;
> > +	bool match = false;
>
> If you put in the following lines here, we will save some instruction cycles:
>
> 	hdr = buf_msg(skb_peek(inputq));
> 	if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
> 		return;
>
> After all, this will be the case in the vast majority of calls. It is safe to
> just peek the queue and access it, since inputq can never be empty here.
>
> > +
> > +	skb = __skb_dequeue(inputq);
> > +	if (!skb)
> > +		return;
> > +
> > +	hdr = buf_msg(skb);
> > +	node = msg_orignode(hdr);
> > +	port = msg_origport(hdr);
> > +
> > +	/* Find a peer port if its existing in defer queue */
> > +	while ((_skb = skb_peek(defq))) {
> > +		_hdr = buf_msg(_skb);
> > +		_node = msg_orignode(_hdr);
> > +		_port = msg_origport(_hdr);
> > +
> > +		if (_node != node)
> > +			continue;
> > +		if (_port != port)
> > +			continue;
> > +
> > +		if (!match) {
> > +			if (msg_is_syn(hdr) &&
> > +			    msg_is_rcast(hdr) != msg_is_rcast(_hdr)) {
> > +				__skb_dequeue(defq);
> > +				if (msg_data_sz(hdr)) {
> > +					__skb_queue_tail(inputq, skb);
> > +					kfree_skb(_skb);
> > +				} else {
> > +					__skb_queue_tail(inputq, _skb);
> > +					kfree_skb(skb);
> > +				}
> > +				match = true;
> > +			} else {
> > +				break;
> > +			}
> > +		} else {
> > +			if (msg_is_syn(_hdr))
> > +				return;
> > +			/* Dequeued to receive buffer */
> > +			__skb_dequeue(defq);
> > +			__skb_queue_tail(inputq, _skb);
> > +		}
> > +	}
> > +
> > +	if (match)
> > +		return;
> > +
> > +	if (msg_is_syn(hdr)) {
> > +		/* Enqueue and defer to next synching */
> > +		__skb_queue_tail(defq, skb);
> > +	} else {
> > +		/* Direct enqueued */
> > +		__skb_queue_tail(inputq, skb);
> > +	}
> > +}
>
> The function above is hard to follow and does not convince me. What if there
> are messages from many sources, and you find the match in the middle of the
> queue?
> I suggest you break down the logic into smaller tasks, e.g., as follows (the
> code compiles ok, but is untested):
>
> void tipc_mcast_filter_msg(struct sk_buff_head *defq,
> 			   struct sk_buff_head *inputq)
> {
> 	struct sk_buff *skb, *_skb, *tmp;
> 	struct tipc_msg *hdr, *_hdr;
> 	bool match = false;
> 	u32 node, port;
>
> 	skb = skb_peek(inputq);
> 	hdr = buf_msg(skb);
> 	if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
> 		return;
>
> 	__skb_dequeue(inputq);
> 	node = msg_orignode(hdr);
> 	port = msg_origport(hdr);
>
> 	/* Has the twin SYN message already arrived ? */
> 	skb_queue_walk(defq, _skb) {
> 		_hdr = buf_msg(_skb);
> 		if (msg_orignode(_hdr) != node)
> 			continue;
> 		if (msg_origport(_hdr) != port)
> 			continue;
> 		match = true;
> 		break;
> 	}
>
> 	if (!match)
> 		return __skb_queue_tail(defq, skb);
>
> 	if (!msg_is_syn(_hdr)) {
> 		pr_warn_ratelimited("Non-sync mcast heads deferred queue\n");
> 		__skb_queue_purge(defq);
> 		return __skb_queue_tail(inputq, skb);
> 	}
>
> 	/* Non-SYN message from other link can be delivered right away */
> 	if (!msg_is_syn(hdr)) {
> 		if (msg_is_rcast(hdr) != msg_is_rcast(_hdr))
> 			return __skb_queue_tail(inputq, skb);
> 		else
> 			return __skb_queue_tail(defq, skb);
> 	}
>
> 	/* Matching SYN messages => return the one with data, if any */
> 	__skb_unlink(_skb, defq);
> 	if (msg_data_sz(hdr)) {
> 		kfree_skb(_skb);
> 		__skb_queue_tail(inputq, skb);
> 	} else {
> 		kfree_skb(skb);
> 		__skb_queue_tail(inputq, _skb);
> 	}
>
> 	/* Deliver subsequent non-SYN messages from same peer */
> 	skb_queue_walk_safe(defq, _skb, tmp) {
> 		_hdr = buf_msg(_skb);
> 		if (msg_orignode(_hdr) != node)
> 			continue;
> 		if (msg_origport(_hdr) != port)
> 			continue;
> 		if (msg_is_syn(_hdr))
> 			break;
> 		__skb_unlink(_skb, defq);
> 		__skb_queue_tail(inputq, _skb);
> 	}
> }
>
> > diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
> > index 751530ab0c49..165d88a503e4 100644
> > --- a/net/tipc/bcast.h
> > +++ b/net/tipc/bcast.h
> > @@ -63,11 +63,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node);
> >  /* Cookie to be used between socket and broadcast layer
> >   * @rcast: replicast (instead of broadcast) was used at previous xmit
> >   * @mandatory: broadcast/replicast indication was set by user
> > + * @deferredq: defer queue to keep messages in order
> >   * @expires: re-evaluate non-mandatory transmit method if we are past this
> >   */
>
> [...]
>
> > @@ -383,6 +383,11 @@ static struct tipc_node *tipc_node_create(struct net *net, u32 addr,
> >  			tipc_link_update_caps(l, capabilities);
> >  		}
> >  		write_unlock_bh(&n->lock);
> > +		/* Calculate cluster capabilities */
> > +		tn->capabilities = TIPC_NODE_CAPABILITIES;
> > +		list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
> > +			tn->capabilities &= temp_node->capabilities;
> > +		}
>
> Yes, you are right here. During a cluster upgrade a node can come back with
> new capabilities, which must also be reflected in the cluster capabilities
> field. Actually, I think it would be a good idea to add cluster capabilities
> as a separate patch. That would make this rather complex patch slightly
> smaller.
>
> >  		goto exit;
> >  	}
> >  	n = kzalloc(sizeof(*n), GFP_ATOMIC);
> > @@ -433,6 +438,11 @@ static struct tipc_node *tipc_node_create(struct net *net, u32 addr,
> >  		break;
>
> [...]
> >
> > @@ -817,6 +819,11 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
> >  					     &tsk->cong_link_cnt);
> >  	}
> >
> > +	/* Broadcast link is now free to choose method for next broadcast */
> > +	if (rc == 0) {
> > +		method->mandatory = false;
> > +		method->expires = jiffies;
> > +	}
>
> No, we should leave the socket code as is, so we are sure it works with
> legacy nodes.
> We should instead make tipc_bcast_select_xmit_method() slightly smarter:
>
> static void tipc_bcast_select_xmit_method(struct net *net, int dests,
> 					  struct tipc_mc_method *method)
> {
> 	.......
> 	/* Can current method be changed ? */
> 	method->expires = jiffies + TIPC_METHOD_EXPIRE;
> 	if (method->mandatory)
> 		return;
> 	if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) &&
> 	    time_before(jiffies, exp))
> 		return;
>
> 	/* Determine method to use now */
> 	method->rcast = dests <= bb->bc_threshold;
> }
>
> I.e., we respect the 'mandatory' setting, because we need that for group
> cast to work correctly, but we override 'method->expires' if the cluster
> capabilities say that all nodes support MCAST_RBCTL.
> Combined with the patch where we add forced BCAST or REPLICAST (make sure
> you add this patch on top of that one), I think we have achieved a pretty
> smart and adaptive multicast subsystem.
>
> BR
> ///jon
>
> >  	tipc_nlist_purge(&dsts);
> >
> >  	return rc ? rc : dlen;
> > @@ -2157,6 +2164,9 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
> >  	if (unlikely(grp))
> >  		tipc_group_filter_msg(grp, &inputq, xmitq);
> >
> > +	if (msg_type(hdr) == TIPC_MCAST_MSG)
> > +		tipc_mcast_filter_msg(&tsk->mc_method.deferredq, &inputq);
> > +
> >  	/* Validate and add to receive buffer if there is space */
> >  	while ((skb = __skb_dequeue(&inputq))) {
> >  		hdr = buf_msg(skb);
> > --
> > 2.17.1
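>
> For reference, with the elided parts filled in, the whole function might
> then look as follows (only a sketch, untested; it assumes the existing
> bcast_support/rcast_support checks and the bc_threshold field stay
> unchanged from the current implementation):
>
> static void tipc_bcast_select_xmit_method(struct net *net, int dests,
> 					  struct tipc_mc_method *method)
> {
> 	struct tipc_bc_base *bb = tipc_bc_base(net);
> 	unsigned long exp = method->expires;
>
> 	/* Broadcast supported ? */
> 	if (!bb->bcast_support) {
> 		method->rcast = true;
> 		return;
> 	}
> 	/* Any destinations which don't support replicast ? */
> 	if (!bb->rcast_support) {
> 		method->rcast = false;
> 		return;
> 	}
> 	/* Can current method be changed ? */
> 	method->expires = jiffies + TIPC_METHOD_EXPIRE;
> 	if (method->mandatory)
> 		return;
> 	/* Full cluster support for MCAST_RBCTL overrides the timer */
> 	if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) &&
> 	    time_before(jiffies, exp))
> 		return;
>
> 	/* Determine method to use now */
> 	method->rcast = dests <= bb->bc_threshold;
> }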