From: Jon M. <jon...@er...> - 2019-02-19 00:03:11
|
This also looks very good now; I only have some comments on the log text. Just make sure you test this with the groupcast test program before you post it. Acked-by: Jon > -----Original Message----- > From: Hoang Le <hoa...@de...> > Sent: 18-Feb-19 05:08 > To: tip...@li...; Jon Maloy > <jon...@er...>; ma...@do...; yin...@wi... > Subject: [net-next v3 3/3] tipc: smooth change between replicast and > broadcast > > Currently, a multicast stream may start out using replicast, because there are > few destinations, and then it should ideally switch to L2/broadcast > IGMP/multicast when the number of destinations grows beyond a certain > limit. The opposite should happen when the number decreases below the > limit. > > To eliminate the risk of message reordering caused by method change, a > sending socket must stick to a previously selected method until it enters an > idle period of 5 seconds. Means there is a 5 seconds pause in the traffic from > the sender socket. If the sender never makes such a pause, the method will never change, and transmission may become very inefficient as the cluster grows. > > With this commit, we allow such a switch between replicast and broadcast > without 5 seconds pause in the traffic. without any need for a traffic pause. > > Solution is to send a dummy message with only the header, also with the SYN > bit set, via broadcast or replicast. For the data message, the SYN bit is set and > sending via replicast or broadcast (inverse method with dummy). > > Then, at receiving side any messages follow first SYN bit message (data or > dummy message), they will be hold s/hold/held in deferred queue until another pair > (dummy or data message) arrived in other link. 
///jon > > Signed-off-by: Hoang Le <hoa...@de...> > --- > net/tipc/bcast.c | 165 > +++++++++++++++++++++++++++++++++++++++++++++- > net/tipc/bcast.h | 5 ++ > net/tipc/msg.h | 10 +++ > net/tipc/socket.c | 5 ++ > 4 files changed, 184 insertions(+), 1 deletion(-) > > diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index > 12b59268bdd6..d806e3714280 100644 > --- a/net/tipc/bcast.c > +++ b/net/tipc/bcast.c > @@ -220,9 +220,24 @@ static void tipc_bcast_select_xmit_method(struct > net *net, int dests, > } > /* Can current method be changed ? */ > method->expires = jiffies + TIPC_METHOD_EXPIRE; > - if (method->mandatory || time_before(jiffies, exp)) > + if (method->mandatory) > return; > > + if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL) && > + time_before(jiffies, exp)) > + return; > + > + /* Configuration as force 'broadcast' method */ > + if (bb->force_bcast) { > + method->rcast = false; > + return; > + } > + /* Configuration as force 'replicast' method */ > + if (bb->force_rcast) { > + method->rcast = true; > + return; > + } > + /* Configuration as 'selectable' or default method */ > /* Determine method to use now */ > method->rcast = dests <= bb->bc_threshold; } @@ -285,6 +300,63 > @@ static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts, > return 0; > } > > +/* tipc_mcast_send_sync - deliver a dummy message with SYN bit > + * @net: the applicable net namespace > + * @skb: socket buffer to copy > + * @method: send method to be used > + * @dests: destination nodes for message. > + * @cong_link_cnt: returns number of encountered congested destination > +links > + * Returns 0 if success, otherwise errno */ static int > +tipc_mcast_send_sync(struct net *net, struct sk_buff *skb, > + struct tipc_mc_method *method, > + struct tipc_nlist *dests, > + u16 *cong_link_cnt) > +{ > + struct sk_buff_head tmpq; > + struct sk_buff *_skb; > + struct tipc_msg *hdr, *_hdr; > + > + /* Is a cluster supporting with new capabilities ? 
*/ > + if (!(tipc_net(net)->capabilities & TIPC_MCAST_RBCTL)) > + return 0; > + > + hdr = buf_msg(skb); > + if (msg_user(hdr) == MSG_FRAGMENTER) > + hdr = msg_get_wrapped(hdr); > + if (msg_type(hdr) != TIPC_MCAST_MSG) > + return 0; > + > + /* Allocate dummy message */ > + _skb = tipc_buf_acquire(MCAST_H_SIZE, GFP_KERNEL); > + if (!skb) > + return -ENOMEM; > + > + /* Preparing for 'synching' header */ > + msg_set_syn(hdr, 1); > + > + /* Copy skb's header into a dummy header */ > + skb_copy_to_linear_data(_skb, hdr, MCAST_H_SIZE); > + skb_orphan(_skb); > + > + /* Reverse method for dummy message */ > + _hdr = buf_msg(_skb); > + msg_set_size(_hdr, MCAST_H_SIZE); > + msg_set_is_rcast(_hdr, !msg_is_rcast(hdr)); > + > + skb_queue_head_init(&tmpq); > + __skb_queue_tail(&tmpq, _skb); > + if (method->rcast) > + tipc_bcast_xmit(net, &tmpq, cong_link_cnt); > + else > + tipc_rcast_xmit(net, &tmpq, dests, cong_link_cnt); > + > + /* This queue should normally be empty by now */ > + __skb_queue_purge(&tmpq); > + > + return 0; > +} > + > /* tipc_mcast_xmit - deliver message to indicated destination nodes > * and to identified node local sockets > * @net: the applicable net namespace > @@ -300,6 +372,9 @@ int tipc_mcast_xmit(struct net *net, struct > sk_buff_head *pkts, > u16 *cong_link_cnt) > { > struct sk_buff_head inputq, localq; > + struct sk_buff *skb; > + struct tipc_msg *hdr; > + bool rcast = method->rcast; > int rc = 0; > > skb_queue_head_init(&inputq); > @@ -313,6 +388,18 @@ int tipc_mcast_xmit(struct net *net, struct > sk_buff_head *pkts, > /* Send according to determined transmit method */ > if (dests->remote) { > tipc_bcast_select_xmit_method(net, dests->remote, > method); > + > + skb = skb_peek(pkts); > + hdr = buf_msg(skb); > + if (msg_user(hdr) == MSG_FRAGMENTER) > + hdr = msg_get_wrapped(hdr); > + msg_set_is_rcast(hdr, method->rcast); > + > + /* Switch method ? 
*/ > + if (rcast != method->rcast) > + tipc_mcast_send_sync(net, skb, method, > + dests, cong_link_cnt); > + > if (method->rcast) > rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt); > else > @@ -672,3 +759,79 @@ u32 tipc_bcast_get_broadcast_ratio(struct net *net) > > return bb->rc_ratio; > } > + > +void tipc_mcast_filter_msg(struct sk_buff_head *defq, > + struct sk_buff_head *inputq) > +{ > + struct sk_buff *skb, *_skb, *tmp; > + struct tipc_msg *hdr, *_hdr; > + bool match = false; > + u32 node, port; > + > + skb = skb_peek(inputq); > + hdr = buf_msg(skb); > + > + if (likely(!msg_is_syn(hdr) && skb_queue_empty(defq))) > + return; > + > + node = msg_orignode(hdr); > + port = msg_origport(hdr); > + > + /* Has the twin SYN message already arrived ? */ > + skb_queue_walk(defq, _skb) { > + _hdr = buf_msg(_skb); > + if (msg_orignode(_hdr) != node) > + continue; > + if (msg_origport(_hdr) != port) > + continue; > + match = true; > + break; > + } > + > + if (!match) { > + if (!msg_is_syn(hdr)) > + return; > + __skb_dequeue(inputq); > + __skb_queue_tail(defq, skb); > + return; > + } > + > + /* Deliver non-SYN message from other link, otherwise queue it */ > + if (!msg_is_syn(hdr)) { > + if (msg_is_rcast(hdr) != msg_is_rcast(_hdr)) > + return; > + __skb_dequeue(inputq); > + __skb_queue_tail(defq, skb); > + return; > + } > + > + /* Queue non-SYN/SYN message from same link */ > + if (msg_is_rcast(hdr) == msg_is_rcast(_hdr)) { > + __skb_dequeue(inputq); > + __skb_queue_tail(defq, skb); > + return; > + } > + > + /* Matching SYN messages => return the one with data, if any */ > + __skb_unlink(_skb, defq); > + if (msg_data_sz(hdr)) { > + kfree_skb(_skb); > + } else { > + __skb_dequeue(inputq); > + kfree_skb(skb); > + __skb_queue_tail(inputq, _skb); > + } > + > + /* Deliver subsequent non-SYN messages from same peer */ > + skb_queue_walk_safe(defq, _skb, tmp) { > + _hdr = buf_msg(_skb); > + if (msg_orignode(_hdr) != node) > + continue; > + if (msg_origport(_hdr) != port) > + 
continue; > + if (msg_is_syn(_hdr)) > + break; > + __skb_unlink(_skb, defq); > + __skb_queue_tail(inputq, _skb); > + } > +} > diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h index > 37c55e7347a5..484bde289d3a 100644 > --- a/net/tipc/bcast.h > +++ b/net/tipc/bcast.h > @@ -67,11 +67,13 @@ void tipc_nlist_del(struct tipc_nlist *nl, u32 node); > /* Cookie to be used between socket and broadcast layer > * @rcast: replicast (instead of broadcast) was used at previous xmit > * @mandatory: broadcast/replicast indication was set by user > + * @deferredq: defer queue to make message in order > * @expires: re-evaluate non-mandatory transmit method if we are past this > */ > struct tipc_mc_method { > bool rcast; > bool mandatory; > + struct sk_buff_head deferredq; > unsigned long expires; > }; > > @@ -99,6 +101,9 @@ int tipc_bclink_reset_stats(struct net *net); > u32 tipc_bcast_get_broadcast_mode(struct net *net); > u32 tipc_bcast_get_broadcast_ratio(struct net *net); > > +void tipc_mcast_filter_msg(struct sk_buff_head *defq, > + struct sk_buff_head *inputq); > + > static inline void tipc_bcast_lock(struct net *net) { > spin_lock_bh(&tipc_net(net)->bclock); > diff --git a/net/tipc/msg.h b/net/tipc/msg.h index > d7e4b8b93f9d..528ba9241acc 100644 > --- a/net/tipc/msg.h > +++ b/net/tipc/msg.h > @@ -257,6 +257,16 @@ static inline void msg_set_src_droppable(struct > tipc_msg *m, u32 d) > msg_set_bits(m, 0, 18, 1, d); > } > > +static inline bool msg_is_rcast(struct tipc_msg *m) { > + return msg_bits(m, 0, 18, 0x1); > +} > + > +static inline void msg_set_is_rcast(struct tipc_msg *m, bool d) { > + msg_set_bits(m, 0, 18, 0x1, d); > +} > + > static inline void msg_set_size(struct tipc_msg *m, u32 sz) { > m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz); diff --git > a/net/tipc/socket.c b/net/tipc/socket.c index 8fc5acd4820d..de83eb1e718e > 100644 > --- a/net/tipc/socket.c > +++ b/net/tipc/socket.c > @@ -483,6 +483,7 @@ static int tipc_sk_create(struct net *net, struct socket > 
*sock, > tsk_set_unreturnable(tsk, true); > if (sock->type == SOCK_DGRAM) > tsk_set_unreliable(tsk, true); > + __skb_queue_head_init(&tsk->mc_method.deferredq); > } > > trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " "); @@ -580,6 > +581,7 @@ static int tipc_release(struct socket *sock) > sk->sk_shutdown = SHUTDOWN_MASK; > tipc_sk_leave(tsk); > tipc_sk_withdraw(tsk, 0, NULL); > + __skb_queue_purge(&tsk->mc_method.deferredq); > sk_stop_timer(sk, &sk->sk_timer); > tipc_sk_remove(tsk); > > @@ -2157,6 +2159,9 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct > sk_buff *skb, > if (unlikely(grp)) > tipc_group_filter_msg(grp, &inputq, xmitq); > > + if (msg_type(hdr) == TIPC_MCAST_MSG) > + tipc_mcast_filter_msg(&tsk->mc_method.deferredq, > &inputq); > + > /* Validate and add to receive buffer if there is space */ > while ((skb = __skb_dequeue(&inputq))) { > hdr = buf_msg(skb); > -- > 2.17.1 |