From: Jon M. <jon...@er...> - 2019-02-01 12:39:01
Hi Hoang,

I realized one more thing. Since this mechanism is intended only for MCAST
messages, we must filter on message type at the send side (in
tipc_mcast_send_sync()) so that SYN messages are sent only for those.
Group messaging, which has its own mechanism, does not need this new
method, and it cannot discover message duplicates - it would just deliver
them. This must be avoided.

BR
///jon

> -----Original Message-----
> From: Jon Maloy
> Sent: 31-Jan-19 16:05
> To: 'Hoang Le' <hoa...@de...>; ma...@do...;
> yin...@wi...; tip...@li...
> Subject: RE: [net-next] tipc: smooth change between replicast and broadcast
>
> Hi Hoang,
> Nice job, but still a few things to improve. See below.
>
> > -----Original Message-----
> > From: Hoang Le <hoa...@de...>
> > Sent: 29-Jan-19 04:22
> > To: Jon Maloy <jon...@er...>; ma...@do...;
> > yin...@wi...; tip...@li...
> > Subject: [net-next] tipc: smooth change between replicast and broadcast
> >
> > Currently, a multicast stream may start out using replicast, because
> > there are few destinations, and then it should ideally switch to
> > L2/broadcast IGMP/multicast when the number of destinations grows
> > beyond a certain limit. The opposite should happen when the number
> > decreases below the limit.
> >
> > To eliminate the risk of message reordering caused by a method change,
> > a sending socket must currently stick to the previously selected method
> > until it has been idle for 5 seconds, i.e., there has to be a 5 second
> > pause in the traffic from that socket before the method can change.
> >
> > With this fix, we allow such a switch between replicast and broadcast
> > without requiring a 5 second pause in the traffic.
> >
> > The solution is to send a dummy message, consisting of only the header
> > and with the SYN bit set, via broadcast/replicast, while the data
> > message, also with the SYN bit set, is sent via replicast/broadcast,
> > i.e., via the other method.
> >
> > At the receiving side, any messages that follow the first SYN message
> > (data or dummy) are held in a deferred queue until the counterpart
> > (dummy or data) has arrived.
> >
> > For compatibility reasons we have to introduce a new capability flag,
> > TIPC_MCAST_RBCTL, to gate this new feature: since a dummy message is
> > sent out, poll would otherwise return an empty message on old nodes.
> >
> > Signed-off-by: Jon Maloy <jon...@er...>
> > Signed-off-by: Hoang Le <hoa...@de...>
> > ---
> >  net/tipc/bcast.c  | 116 +++++++++++++++++++++++++++++++++++++++++++++-
> >  net/tipc/bcast.h  |   5 ++
> >  net/tipc/core.c   |   2 +
> >  net/tipc/core.h   |   3 ++
> >  net/tipc/msg.h    |  10 ++++
> >  net/tipc/node.c   |  10 ++++
> >  net/tipc/node.h   |   6 ++-
> >  net/tipc/socket.c |  10 ++++
> >  8 files changed, 159 insertions(+), 3 deletions(-)
> >
> > diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
> > index d8026543bf4c..e3a85227d4aa 100644
> > --- a/net/tipc/bcast.c
> > +++ b/net/tipc/bcast.c
> > @@ -295,11 +295,15 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
> >              struct tipc_mc_method *method, struct tipc_nlist *dests,
> >              u16 *cong_link_cnt)
> >  {
> > -    struct sk_buff_head inputq, localq;
> > +    struct sk_buff_head inputq, localq, tmpq;
> > +    bool rcast = method->rcast;
> > +    struct sk_buff *skb, *_skb;
> > +    struct tipc_msg *hdr, *_hdr;
> >      int rc = 0;
> >
> >      skb_queue_head_init(&inputq);
> >      skb_queue_head_init(&localq);
> > +    skb_queue_head_init(&tmpq);
> >
> >      /* Clone packets before they are consumed by next call */
> >      if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
> > @@ -309,6 +313,53 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
> >      /* Send according to determined transmit method */
> >      if (dests->remote) {
> >          tipc_bcast_select_xmit_method(net, dests->remote, method);
>
> I would suggest that you move the whole code block below into a separate
> function:
>         msg_set_is_rcast(hdr, method->rcast);
>         if (rcast != method->rcast)
>             tipc_mcast_send_sync(net, skb_peek(pkts));
>
> This function sets the SYN bit in the packet header, then copies that header
> into a dummy header, inverts the is_rcast bit in that header and sends it out
> via the appropriate method.
> Note that this involves a small change: the real message is sent out via the
> selected method, the dummy message always via the other method,
> whichever it is.
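>
> For illustration, here is a rough and untested sketch of how such a
> tipc_mcast_send_sync() could look. It just reuses the calls from your
> block below; it will probably need a couple more arguments than in my
> call above (at least dests and cong_link_cnt), and - as per my follow-up
> mail - it must bail out for anything that is not a TIPC_MCAST_MSG so
> that group messages never get a SYN/dummy pair:
>
> /* tipc_mcast_send_sync - sketch only: send a dummy SYN copy of the
>  * mcast message header via the method we are switching away from
>  */
> static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb,
>                                 struct tipc_mc_method *method,
>                                 struct tipc_nlist *dests,
>                                 u16 *cong_link_cnt)
> {
>     struct tipc_msg *hdr, *_hdr;
>     struct sk_buff_head tmpq;
>     struct sk_buff *_skb;
>
>     /* Only regular multicast messages take part in the sync handshake */
>     hdr = buf_msg(skb);
>     if (msg_user(hdr) == MSG_FRAGMENTER)
>         hdr = msg_get_wrapped(hdr);
>     if (msg_type(hdr) != TIPC_MCAST_MSG)
>         return 0;
>
>     /* Allocate a dummy message consisting of the header only */
>     _skb = tipc_buf_acquire(MCAST_H_SIZE, GFP_KERNEL);
>     if (!_skb)
>         return -ENOMEM;
>
>     /* Set SYN on the real header, then clone it into the dummy */
>     msg_set_syn(hdr, 1);
>     skb_copy_to_linear_data(_skb, hdr, MCAST_H_SIZE);
>     skb_orphan(_skb);
>
>     /* The dummy advertises the method we are leaving, i.e. the
>      * opposite of the newly selected one
>      */
>     _hdr = buf_msg(_skb);
>     msg_set_size(_hdr, MCAST_H_SIZE);
>     msg_set_is_rcast(_hdr, !method->rcast);
>
>     /* Send the dummy via the method we are leaving */
>     skb_queue_head_init(&tmpq);
>     __skb_queue_tail(&tmpq, _skb);
>     if (method->rcast)
>         tipc_bcast_xmit(net, &tmpq, cong_link_cnt);
>     else
>         tipc_rcast_xmit(net, &tmpq, dests, cong_link_cnt);
>
>     /* This queue should normally be empty by now */
>     __skb_queue_purge(&tmpq);
>     return 0;
> }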
>
> > +
> > +        if (tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) {
> > +            skb = skb_peek(pkts);
> > +            hdr = buf_msg(skb);
> > +
> > +            if (msg_user(hdr) == MSG_FRAGMENTER)
> > +                hdr = msg_get_wrapped(hdr);
> > +            if (msg_type(hdr) != TIPC_MCAST_MSG)
> > +                goto xmit;
> > +
> > +            msg_set_syn(hdr, 0);
> > +            msg_set_is_rcast(hdr, method->rcast);
> > +
> > +            /* switch mode */
> > +            if (rcast != method->rcast) {
> > +                /* Build message's copied */
> > +                _skb = tipc_buf_acquire(MCAST_H_SIZE, GFP_KERNEL);
> > +                if (!skb) {
> > +                    rc = -ENOMEM;
> > +                    goto exit;
> > +                }
> > +                skb_orphan(_skb);
> > +                skb_copy_to_linear_data(_skb, hdr, MCAST_H_SIZE);
> > +
> > +                /* Build dummy header */
> > +                _hdr = buf_msg(_skb);
> > +                msg_set_size(_hdr, MCAST_H_SIZE);
> > +                __skb_queue_tail(&tmpq, _skb);
> > +
> > +                msg_set_syn(hdr, 1);
> > +                msg_set_syn(_hdr, 1);
> > +                msg_set_is_rcast(_hdr, rcast);
> > +                /* Prepare for 'synching' */
> > +                if (rcast)
> > +                    tipc_rcast_xmit(net, &tmpq, dests, cong_link_cnt);
> > +                else
> > +                    tipc_bcast_xmit(net, &tmpq, cong_link_cnt);
> > +
> > +                /* This queue should normally be empty by now */
> > +                __skb_queue_purge(&tmpq);
> > +            }
> > +        }
> > +xmit:
> >      if (method->rcast)
> >          rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
> >      else
> > @@ -576,3 +627,66 @@ void tipc_nlist_purge(struct tipc_nlist *nl)
> >      nl->remote = 0;
> >      nl->local = false;
> >  }
> > +
> > +void tipc_mcast_filter_msg(struct sk_buff_head *defq,
> > +                           struct sk_buff_head *inputq)
> > +{
> > +    struct sk_buff *skb, *_skb;
> > +    struct tipc_msg *hdr, *_hdr;
> > +    u32 node, port, _node, _port;
> > +    bool match = false;
>
> If you put in the following lines here we will save some instruction cycles:
>     hdr = buf_msg(skb_peek(inputq));
>     if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
>         return;
>
> After all this will be the case in the vast majority of cases. It is safe to just
> peek the queue and access, since inputq never can be empty.
>
> > +
> > +    skb = __skb_dequeue(inputq);
> > +    if (!skb)
> > +        return;
> > +
> > +    hdr = buf_msg(skb);
> > +    node = msg_orignode(hdr);
> > +    port = msg_origport(hdr);
> > +
> > +    /* Find a peer port if its existing in defer queue */
> > +    while ((_skb = skb_peek(defq))) {
> > +        _hdr = buf_msg(_skb);
> > +        _node = msg_orignode(_hdr);
> > +        _port = msg_origport(_hdr);
> > +
> > +        if (_node != node)
> > +            continue;
> > +        if (_port != port)
> > +            continue;
> > +
> > +        if (!match) {
> > +            if (msg_is_syn(hdr) &&
> > +                msg_is_rcast(hdr) != msg_is_rcast(_hdr)) {
> > +                __skb_dequeue(defq);
> > +                if (msg_data_sz(hdr)) {
> > +                    __skb_queue_tail(inputq, skb);
> > +                    kfree_skb(_skb);
> > +                } else {
> > +                    __skb_queue_tail(inputq, _skb);
> > +                    kfree_skb(skb);
> > +                }
> > +                match = true;
> > +            } else {
> > +                break;
> > +            }
> > +        } else {
> > +            if (msg_is_syn(_hdr))
> > +                return;
> > +            /* Dequeued to receive buffer */
> > +            __skb_dequeue(defq);
> > +            __skb_queue_tail(inputq, _skb);
> > +        }
> > +    }
> > +
> > +    if (match)
> > +        return;
> > +
> > +    if (msg_is_syn(hdr)) {
> > +        /* Enqueue and defer to next synching */
> > +        __skb_queue_tail(defq, skb);
> > +    } else {
> > +        /* Direct enqueued */
> > +        __skb_queue_tail(inputq, skb);
> > +    }
> > +}
>
> Above function is hard to follow and does not convince me. What if there are
> messages from many sources, and you find the match in the middle of the
> queue?
> I suggest you break down the logic into smaller tasks, e.g., as follows (code
> compiles ok, but is untested):
>
> void tipc_mcast_filter_msg(struct sk_buff_head *defq,
>                            struct sk_buff_head *inputq)
> {
>     struct sk_buff *skb, *_skb, *tmp;
>     struct tipc_msg *hdr, *_hdr;
>     bool match = false;
>     u32 node, port;
>
>     skb = skb_peek(inputq);
>     hdr = buf_msg(skb);
>     if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq)))
>         return;
>
>     __skb_dequeue(inputq);
>     node = msg_orignode(hdr);
>     port = msg_origport(hdr);
>
>     /* Has the twin SYN message already arrived ? */
>     skb_queue_walk(defq, _skb) {
>         _hdr = buf_msg(_skb);
>         if (msg_orignode(_hdr) != node)
>             continue;
>         if (msg_origport(_hdr) != port)
>             continue;
>         match = true;
>         break;
>     }
>
>     if (!match)
>         return __skb_queue_tail(defq, skb);
>
>     if (!msg_is_syn(_hdr)) {
>         pr_warn_ratelimited("Non-sync mcast heads deferred queue\n");
>         __skb_queue_purge(defq);
>         return __skb_queue_tail(inputq, skb);
>     }
>
>     /* Non-SYN message from other link can be delivered right away */
>     if (!msg_is_syn(hdr)) {
>         if (msg_is_rcast(hdr) != msg_is_rcast(_hdr))
>             return __skb_queue_tail(inputq, skb);
>         else
>             return __skb_queue_tail(defq, skb);
>     }
>
>     /* Matching SYN messages => return the one with data, if any */
>     __skb_unlink(_skb, defq);
>     if (msg_data_sz(hdr)) {
>         kfree_skb(_skb);
>         __skb_queue_tail(inputq, skb);
>     } else {
>         kfree_skb(skb);
>         __skb_queue_tail(inputq, _skb);
>     }
>
>     /* Deliver subsequent non-SYN messages from same peer */
>     skb_queue_walk_safe(defq, _skb, tmp) {
>         _hdr = buf_msg(_skb);
>         if (msg_orignode(_hdr) != node)
>             continue;
>         if (msg_origport(_hdr) != port)
>             continue;
>         if (msg_is_syn(_hdr))
>             break;
>         __skb_unlink(_skb, defq);
>         __skb_queue_tail(inputq, _skb);
>     }
> }
>
> > diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
> > index 751530ab0c49..165d88a503e4 100644
> > --- a/net/tipc/bcast.h
> > +++ b/net/tipc/bcast.h
> > @@ -63,11 +63,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node);
> >  /* Cookie to be used between socket and broadcast layer
> >   * @rcast: replicast (instead of broadcast) was used at previous xmit
> >   * @mandatory: broadcast/replicast indication was set by user
> > + * @deferredq: defer queue to make message in order
> >   * @expires: re-evaluate non-mandatory transmit method if we are past this
> >   */
> > [...]
>
> > @@ -383,6 +383,11 @@ static struct tipc_node *tipc_node_create(struct net *net, u32 addr,
> >              tipc_link_update_caps(l, capabilities);
> >          }
> >          write_unlock_bh(&n->lock);
> > +        /* Calculate cluster capabilities */
> > +        tn->capabilities = TIPC_NODE_CAPABILITIES;
> > +        list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
> > +            tn->capabilities &= temp_node->capabilities;
> > +        }
>
> Yes, you are right here. During a cluster upgrade a node can come back with
> new capabilities which also must be reflected in the cluster capabilities field.
> Actually, I think it would be a good idea to add cluster capabilities as a
> separate patch. This makes this rather complex patch slightly smaller.
>
> >          goto exit;
> >      }
> >      n = kzalloc(sizeof(*n), GFP_ATOMIC);
> > @@ -433,6 +438,11 @@ static struct tipc_node *tipc_node_create(struct net *net, u32 addr,
> >          break;
> > [...]
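>
> Just a thought for that separate patch: the AND-reduction could go into a
> small helper, so the same logic can be reused wherever the node list
> changes. Rough sketch only - the helper name is invented and locking is
> left aside:
>
> static void tipc_net_recalc_capabilities(struct tipc_net *tn)
> {
>     struct tipc_node *n;
>
>     /* Cluster capabilities = AND of all known peers' capabilities */
>     tn->capabilities = TIPC_NODE_CAPABILITIES;
>     list_for_each_entry_rcu(n, &tn->node_list, list)
>         tn->capabilities &= n->capabilities;
> }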
>
> >
> > @@ -817,6 +819,11 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
> >                            &tsk->cong_link_cnt);
> >      }
> >
> > +    /* Broadcast link is now free to choose method for next broadcast */
> > +    if (rc == 0) {
> > +        method->mandatory = false;
> > +        method->expires = jiffies;
> > +    }
>
> No, we should leave the socket code as is, so we are sure it works with legacy
> nodes.
> We should instead make tipc_bcast_select_xmit_method() slightly smarter:
>
> static void tipc_bcast_select_xmit_method(struct net *net, int dests,
>                                           struct tipc_mc_method *method)
> {
>     .......
>     /* Can current method be changed ? */
>     method->expires = jiffies + TIPC_METHOD_EXPIRE;
>     if (method->mandatory)
>         return;
>     if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) &&
>         time_before(jiffies, exp))
>         return;
>
>     /* Determine method to use now */
>     method->rcast = dests <= bb->bc_threshold;
> }
>
> I.e., we respect the 'mandatory' setting, because we need that for
> group_cast to work correctly, but we override 'method->expires' if the
> cluster capabilities say that all nodes support MCAST_RBCTL.
> Combined with the patch where we add forced BCAST or REPLICAST (make
> sure you add this patch on top of that one) I think we have achieved a
> pretty smart and adaptive multicast subsystem.
>
> BR
> ///jon
>
> >      tipc_nlist_purge(&dsts);
> >
> >      return rc ? rc : dlen;
> > @@ -2157,6 +2164,9 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
> >      if (unlikely(grp))
> >          tipc_group_filter_msg(grp, &inputq, xmitq);
> >
> > +    if (msg_type(hdr) == TIPC_MCAST_MSG)
> > +        tipc_mcast_filter_msg(&tsk->mc_method.deferredq, &inputq);
> > +
> >      /* Validate and add to receive buffer if there is space */
> >      while ((skb = __skb_dequeue(&inputq))) {
> >          hdr = buf_msg(skb);
> > --
> > 2.17.1
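>
> One more detail, just as a reminder (not visible in the hunks quoted
> above, so only a sketch): the new mc_method.deferredq must be initialized
> when the socket is created and purged when the socket is released, so
> that no buffers are leaked. Roughly:
>
>     /* in tipc_sk_create(), next to the other queue inits */
>     __skb_queue_head_init(&tsk->mc_method.deferredq);
>
>     /* in tipc_release(), before the socket goes away */
>     __skb_queue_purge(&tsk->mc_method.deferredq);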