From: Tuong L. <tuo...@de...> - 2019-06-04 05:23:00
|
This patch series resolves some issues found with the current link changeover mechanism; it also includes an optimization for the link synching. Tuong Lien (2): tipc: optimize link synching mechanism tipc: fix changeover issues due to large packet net/tipc/link.c | 118 +++++++++++++++++++++++++++++++++++++++++++++++++------- net/tipc/msg.c | 62 +++++++++++++++++++++++++++++ net/tipc/msg.h | 28 +++++++++++++- net/tipc/node.c | 6 ++- net/tipc/node.h | 6 ++- 5 files changed, 202 insertions(+), 18 deletions(-) -- 2.13.7 |
From: Tuong L. <tuo...@de...> - 2019-07-12 05:16:03
|
This patch series resolves some issues found with the current link changeover mechanism; it also includes an optimization for the link synching. Tuong Lien (2): tipc: optimize link synching mechanism tipc: fix changeover issues due to large packet net/tipc/link.c | 119 +++++++++++++++++++++++++++++++++++++++++++++++++------- net/tipc/msg.c | 59 ++++++++++++++++++++++++++++ net/tipc/msg.h | 28 ++++++++++++- net/tipc/node.c | 6 ++- net/tipc/node.h | 6 ++- 5 files changed, 199 insertions(+), 19 deletions(-) -- 2.13.7 |
From: Tuong L. <tuo...@de...> - 2019-07-12 05:16:03
|
This commit, along with the next one, resolves the issues with the link changeover mechanism. See that commit for details. Basically, for the link synching, from now on we will send only a single ("dummy") SYNCH message to the peer. The SYNCH message does not contain any data, just a header conveying the synch point to the peer. A new node capability flag ("TIPC_TUNNEL_ENHANCED") is introduced for backward compatibility. Acked-by: Ying Xue <yin...@wi...> Acked-by: Jon Maloy <jon...@er...> Suggested-by: Jon Maloy <jon...@er...> Signed-off-by: Tuong Lien <tuo...@de...> --- net/tipc/link.c | 26 ++++++++++++++++++++++++++ net/tipc/msg.h | 10 ++++++++++ net/tipc/node.c | 6 ++++-- net/tipc/node.h | 6 ++++-- 4 files changed, 44 insertions(+), 4 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 66d3a07bc571..e215b4ba6a4b 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1665,6 +1665,7 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, struct sk_buff_head *queue = &l->transmq; struct sk_buff_head tmpxq, tnlq; u16 pktlen, pktcnt, seqno = l->snd_nxt; + u16 syncpt; if (!tnl) return; @@ -1684,6 +1685,31 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, tipc_link_xmit(l, &tnlq, &tmpxq); __skb_queue_purge(&tmpxq); + /* Link Synching: + * From now on, send only one single ("dummy") SYNCH message + * to peer. The SYNCH message does not contain any data, just + * a header conveying the synch point to the peer. + */ + if (mtyp == SYNCH_MSG && (tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) { + tnlskb = tipc_msg_create(TUNNEL_PROTOCOL, SYNCH_MSG, + INT_H_SIZE, 0, l->addr, + tipc_own_addr(l->net), + 0, 0, 0); + if (!tnlskb) { + pr_warn("%sunable to create dummy SYNCH_MSG\n", + link_co_err); + return; + } + + hdr = buf_msg(tnlskb); + syncpt = l->snd_nxt + skb_queue_len(&l->backlogq) - 1; + msg_set_syncpt(hdr, syncpt); + msg_set_bearer_id(hdr, l->peer_bearer_id); + __skb_queue_tail(&tnlq, tnlskb); + tipc_link_xmit(tnl, &tnlq, xmitq); + return; + } + /* Initialize reusable tunnel packet header */ tipc_msg_init(tipc_own_addr(l->net), &tnlhdr, TUNNEL_PROTOCOL, mtyp, INT_H_SIZE, l->addr); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index da509f0eb9ca..fca042cdff88 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -877,6 +877,16 @@ static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n) msg_set_bits(m, 9, 16, 0xffff, n); } +static inline u16 msg_syncpt(struct tipc_msg *m) +{ + return msg_bits(m, 9, 16, 0xffff); +} + +static inline void msg_set_syncpt(struct tipc_msg *m, u16 n) +{ + msg_set_bits(m, 9, 16, 0xffff, n); +} + static inline u32 msg_conn_ack(struct tipc_msg *m) { return msg_bits(m, 9, 16, 0xffff); diff --git a/net/tipc/node.c b/net/tipc/node.c index 324a1f91b394..5d8b48051bb9 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -1649,7 +1649,6 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, int usr = msg_user(hdr); int mtyp = msg_type(hdr); u16 oseqno = msg_seqno(hdr); - u16 iseqno = msg_seqno(msg_inner_hdr(hdr)); u16 exp_pkts = msg_msgcnt(hdr); u16 rcv_nxt, syncpt, dlv_nxt, inputq_len; int state = n->state; @@ -1748,7 +1747,10 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, /* Initiate synch mode if applicable */ if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG) && (oseqno == 1)) { - syncpt = iseqno + exp_pkts - 1; + if (n->capabilities & TIPC_TUNNEL_ENHANCED) + syncpt = msg_syncpt(hdr); + else + syncpt = msg_seqno(msg_inner_hdr(hdr)) + exp_pkts - 1; if (!tipc_link_is_up(l)) 
__tipc_node_link_up(n, bearer_id, xmitq); if (n->state == SELF_UP_PEER_UP) { diff --git a/net/tipc/node.h b/net/tipc/node.h index c0bf49ea3de4..291d0ecd4101 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -53,7 +53,8 @@ enum { TIPC_NODE_ID128 = (1 << 5), TIPC_LINK_PROTO_SEQNO = (1 << 6), TIPC_MCAST_RBCTL = (1 << 7), - TIPC_GAP_ACK_BLOCK = (1 << 8) + TIPC_GAP_ACK_BLOCK = (1 << 8), + TIPC_TUNNEL_ENHANCED = (1 << 9) }; #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \ @@ -64,7 +65,8 @@ enum { TIPC_NODE_ID128 | \ TIPC_LINK_PROTO_SEQNO | \ TIPC_MCAST_RBCTL | \ - TIPC_GAP_ACK_BLOCK) + TIPC_GAP_ACK_BLOCK | \ + TIPC_TUNNEL_ENHANCED) #define INVALID_BEARER_ID -1 void tipc_node_stop(struct net *net); -- 2.13.7 |
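To see the synch-point arithmetic of the patch above in isolation: the sender advertises syncpt = snd_nxt + backlogq length - 1, while a legacy receiver derives the same point from the first tunneled packet's inner sequence number plus the advertised message count. The following is a minimal user-space sketch of that equivalence; the names and sample numbers are simplified stand-ins, not kernel API, and only the two formulas mirror the patch. Note the u16 arithmetic deliberately wraps modulo 65536:

/* Stand-alone sketch of the synch-point arithmetic; all names and
 * sample values are illustrative, only the formulas mirror the patch.
 */
#include <stdint.h>
#include <stdio.h>

/* Sender (enhanced peer): the synch point carried in the dummy SYNCH
 * header is the last sequence number still to be delivered, i.e. the
 * next-to-send number plus whatever waits in backlogq, minus one.
 */
static uint16_t syncpt_enhanced(uint16_t snd_nxt, uint16_t backlog_len)
{
	return (uint16_t)(snd_nxt + backlog_len - 1);
}

/* Legacy receiver: derive the same point from the first wrapped
 * packet's sequence number and the advertised message count.
 */
static uint16_t syncpt_legacy(uint16_t inner_seqno, uint16_t msgcnt)
{
	return (uint16_t)(inner_seqno + msgcnt - 1);
}

int main(void)
{
	uint16_t snd_nxt = 65530;	/* close to u16 wraparound on purpose */
	uint16_t transmq_len = 3, backlog_len = 10;
	uint16_t snd_una = (uint16_t)(snd_nxt - transmq_len); /* oldest in-flight */
	uint16_t msgcnt = (uint16_t)(transmq_len + backlog_len);

	printf("enhanced: syncpt read from header  = %u\n",
	       syncpt_enhanced(snd_nxt, backlog_len));
	printf("legacy:   syncpt computed by peer  = %u\n",
	       syncpt_legacy(snd_una, msgcnt));
	return 0;
}

Both printouts give 3 (65539 mod 65536): either way the receiver learns the highest sequence number it must see before leaving SYNCHING state, but the enhanced scheme conveys it in a single header instead of tunneling every outstanding packet.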
From: Tuong L. <tuo...@de...> - 2019-07-12 05:16:04
|
In conjunction with changing the interfaces' MTU (e.g. especially in the case of a bonding) where the TIPC links are brought up and down in a short time, a couple of issues were detected with the current link changeover mechanism:

1) When one link is up but immediately forced down again, the failover procedure will be carried out in order to fail over all the messages in the link's transmq queue onto the other working link. The link and node state is also set to FAILINGOVER as part of the process. Each message will be transmitted in the form of a FAILOVER_MSG, so its size grows by 40 bytes (the size of the tunnel message header). There is no problem if the original message size is not larger than the link's MTU - 40, and indeed this is the maximum size of normal payload messages. However, in the situation above, because the link has only just come up, the messages in the link's transmq are mostly SYNCH_MSGs generated by the link synching procedure, so their size may already be at that maximum. When a FAILOVER_MSG is built on top of such a SYNCH_MSG, its size will exceed the link's MTU. As a result, the messages are dropped silently and the failover procedure never completes; the link cannot exit the FAILINGOVER state and so cannot be re-established.

2) The same scenario can happen more easily when the links' MTUs are set differently or are being changed. In that case, as long as a large message in the failed link's transmq queue was built and fragmented with that link's MTU larger than the other link's, the issue will occur (no prior link synching is needed).

3) The link synching procedure faces the same issue, but since link synching is only started upon receipt of a SYNCH_MSG, dropping the message does not result in a state deadlock; still, it is not the behavior intended by the design.

Issues 1) & 3) are resolved by the previous commit, which generates only a dummy SYNCH_MSG (i.e. without data) at link synching, so the size of a FAILOVER_MSG, if any, will never exceed the link's MTU.

For issue 2), the only solution is to fragment the messages in the failed link's transmq queue according to the working link's MTU so that they can then be failed over. A new function is made to accomplish this; the result is still a TUNNEL_PROTOCOL/FAILOVER_MSG, but if the original message size is too large it will be fragmented & reassembled at the receiving side. 
Acked-by: Ying Xue <yin...@wi...> Acked-by: Jon Maloy <jon...@er...> Signed-off-by: Tuong Lien <tuo...@de...> --- net/tipc/link.c | 93 ++++++++++++++++++++++++++++++++++++++++++++++++--------- net/tipc/msg.c | 59 ++++++++++++++++++++++++++++++++++++ net/tipc/msg.h | 18 ++++++++++- 3 files changed, 155 insertions(+), 15 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index e215b4ba6a4b..2c274777b2dd 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -180,6 +180,7 @@ struct tipc_link { /* Fragmentation/reassembly */ struct sk_buff *reasm_buf; + struct sk_buff *reasm_tnlmsg; /* Broadcast */ u16 ackers; @@ -897,8 +898,10 @@ void tipc_link_reset(struct tipc_link *l) l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0; l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0; kfree_skb(l->reasm_buf); + kfree_skb(l->reasm_tnlmsg); kfree_skb(l->failover_reasm_skb); l->reasm_buf = NULL; + l->reasm_tnlmsg = NULL; l->failover_reasm_skb = NULL; l->rcv_unacked = 0; l->snd_nxt = 1; @@ -940,6 +943,9 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, int rc = 0; if (unlikely(msg_size(hdr) > mtu)) { + pr_warn("Too large msg, purging xmit list %d %d %d %d %d!\n", + skb_queue_len(list), msg_user(hdr), + msg_type(hdr), msg_size(hdr), mtu); skb_queue_purge(list); return -EMSGSIZE; } @@ -1233,6 +1239,7 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *inputq) { struct sk_buff **reasm_skb = &l->failover_reasm_skb; + struct sk_buff **reasm_tnlmsg = &l->reasm_tnlmsg; struct sk_buff_head *fdefq = &l->failover_deferdq; struct tipc_msg *hdr = buf_msg(skb); struct sk_buff *iskb; @@ -1240,40 +1247,56 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb, int rc = 0; u16 seqno; - /* SYNCH_MSG */ - if (msg_type(hdr) == SYNCH_MSG) - goto drop; + if (msg_type(hdr) == SYNCH_MSG) { + kfree_skb(skb); + return 0; + } - /* FAILOVER_MSG */ - if (!tipc_msg_extract(skb, &iskb, &ipos)) { - pr_warn_ratelimited("Cannot extract FAILOVER_MSG, defq: %d\n", - skb_queue_len(fdefq)); - return rc; + /* Not a fragment? */ + if (likely(!msg_nof_fragms(hdr))) { + if (unlikely(!tipc_msg_extract(skb, &iskb, &ipos))) { + pr_warn_ratelimited("Unable to extract msg, defq: %d\n", + skb_queue_len(fdefq)); + return 0; + } + kfree_skb(skb); + } else { + /* Set fragment type for buf_append */ + if (msg_fragm_no(hdr) == 1) + msg_set_type(hdr, FIRST_FRAGMENT); + else if (msg_fragm_no(hdr) < msg_nof_fragms(hdr)) + msg_set_type(hdr, FRAGMENT); + else + msg_set_type(hdr, LAST_FRAGMENT); + + if (!tipc_buf_append(reasm_tnlmsg, &skb)) { + /* Successful but non-complete reassembly? 
*/ + if (*reasm_tnlmsg || link_is_bc_rcvlink(l)) + return 0; + pr_warn_ratelimited("Unable to reassemble tunnel msg\n"); + return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); + } + iskb = skb; } do { seqno = buf_seqno(iskb); - if (unlikely(less(seqno, l->drop_point))) { kfree_skb(iskb); continue; } - if (unlikely(seqno != l->drop_point)) { __tipc_skb_queue_sorted(fdefq, seqno, iskb); continue; } l->drop_point++; - if (!tipc_data_input(l, iskb, inputq)) rc |= tipc_link_input(l, iskb, inputq, reasm_skb); if (unlikely(rc)) break; } while ((iskb = __tipc_skb_dequeue(fdefq, l->drop_point))); -drop: - kfree_skb(skb); return rc; } @@ -1663,15 +1686,18 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, struct sk_buff *skb, *tnlskb; struct tipc_msg *hdr, tnlhdr; struct sk_buff_head *queue = &l->transmq; - struct sk_buff_head tmpxq, tnlq; + struct sk_buff_head tmpxq, tnlq, frags; u16 pktlen, pktcnt, seqno = l->snd_nxt; + bool pktcnt_need_update = false; u16 syncpt; + int rc; if (!tnl) return; skb_queue_head_init(&tnlq); skb_queue_head_init(&tmpxq); + skb_queue_head_init(&frags); /* At least one packet required for safe algorithm => add dummy */ skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG, @@ -1727,6 +1753,39 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, if (queue == &l->backlogq) msg_set_seqno(hdr, seqno++); pktlen = msg_size(hdr); + + /* Tunnel link MTU is not large enough? This could be + * due to: + * 1) Link MTU has just changed or set differently; + * 2) Or FAILOVER on the top of a SYNCH message + * + * The 2nd case should not happen if peer supports + * TIPC_TUNNEL_ENHANCED + */ + if (pktlen > tnl->mtu - INT_H_SIZE) { + if (mtyp == FAILOVER_MSG && + (tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) { + rc = tipc_msg_fragment(skb, &tnlhdr, tnl->mtu, + &frags); + if (rc) { + pr_warn("%sunable to frag msg: rc %d\n", + link_co_err, rc); + return; + } + pktcnt += skb_queue_len(&frags) - 1; + pktcnt_need_update = true; + skb_queue_splice_tail_init(&frags, &tnlq); + continue; + } + /* Unluckily, peer doesn't have TIPC_TUNNEL_ENHANCED + * => Just warn it and return! + */ + pr_warn_ratelimited("%stoo large msg <%d, %d>: %d!\n", + link_co_err, msg_user(hdr), + msg_type(hdr), msg_size(hdr)); + return; + } + msg_set_size(&tnlhdr, pktlen + INT_H_SIZE); tnlskb = tipc_buf_acquire(pktlen + INT_H_SIZE, GFP_ATOMIC); if (!tnlskb) { @@ -1742,6 +1801,12 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, goto tnl; } + if (pktcnt_need_update) + skb_queue_walk(&tnlq, skb) { + hdr = buf_msg(skb); + msg_set_msgcnt(hdr, pktcnt); + } + tipc_link_xmit(tnl, &tnlq, xmitq); if (mtyp == FAILOVER_MSG) { diff --git a/net/tipc/msg.c b/net/tipc/msg.c index f48e5857210f..e6d49cdc61b4 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -244,6 +244,65 @@ bool tipc_msg_validate(struct sk_buff **_skb) } /** + * tipc_msg_fragment - build a fragment skb list for TIPC message + * + * @skb: TIPC message skb + * @hdr: internal msg header to be put on the top of the fragments + * @pktmax: max size of a fragment incl. the header + * @frags: returned fragment skb list + * + * Returns 0 if the fragmentation is successful, otherwise: -EINVAL + * or -ENOMEM + */ +int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr, + int pktmax, struct sk_buff_head *frags) +{ + int pktno, nof_fragms, dsz, dmax, eat; + struct tipc_msg *_hdr; + struct sk_buff *_skb; + u8 *data; + + /* Non-linear buffer? 
*/ + if (skb_linearize(skb)) + return -ENOMEM; + + data = (u8 *)skb->data; + dsz = msg_size(buf_msg(skb)); + dmax = pktmax - INT_H_SIZE; + if (dsz <= dmax || !dmax) + return -EINVAL; + + nof_fragms = dsz / dmax + 1; + for (pktno = 1; pktno <= nof_fragms; pktno++) { + if (pktno < nof_fragms) + eat = dmax; + else + eat = dsz % dmax; + /* Allocate a new fragment */ + _skb = tipc_buf_acquire(INT_H_SIZE + eat, GFP_ATOMIC); + if (!_skb) + goto error; + skb_orphan(_skb); + __skb_queue_tail(frags, _skb); + /* Copy header & data to the fragment */ + skb_copy_to_linear_data(_skb, hdr, INT_H_SIZE); + skb_copy_to_linear_data_offset(_skb, INT_H_SIZE, data, eat); + data += eat; + /* Update the fragment's header */ + _hdr = buf_msg(_skb); + msg_set_fragm_no(_hdr, pktno); + msg_set_nof_fragms(_hdr, nof_fragms); + msg_set_size(_hdr, INT_H_SIZE + eat); + } + return 0; + +error: + __skb_queue_purge(frags); + __skb_queue_head_init(frags); + return -ENOMEM; +} + +/** * tipc_msg_build - create buffer chain containing specified header and data * @mhdr: Message header, to be prepended to data * @m: User message diff --git a/net/tipc/msg.h b/net/tipc/msg.h index fca042cdff88..1c8c8dd32a4e 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -721,12 +721,26 @@ static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n) msg_set_bits(m, 4, 16, 0xffff, n); } +static inline u32 msg_nof_fragms(struct tipc_msg *m) +{ + return msg_bits(m, 4, 0, 0xffff); +} + +static inline void msg_set_nof_fragms(struct tipc_msg *m, u32 n) +{ + msg_set_bits(m, 4, 0, 0xffff, n); +} + +static inline u32 msg_fragm_no(struct tipc_msg *m) +{ + return msg_bits(m, 4, 16, 0xffff); +} + static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n) { msg_set_bits(m, 4, 16, 0xffff, n); } - static inline u16 msg_next_sent(struct tipc_msg *m) { return msg_bits(m, 4, 0, 0xffff); @@ -1045,6 +1059,8 @@ bool tipc_msg_bundle(struct sk_buff *skb, struct tipc_msg *msg, u32 mtu); bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg, u32 mtu, u32 dnode); bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos); +int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr, + int pktmax, struct sk_buff_head *frags); int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset, int dsz, int mtu, struct sk_buff_head *list); bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); -- 2.13.7 |
From: David M. <da...@da...> - 2019-07-12 17:45:21
|
net-next is closed right now; resubmit this when the tree opens back up. |
From: Tuong L. <tuo...@de...> - 2019-07-24 01:56:37
|
This patch series resolves some issues found with the current link changeover mechanism; it also includes an optimization for the link synching. Tuong Lien (2): tipc: optimize link synching mechanism tipc: fix changeover issues due to large packet net/tipc/link.c | 119 +++++++++++++++++++++++++++++++++++++++++++++++++------- net/tipc/msg.c | 59 ++++++++++++++++++++++++++++ net/tipc/msg.h | 28 ++++++++++++- net/tipc/node.c | 6 ++- net/tipc/node.h | 6 ++- 5 files changed, 199 insertions(+), 19 deletions(-) -- 2.13.7 |
From: Tuong L. <tuo...@de...> - 2019-07-24 01:56:37
|
In conjunction with changing the interfaces' MTU (e.g. especially in the case of a bonding) where the TIPC links are brought up and down in a short time, a couple of issues were detected with the current link changeover mechanism:

1) When one link is up but immediately forced down again, the failover procedure will be carried out in order to fail over all the messages in the link's transmq queue onto the other working link. The link and node state is also set to FAILINGOVER as part of the process. Each message will be transmitted in the form of a FAILOVER_MSG, so its size grows by 40 bytes (the size of the tunnel message header). There is no problem if the original message size is not larger than the link's MTU - 40, and indeed this is the maximum size of normal payload messages. However, in the situation above, because the link has only just come up, the messages in the link's transmq are mostly SYNCH_MSGs generated by the link synching procedure, so their size may already be at that maximum. When a FAILOVER_MSG is built on top of such a SYNCH_MSG, its size will exceed the link's MTU. As a result, the messages are dropped silently and the failover procedure never completes; the link cannot exit the FAILINGOVER state and so cannot be re-established.

2) The same scenario can happen more easily when the links' MTUs are set differently or are being changed. In that case, as long as a large message in the failed link's transmq queue was built and fragmented with that link's MTU larger than the other link's, the issue will occur (no prior link synching is needed).

3) The link synching procedure faces the same issue, but since link synching is only started upon receipt of a SYNCH_MSG, dropping the message does not result in a state deadlock; still, it is not the behavior intended by the design.

Issues 1) & 3) are resolved by the previous commit, which generates only a dummy SYNCH_MSG (i.e. without data) at link synching, so the size of a FAILOVER_MSG, if any, will never exceed the link's MTU.

For issue 2), the only solution is to fragment the messages in the failed link's transmq queue according to the working link's MTU so that they can then be failed over. A new function is made to accomplish this; the result is still a TUNNEL_PROTOCOL/FAILOVER_MSG, but if the original message size is too large it will be fragmented & reassembled at the receiving side. 
Acked-by: Ying Xue <yin...@wi...> Acked-by: Jon Maloy <jon...@er...> Signed-off-by: Tuong Lien <tuo...@de...> --- net/tipc/link.c | 93 ++++++++++++++++++++++++++++++++++++++++++++++++--------- net/tipc/msg.c | 59 ++++++++++++++++++++++++++++++++++++ net/tipc/msg.h | 18 ++++++++++- 3 files changed, 155 insertions(+), 15 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index e215b4ba6a4b..2c274777b2dd 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -180,6 +180,7 @@ struct tipc_link { /* Fragmentation/reassembly */ struct sk_buff *reasm_buf; + struct sk_buff *reasm_tnlmsg; /* Broadcast */ u16 ackers; @@ -897,8 +898,10 @@ void tipc_link_reset(struct tipc_link *l) l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0; l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0; kfree_skb(l->reasm_buf); + kfree_skb(l->reasm_tnlmsg); kfree_skb(l->failover_reasm_skb); l->reasm_buf = NULL; + l->reasm_tnlmsg = NULL; l->failover_reasm_skb = NULL; l->rcv_unacked = 0; l->snd_nxt = 1; @@ -940,6 +943,9 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, int rc = 0; if (unlikely(msg_size(hdr) > mtu)) { + pr_warn("Too large msg, purging xmit list %d %d %d %d %d!\n", + skb_queue_len(list), msg_user(hdr), + msg_type(hdr), msg_size(hdr), mtu); skb_queue_purge(list); return -EMSGSIZE; } @@ -1233,6 +1239,7 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *inputq) { struct sk_buff **reasm_skb = &l->failover_reasm_skb; + struct sk_buff **reasm_tnlmsg = &l->reasm_tnlmsg; struct sk_buff_head *fdefq = &l->failover_deferdq; struct tipc_msg *hdr = buf_msg(skb); struct sk_buff *iskb; @@ -1240,40 +1247,56 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb, int rc = 0; u16 seqno; - /* SYNCH_MSG */ - if (msg_type(hdr) == SYNCH_MSG) - goto drop; + if (msg_type(hdr) == SYNCH_MSG) { + kfree_skb(skb); + return 0; + } - /* FAILOVER_MSG */ - if (!tipc_msg_extract(skb, &iskb, &ipos)) { - pr_warn_ratelimited("Cannot extract FAILOVER_MSG, defq: %d\n", - skb_queue_len(fdefq)); - return rc; + /* Not a fragment? */ + if (likely(!msg_nof_fragms(hdr))) { + if (unlikely(!tipc_msg_extract(skb, &iskb, &ipos))) { + pr_warn_ratelimited("Unable to extract msg, defq: %d\n", + skb_queue_len(fdefq)); + return 0; + } + kfree_skb(skb); + } else { + /* Set fragment type for buf_append */ + if (msg_fragm_no(hdr) == 1) + msg_set_type(hdr, FIRST_FRAGMENT); + else if (msg_fragm_no(hdr) < msg_nof_fragms(hdr)) + msg_set_type(hdr, FRAGMENT); + else + msg_set_type(hdr, LAST_FRAGMENT); + + if (!tipc_buf_append(reasm_tnlmsg, &skb)) { + /* Successful but non-complete reassembly? 
*/ + if (*reasm_tnlmsg || link_is_bc_rcvlink(l)) + return 0; + pr_warn_ratelimited("Unable to reassemble tunnel msg\n"); + return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); + } + iskb = skb; } do { seqno = buf_seqno(iskb); - if (unlikely(less(seqno, l->drop_point))) { kfree_skb(iskb); continue; } - if (unlikely(seqno != l->drop_point)) { __tipc_skb_queue_sorted(fdefq, seqno, iskb); continue; } l->drop_point++; - if (!tipc_data_input(l, iskb, inputq)) rc |= tipc_link_input(l, iskb, inputq, reasm_skb); if (unlikely(rc)) break; } while ((iskb = __tipc_skb_dequeue(fdefq, l->drop_point))); -drop: - kfree_skb(skb); return rc; } @@ -1663,15 +1686,18 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, struct sk_buff *skb, *tnlskb; struct tipc_msg *hdr, tnlhdr; struct sk_buff_head *queue = &l->transmq; - struct sk_buff_head tmpxq, tnlq; + struct sk_buff_head tmpxq, tnlq, frags; u16 pktlen, pktcnt, seqno = l->snd_nxt; + bool pktcnt_need_update = false; u16 syncpt; + int rc; if (!tnl) return; skb_queue_head_init(&tnlq); skb_queue_head_init(&tmpxq); + skb_queue_head_init(&frags); /* At least one packet required for safe algorithm => add dummy */ skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG, @@ -1727,6 +1753,39 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, if (queue == &l->backlogq) msg_set_seqno(hdr, seqno++); pktlen = msg_size(hdr); + + /* Tunnel link MTU is not large enough? This could be + * due to: + * 1) Link MTU has just changed or set differently; + * 2) Or FAILOVER on the top of a SYNCH message + * + * The 2nd case should not happen if peer supports + * TIPC_TUNNEL_ENHANCED + */ + if (pktlen > tnl->mtu - INT_H_SIZE) { + if (mtyp == FAILOVER_MSG && + (tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) { + rc = tipc_msg_fragment(skb, &tnlhdr, tnl->mtu, + &frags); + if (rc) { + pr_warn("%sunable to frag msg: rc %d\n", + link_co_err, rc); + return; + } + pktcnt += skb_queue_len(&frags) - 1; + pktcnt_need_update = true; + skb_queue_splice_tail_init(&frags, &tnlq); + continue; + } + /* Unluckily, peer doesn't have TIPC_TUNNEL_ENHANCED + * => Just warn it and return! + */ + pr_warn_ratelimited("%stoo large msg <%d, %d>: %d!\n", + link_co_err, msg_user(hdr), + msg_type(hdr), msg_size(hdr)); + return; + } + msg_set_size(&tnlhdr, pktlen + INT_H_SIZE); tnlskb = tipc_buf_acquire(pktlen + INT_H_SIZE, GFP_ATOMIC); if (!tnlskb) { @@ -1742,6 +1801,12 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, goto tnl; } + if (pktcnt_need_update) + skb_queue_walk(&tnlq, skb) { + hdr = buf_msg(skb); + msg_set_msgcnt(hdr, pktcnt); + } + tipc_link_xmit(tnl, &tnlq, xmitq); if (mtyp == FAILOVER_MSG) { diff --git a/net/tipc/msg.c b/net/tipc/msg.c index f48e5857210f..e6d49cdc61b4 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -244,6 +244,65 @@ bool tipc_msg_validate(struct sk_buff **_skb) } /** + * tipc_msg_fragment - build a fragment skb list for TIPC message + * + * @skb: TIPC message skb + * @hdr: internal msg header to be put on the top of the fragments + * @pktmax: max size of a fragment incl. the header + * @frags: returned fragment skb list + * + * Returns 0 if the fragmentation is successful, otherwise: -EINVAL + * or -ENOMEM + */ +int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr, + int pktmax, struct sk_buff_head *frags) +{ + int pktno, nof_fragms, dsz, dmax, eat; + struct tipc_msg *_hdr; + struct sk_buff *_skb; + u8 *data; + + /* Non-linear buffer? 
*/ + if (skb_linearize(skb)) + return -ENOMEM; + + data = (u8 *)skb->data; + dsz = msg_size(buf_msg(skb)); + dmax = pktmax - INT_H_SIZE; + if (dsz <= dmax || !dmax) + return -EINVAL; + + nof_fragms = dsz / dmax + 1; + for (pktno = 1; pktno <= nof_fragms; pktno++) { + if (pktno < nof_fragms) + eat = dmax; + else + eat = dsz % dmax; + /* Allocate a new fragment */ + _skb = tipc_buf_acquire(INT_H_SIZE + eat, GFP_ATOMIC); + if (!_skb) + goto error; + skb_orphan(_skb); + __skb_queue_tail(frags, _skb); + /* Copy header & data to the fragment */ + skb_copy_to_linear_data(_skb, hdr, INT_H_SIZE); + skb_copy_to_linear_data_offset(_skb, INT_H_SIZE, data, eat); + data += eat; + /* Update the fragment's header */ + _hdr = buf_msg(_skb); + msg_set_fragm_no(_hdr, pktno); + msg_set_nof_fragms(_hdr, nof_fragms); + msg_set_size(_hdr, INT_H_SIZE + eat); + } + return 0; + +error: + __skb_queue_purge(frags); + __skb_queue_head_init(frags); + return -ENOMEM; +} + +/** * tipc_msg_build - create buffer chain containing specified header and data * @mhdr: Message header, to be prepended to data * @m: User message diff --git a/net/tipc/msg.h b/net/tipc/msg.h index fca042cdff88..1c8c8dd32a4e 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -721,12 +721,26 @@ static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n) msg_set_bits(m, 4, 16, 0xffff, n); } +static inline u32 msg_nof_fragms(struct tipc_msg *m) +{ + return msg_bits(m, 4, 0, 0xffff); +} + +static inline void msg_set_nof_fragms(struct tipc_msg *m, u32 n) +{ + msg_set_bits(m, 4, 0, 0xffff, n); +} + +static inline u32 msg_fragm_no(struct tipc_msg *m) +{ + return msg_bits(m, 4, 16, 0xffff); +} + static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n) { msg_set_bits(m, 4, 16, 0xffff, n); } - static inline u16 msg_next_sent(struct tipc_msg *m) { return msg_bits(m, 4, 0, 0xffff); @@ -1045,6 +1059,8 @@ bool tipc_msg_bundle(struct sk_buff *skb, struct tipc_msg *msg, u32 mtu); bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg, u32 mtu, u32 dnode); bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos); +int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr, + int pktmax, struct sk_buff_head *frags); int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset, int dsz, int mtu, struct sk_buff_head *list); bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); -- 2.13.7 |
From: Tuong L. <tuo...@de...> - 2019-07-24 01:56:38
|
This commit, along with the next one, resolves the issues with the link changeover mechanism. See that commit for details. Basically, for the link synching, from now on we will send only a single ("dummy") SYNCH message to the peer. The SYNCH message does not contain any data, just a header conveying the synch point to the peer. A new node capability flag ("TIPC_TUNNEL_ENHANCED") is introduced for backward compatibility. Acked-by: Ying Xue <yin...@wi...> Acked-by: Jon Maloy <jon...@er...> Suggested-by: Jon Maloy <jon...@er...> Signed-off-by: Tuong Lien <tuo...@de...> --- net/tipc/link.c | 26 ++++++++++++++++++++++++++ net/tipc/msg.h | 10 ++++++++++ net/tipc/node.c | 6 ++++-- net/tipc/node.h | 6 ++++-- 4 files changed, 44 insertions(+), 4 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 66d3a07bc571..e215b4ba6a4b 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1665,6 +1665,7 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, struct sk_buff_head *queue = &l->transmq; struct sk_buff_head tmpxq, tnlq; u16 pktlen, pktcnt, seqno = l->snd_nxt; + u16 syncpt; if (!tnl) return; @@ -1684,6 +1685,31 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, tipc_link_xmit(l, &tnlq, &tmpxq); __skb_queue_purge(&tmpxq); + /* Link Synching: + * From now on, send only one single ("dummy") SYNCH message + * to peer. The SYNCH message does not contain any data, just + * a header conveying the synch point to the peer. + */ + if (mtyp == SYNCH_MSG && (tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) { + tnlskb = tipc_msg_create(TUNNEL_PROTOCOL, SYNCH_MSG, + INT_H_SIZE, 0, l->addr, + tipc_own_addr(l->net), + 0, 0, 0); + if (!tnlskb) { + pr_warn("%sunable to create dummy SYNCH_MSG\n", + link_co_err); + return; + } + + hdr = buf_msg(tnlskb); + syncpt = l->snd_nxt + skb_queue_len(&l->backlogq) - 1; + msg_set_syncpt(hdr, syncpt); + msg_set_bearer_id(hdr, l->peer_bearer_id); + __skb_queue_tail(&tnlq, tnlskb); + tipc_link_xmit(tnl, &tnlq, xmitq); + return; + } + /* Initialize reusable tunnel packet header */ tipc_msg_init(tipc_own_addr(l->net), &tnlhdr, TUNNEL_PROTOCOL, mtyp, INT_H_SIZE, l->addr); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index da509f0eb9ca..fca042cdff88 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -877,6 +877,16 @@ static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n) msg_set_bits(m, 9, 16, 0xffff, n); } +static inline u16 msg_syncpt(struct tipc_msg *m) +{ + return msg_bits(m, 9, 16, 0xffff); +} + +static inline void msg_set_syncpt(struct tipc_msg *m, u16 n) +{ + msg_set_bits(m, 9, 16, 0xffff, n); +} + static inline u32 msg_conn_ack(struct tipc_msg *m) { return msg_bits(m, 9, 16, 0xffff); diff --git a/net/tipc/node.c b/net/tipc/node.c index 324a1f91b394..5d8b48051bb9 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -1649,7 +1649,6 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, int usr = msg_user(hdr); int mtyp = msg_type(hdr); u16 oseqno = msg_seqno(hdr); - u16 iseqno = msg_seqno(msg_inner_hdr(hdr)); u16 exp_pkts = msg_msgcnt(hdr); u16 rcv_nxt, syncpt, dlv_nxt, inputq_len; int state = n->state; @@ -1748,7 +1747,10 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, /* Initiate synch mode if applicable */ if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG) && (oseqno == 1)) { - syncpt = iseqno + exp_pkts - 1; + if (n->capabilities & TIPC_TUNNEL_ENHANCED) + syncpt = msg_syncpt(hdr); + else + syncpt = msg_seqno(msg_inner_hdr(hdr)) + exp_pkts - 1; if (!tipc_link_is_up(l)) 
__tipc_node_link_up(n, bearer_id, xmitq); if (n->state == SELF_UP_PEER_UP) { diff --git a/net/tipc/node.h b/net/tipc/node.h index c0bf49ea3de4..291d0ecd4101 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -53,7 +53,8 @@ enum { TIPC_NODE_ID128 = (1 << 5), TIPC_LINK_PROTO_SEQNO = (1 << 6), TIPC_MCAST_RBCTL = (1 << 7), - TIPC_GAP_ACK_BLOCK = (1 << 8) + TIPC_GAP_ACK_BLOCK = (1 << 8), + TIPC_TUNNEL_ENHANCED = (1 << 9) }; #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \ @@ -64,7 +65,8 @@ enum { TIPC_NODE_ID128 | \ TIPC_LINK_PROTO_SEQNO | \ TIPC_MCAST_RBCTL | \ - TIPC_GAP_ACK_BLOCK) + TIPC_GAP_ACK_BLOCK | \ + TIPC_TUNNEL_ENHANCED) #define INVALID_BEARER_ID -1 void tipc_node_stop(struct net *net); -- 2.13.7 |
From: David M. <da...@da...> - 2019-07-25 22:56:13
|
From: Tuong Lien <tuo...@de...> Date: Wed, 24 Jul 2019 08:56:10 +0700 > This patch series is to resolve some issues found with the current link > changeover mechanism, it also includes an optimization for the link > synching. Series applied, thank you. |
From: Tuong L. <tuo...@de...> - 2019-06-04 05:23:01
|
This commit, together with the later commit 4ec6a14c3933 ("tipc: fix changeover issues due to large packet"), resolves the issues with the link changeover mechanism. See that commit for details. Basically, for the link synching, from now on we will send only a single ("dummy") SYNCH message to the peer. The SYNCH message does not contain any data, just a header conveying the synch point to the peer. A new node capability flag ("TIPC_TUNNEL_ENHANCED") is introduced for backward compatibility. Suggested-by: Jon Maloy <jon...@er...> Signed-off-by: Tuong Lien <tuo...@de...> --- net/tipc/link.c | 26 ++++++++++++++++++++++++++ net/tipc/msg.h | 10 ++++++++++ net/tipc/node.c | 6 ++++-- net/tipc/node.h | 6 ++++-- 4 files changed, 44 insertions(+), 4 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index f5cd986e1e50..6924cf1e526f 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1637,6 +1637,7 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, struct sk_buff_head *queue = &l->transmq; struct sk_buff_head tmpxq, tnlq; u16 pktlen, pktcnt, seqno = l->snd_nxt; + u16 syncpt; if (!tnl) return; @@ -1656,6 +1657,31 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, tipc_link_xmit(l, &tnlq, &tmpxq); __skb_queue_purge(&tmpxq); + /* Link Synching: + * From now on, send only one single ("dummy") SYNCH message + * to peer. The SYNCH message does not contain any data, just + * a header conveying the synch point to the peer. + */ + if (mtyp == SYNCH_MSG && (tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) { + tnlskb = tipc_msg_create(TUNNEL_PROTOCOL, SYNCH_MSG, + INT_H_SIZE, 0, l->addr, + tipc_own_addr(l->net), + 0, 0, 0); + if (!tnlskb) { + pr_warn("%sunable to create dummy SYNCH_MSG\n", + link_co_err); + return; + } + + hdr = buf_msg(tnlskb); + syncpt = l->snd_nxt + skb_queue_len(&l->backlogq) - 1; + msg_set_syncpt(hdr, syncpt); + msg_set_bearer_id(hdr, l->peer_bearer_id); + __skb_queue_tail(&tnlq, tnlskb); + tipc_link_xmit(tnl, &tnlq, xmitq); + return; + } + /* Initialize reusable tunnel packet header */ tipc_msg_init(tipc_own_addr(l->net), &tnlhdr, TUNNEL_PROTOCOL, mtyp, INT_H_SIZE, l->addr); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 8de02ad6e352..baf937bfa702 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -877,6 +877,16 @@ static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n) msg_set_bits(m, 9, 16, 0xffff, n); } +static inline u16 msg_syncpt(struct tipc_msg *m) +{ + return msg_bits(m, 9, 16, 0xffff); +} + +static inline void msg_set_syncpt(struct tipc_msg *m, u16 n) +{ + msg_set_bits(m, 9, 16, 0xffff, n); +} + static inline u32 msg_conn_ack(struct tipc_msg *m) { return msg_bits(m, 9, 16, 0xffff); diff --git a/net/tipc/node.c b/net/tipc/node.c index 9e106d3ed187..2a8399cf5525 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -1645,7 +1645,6 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, int usr = msg_user(hdr); int mtyp = msg_type(hdr); u16 oseqno = msg_seqno(hdr); - u16 iseqno = msg_seqno(msg_get_wrapped(hdr)); u16 exp_pkts = msg_msgcnt(hdr); u16 rcv_nxt, syncpt, dlv_nxt, inputq_len; int state = n->state; @@ -1744,7 +1743,10 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, /* Initiate synch mode if applicable */ if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG) && (oseqno == 1)) { - syncpt = iseqno + exp_pkts - 1; + if (n->capabilities & TIPC_TUNNEL_ENHANCED) + syncpt = msg_syncpt(hdr); + else + syncpt = msg_seqno(msg_get_wrapped(hdr)) + exp_pkts - 1; if 
(!tipc_link_is_up(l)) __tipc_node_link_up(n, bearer_id, xmitq); if (n->state == SELF_UP_PEER_UP) { diff --git a/net/tipc/node.h b/net/tipc/node.h index c0bf49ea3de4..291d0ecd4101 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -53,7 +53,8 @@ enum { TIPC_NODE_ID128 = (1 << 5), TIPC_LINK_PROTO_SEQNO = (1 << 6), TIPC_MCAST_RBCTL = (1 << 7), - TIPC_GAP_ACK_BLOCK = (1 << 8) + TIPC_GAP_ACK_BLOCK = (1 << 8), + TIPC_TUNNEL_ENHANCED = (1 << 9) }; #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \ @@ -64,7 +65,8 @@ enum { TIPC_NODE_ID128 | \ TIPC_LINK_PROTO_SEQNO | \ TIPC_MCAST_RBCTL | \ - TIPC_GAP_ACK_BLOCK) + TIPC_GAP_ACK_BLOCK | \ + TIPC_TUNNEL_ENHANCED) #define INVALID_BEARER_ID -1 void tipc_node_stop(struct net *net); -- 2.13.7 |
From: Ying X. <yin...@wi...> - 2019-06-16 06:47:24
|
On 6/4/19 1:22 PM, Tuong Lien wrote: > This commit is along with the latter commit 4ec6a14c3933 The commit ID might be changed after the commit is merged into upstream, which means the ID will be invalid as well. ("tipc: fix > changeover issues due to large packet") to resolve the issues with the > link changeover mechanism. See that commit for details. > > Basically, for the link synching, from now on, we will send only one > single ("dummy") SYNCH message to peer. The SYNCH message does not > contain any data, just a header conveying the synch point to the peer. > > A new node capability flag ("TIPC_TUNNEL_ENHANCED") is introduced for > backward compatible! > > Suggested-by: Jon Maloy <jon...@er...> > Signed-off-by: Tuong Lien <tuo...@de...> Acked-by: Ying Xue <yin...@wi...> > --- > net/tipc/link.c | 26 ++++++++++++++++++++++++++ > net/tipc/msg.h | 10 ++++++++++ > net/tipc/node.c | 6 ++++-- > net/tipc/node.h | 6 ++++-- > 4 files changed, 44 insertions(+), 4 deletions(-) > > diff --git a/net/tipc/link.c b/net/tipc/link.c > index f5cd986e1e50..6924cf1e526f 100644 > --- a/net/tipc/link.c > +++ b/net/tipc/link.c > @@ -1637,6 +1637,7 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, > struct sk_buff_head *queue = &l->transmq; > struct sk_buff_head tmpxq, tnlq; > u16 pktlen, pktcnt, seqno = l->snd_nxt; > + u16 syncpt; > > if (!tnl) > return; > @@ -1656,6 +1657,31 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, > tipc_link_xmit(l, &tnlq, &tmpxq); > __skb_queue_purge(&tmpxq); > > + /* Link Synching: > + * From now on, send only one single ("dummy") SYNCH message > + * to peer. The SYNCH message does not contain any data, just > + * a header conveying the synch point to the peer. > + */ > + if (mtyp == SYNCH_MSG && (tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) { > + tnlskb = tipc_msg_create(TUNNEL_PROTOCOL, SYNCH_MSG, > + INT_H_SIZE, 0, l->addr, > + tipc_own_addr(l->net), > + 0, 0, 0); > + if (!tnlskb) { > + pr_warn("%sunable to create dummy SYNCH_MSG\n", > + link_co_err); > + return; > + } > + > + hdr = buf_msg(tnlskb); > + syncpt = l->snd_nxt + skb_queue_len(&l->backlogq) - 1; > + msg_set_syncpt(hdr, syncpt); > + msg_set_bearer_id(hdr, l->peer_bearer_id); > + __skb_queue_tail(&tnlq, tnlskb); > + tipc_link_xmit(tnl, &tnlq, xmitq); > + return; > + } > + > /* Initialize reusable tunnel packet header */ > tipc_msg_init(tipc_own_addr(l->net), &tnlhdr, TUNNEL_PROTOCOL, > mtyp, INT_H_SIZE, l->addr); > diff --git a/net/tipc/msg.h b/net/tipc/msg.h > index 8de02ad6e352..baf937bfa702 100644 > --- a/net/tipc/msg.h > +++ b/net/tipc/msg.h > @@ -877,6 +877,16 @@ static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n) > msg_set_bits(m, 9, 16, 0xffff, n); > } > > +static inline u16 msg_syncpt(struct tipc_msg *m) > +{ > + return msg_bits(m, 9, 16, 0xffff); > +} > + > +static inline void msg_set_syncpt(struct tipc_msg *m, u16 n) > +{ > + msg_set_bits(m, 9, 16, 0xffff, n); > +} > + > static inline u32 msg_conn_ack(struct tipc_msg *m) > { > return msg_bits(m, 9, 16, 0xffff); > diff --git a/net/tipc/node.c b/net/tipc/node.c > index 9e106d3ed187..2a8399cf5525 100644 > --- a/net/tipc/node.c > +++ b/net/tipc/node.c > @@ -1645,7 +1645,6 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, > int usr = msg_user(hdr); > int mtyp = msg_type(hdr); > u16 oseqno = msg_seqno(hdr); > - u16 iseqno = msg_seqno(msg_get_wrapped(hdr)); > u16 exp_pkts = msg_msgcnt(hdr); > u16 rcv_nxt, syncpt, dlv_nxt, inputq_len; > int state = n->state; > @@ -1744,7 +1743,10 
@@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, > > /* Initiate synch mode if applicable */ > if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG) && (oseqno == 1)) { > - syncpt = iseqno + exp_pkts - 1; > + if (n->capabilities & TIPC_TUNNEL_ENHANCED) > + syncpt = msg_syncpt(hdr); > + else > + syncpt = msg_seqno(msg_get_wrapped(hdr)) + exp_pkts - 1; > if (!tipc_link_is_up(l)) > __tipc_node_link_up(n, bearer_id, xmitq); > if (n->state == SELF_UP_PEER_UP) { > diff --git a/net/tipc/node.h b/net/tipc/node.h > index c0bf49ea3de4..291d0ecd4101 100644 > --- a/net/tipc/node.h > +++ b/net/tipc/node.h > @@ -53,7 +53,8 @@ enum { > TIPC_NODE_ID128 = (1 << 5), > TIPC_LINK_PROTO_SEQNO = (1 << 6), > TIPC_MCAST_RBCTL = (1 << 7), > - TIPC_GAP_ACK_BLOCK = (1 << 8) > + TIPC_GAP_ACK_BLOCK = (1 << 8), > + TIPC_TUNNEL_ENHANCED = (1 << 9) > }; > > #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \ > @@ -64,7 +65,8 @@ enum { > TIPC_NODE_ID128 | \ > TIPC_LINK_PROTO_SEQNO | \ > TIPC_MCAST_RBCTL | \ > - TIPC_GAP_ACK_BLOCK) > + TIPC_GAP_ACK_BLOCK | \ > + TIPC_TUNNEL_ENHANCED) > #define INVALID_BEARER_ID -1 > > void tipc_node_stop(struct net *net); > |
From: Tuong L. <tuo...@de...> - 2019-06-04 05:23:04
|
In conjunction with changing the interfaces' MTU (e.g. especially in the case of a bonding) where the TIPC links are brought up and down in a short time, a couple of issues were detected with the current link changeover mechanism:

1) When one link is up but immediately forced down again, the failover procedure will be carried out in order to fail over all the messages in the link's transmq queue onto the other working link. The link and node state is also set to FAILINGOVER as part of the process. Each message will be transmitted in the form of a FAILOVER_MSG, so its size grows by 40 bytes (the size of the tunnel message header). There is no problem if the original message size is not larger than the link's MTU - 40, and indeed this is the maximum size of normal payload messages. However, in the situation above, because the link has only just come up, the messages in the link's transmq are mostly SYNCH_MSGs generated by the link synching procedure, so their size may already be at that maximum. When a FAILOVER_MSG is built on top of such a SYNCH_MSG, its size will exceed the link's MTU. As a result, the messages are dropped silently and the failover procedure never completes; the link cannot exit the FAILINGOVER state and so cannot be re-established.

2) The same scenario can happen more easily when the links' MTUs are set differently or are being changed. In that case, as long as a large message in the failed link's transmq queue was built and fragmented with that link's MTU larger than the other link's, the issue will occur (no prior link synching is needed).

3) The link synching procedure faces the same issue, but since link synching is only started upon receipt of a SYNCH_MSG, dropping the message does not result in a state deadlock; still, it is not the behavior intended by the design.

Issues 1) & 3) are resolved by the previous commit 81e4dd94b214 ("tipc: optimize link synching mechanism"), which generates only a dummy SYNCH_MSG (i.e. without data) at link synching, so the size of a FAILOVER_MSG, if any, will never exceed the link's MTU.

For issue 2), the only solution is to fragment the messages in the failed link's transmq queue according to the working link's MTU so that they can then be failed over. A new function is made to accomplish this; the result is still a TUNNEL_PROTOCOL/FAILOVER_MSG, but if the original message size is too large it will be fragmented & reassembled at the receiving side. 
Signed-off-by: Tuong Lien <tuo...@de...> --- net/tipc/link.c | 92 +++++++++++++++++++++++++++++++++++++++++++++++++-------- net/tipc/msg.c | 62 ++++++++++++++++++++++++++++++++++++++ net/tipc/msg.h | 18 ++++++++++- 3 files changed, 158 insertions(+), 14 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 6924cf1e526f..3d2d2e4e4f14 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -182,6 +182,7 @@ struct tipc_link { /* Fragmentation/reassembly */ struct sk_buff *reasm_buf; + struct sk_buff *reasm_tnlmsg; /* Broadcast */ u16 ackers; @@ -899,8 +900,10 @@ void tipc_link_reset(struct tipc_link *l) l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0; l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0; kfree_skb(l->reasm_buf); + kfree_skb(l->reasm_tnlmsg); kfree_skb(l->failover_reasm_skb); l->reasm_buf = NULL; + l->reasm_tnlmsg = NULL; l->failover_reasm_skb = NULL; l->rcv_unacked = 0; l->snd_nxt = 1; @@ -943,6 +946,9 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, int rc = 0; if (unlikely(msg_size(hdr) > mtu)) { + pr_warn("Purging list (len = %d), head msg <%d, %d>: %d\n", + skb_queue_len(list), msg_user(hdr), + msg_type(hdr), msg_size(hdr)); skb_queue_purge(list); return -EMSGSIZE; } @@ -1212,6 +1218,7 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *inputq) { struct sk_buff **reasm_skb = &l->failover_reasm_skb; + struct sk_buff **reasm_tnlmsg = &l->reasm_tnlmsg; struct sk_buff_head *fdefq = &l->failover_deferdq; struct tipc_msg *hdr = buf_msg(skb); struct sk_buff *iskb; @@ -1219,25 +1226,44 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb, int rc = 0; u16 seqno; - /* SYNCH_MSG */ - if (msg_type(hdr) == SYNCH_MSG) - goto drop; + if (msg_type(hdr) == SYNCH_MSG) { + kfree_skb(skb); + return 0; + } - /* FAILOVER_MSG */ - if (!tipc_msg_extract(skb, &iskb, &ipos)) { - pr_warn_ratelimited("Cannot extract FAILOVER_MSG, defq: %d\n", - skb_queue_len(fdefq)); - return rc; + /* No fragmentation? 
*/ + if (likely(!msg_nof_fragms(hdr))) { + if (!tipc_msg_extract(skb, &iskb, &ipos)) { + pr_warn_ratelimited("Unable to extract msg, defq: %d\n", + skb_queue_len(fdefq)); + return 0; + } + kfree_skb(skb); + } else { + /* Set fragment type for buf_append */ + if (msg_fragm_no(hdr) == 1) + msg_set_type(hdr, FIRST_FRAGMENT); + else if (msg_fragm_no(hdr) < msg_nof_fragms(hdr)) + msg_set_type(hdr, FRAGMENT); + else + msg_set_type(hdr, LAST_FRAGMENT); + + if (!tipc_buf_append(reasm_tnlmsg, &skb)) { + if (*reasm_tnlmsg || link_is_bc_rcvlink(l)) + return 0; + + pr_warn_ratelimited("Unable to reassemble tunnel msg\n"); + return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); + } + iskb = skb; } do { seqno = buf_seqno(iskb); - if (unlikely(less(seqno, l->drop_point))) { kfree_skb(iskb); continue; } - if (unlikely(seqno != l->drop_point)) { __tipc_skb_queue_sorted(fdefq, seqno, iskb); continue; @@ -1251,8 +1277,6 @@ static int tipc_link_tnl_rcv(struct tipc_link *l, struct sk_buff *skb, break; } while ((iskb = __tipc_skb_dequeue(fdefq, l->drop_point))); -drop: - kfree_skb(skb); return rc; } @@ -1635,15 +1659,18 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, struct sk_buff *skb, *tnlskb; struct tipc_msg *hdr, tnlhdr; struct sk_buff_head *queue = &l->transmq; - struct sk_buff_head tmpxq, tnlq; + struct sk_buff_head tmpxq, tnlq, frags; u16 pktlen, pktcnt, seqno = l->snd_nxt; + bool pktcnt_need_update = false; u16 syncpt; + int rc; if (!tnl) return; skb_queue_head_init(&tnlq); skb_queue_head_init(&tmpxq); + skb_queue_head_init(&frags); /* At least one packet required for safe algorithm => add dummy */ skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG, @@ -1699,6 +1726,39 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, if (queue == &l->backlogq) msg_set_seqno(hdr, seqno++); pktlen = msg_size(hdr); + + /* Tunnel link MTU is not large enough? This could be + * due to: + * 1) Link MTU has just changed or set differently; + * 2) Or FAILOVER on the top of a SYNCH message + * + * The 2nd case should not happen if peer supports + * TIPC_TUNNEL_ENHANCED + */ + if (pktlen > tnl->mtu - INT_H_SIZE) { + if (mtyp == FAILOVER_MSG && + (tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) { + rc = tipc_msg_fragment(skb, &tnlhdr, tnl->mtu, + &frags); + if (rc) { + pr_warn("%sunable to frag msg: rc %d\n", + link_co_err, rc); + return; + } + pktcnt += skb_queue_len(&frags) - 1; + pktcnt_need_update = true; + skb_queue_splice_tail_init(&frags, &tnlq); + continue; + } + /* Unluckily, peer doesn't have TIPC_TUNNEL_ENHANCED + * => Just warn it and return? 
+ */ + pr_warn_ratelimited("%stoo large msg <%d, %d>: %d!\n", + link_co_err, msg_user(hdr), + msg_type(hdr), msg_size(hdr)); + return; + } + msg_set_size(&tnlhdr, pktlen + INT_H_SIZE); tnlskb = tipc_buf_acquire(pktlen + INT_H_SIZE, GFP_ATOMIC); if (!tnlskb) { @@ -1714,6 +1774,12 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, goto tnl; } + if (pktcnt_need_update) + skb_queue_walk(&tnlq, skb) { + hdr = buf_msg(skb); + msg_set_msgcnt(hdr, pktcnt); + } + tipc_link_xmit(tnl, &tnlq, xmitq); if (mtyp == FAILOVER_MSG) { diff --git a/net/tipc/msg.c b/net/tipc/msg.c index f48e5857210f..dfa6eeb40013 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -244,6 +244,68 @@ bool tipc_msg_validate(struct sk_buff **_skb) } /** + * tipc_msg_fragment - build a fragment skb list for TIPC message + * + * @skb: TIPC message skb + * @hdr: internal msg header to be put on the top of the fragments + * @pktmax: max size of a fragment incl. the header + * @frags: returned fragment skb list + * + * Returns 0 if the fragmentation is successful, otherwise: -EINVAL + * or -ENOMEM + */ +int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr, + int pktmax, struct sk_buff_head *frags) +{ + int pktno, nof_fragms, dsz, dmax, eat; + struct tipc_msg *_hdr; + struct sk_buff *_skb; + u8 *data; + + /* Non-linear buffer? */ + if (skb_linearize(skb)) + return -ENOMEM; + + data = (u8 *)skb->data; + dsz = msg_size(buf_msg(skb)); + dmax = pktmax - INT_H_SIZE; + + if (dsz <= dmax || !dmax) + return -EINVAL; + + nof_fragms = dsz / dmax + 1; + + for (pktno = 1; pktno <= nof_fragms; pktno++) { + if (pktno < nof_fragms) + eat = dmax; + else + eat = dsz % dmax; + + _skb = tipc_buf_acquire(INT_H_SIZE + eat, GFP_ATOMIC); + if (!_skb) + goto error; + + skb_orphan(_skb); + __skb_queue_tail(frags, _skb); + + skb_copy_to_linear_data(_skb, hdr, INT_H_SIZE); + skb_copy_to_linear_data_offset(_skb, INT_H_SIZE, data, eat); + data += eat; + + _hdr = buf_msg(_skb); + msg_set_fragm_no(_hdr, pktno); + msg_set_nof_fragms(_hdr, nof_fragms); + msg_set_size(_hdr, INT_H_SIZE + eat); + } + return 0; + +error: + __skb_queue_purge(frags); + __skb_queue_head_init(frags); + return -ENOMEM; +} + +/** * tipc_msg_build - create buffer chain containing specified header and data * @mhdr: Message header, to be prepended to data * @m: User message diff --git a/net/tipc/msg.h b/net/tipc/msg.h index baf937bfa702..fbfdf7843d07 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -721,12 +721,26 @@ static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n) msg_set_bits(m, 4, 16, 0xffff, n); } +static inline u32 msg_nof_fragms(struct tipc_msg *m) +{ + return msg_bits(m, 4, 0, 0xffff); +} + +static inline void msg_set_nof_fragms(struct tipc_msg *m, u32 n) +{ + msg_set_bits(m, 4, 0, 0xffff, n); +} + +static inline u32 msg_fragm_no(struct tipc_msg *m) +{ + return msg_bits(m, 4, 16, 0xffff); +} + static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n) { msg_set_bits(m, 4, 16, 0xffff, n); } - static inline u16 msg_next_sent(struct tipc_msg *m) { return msg_bits(m, 4, 0, 0xffff); @@ -1045,6 +1059,8 @@ bool tipc_msg_bundle(struct sk_buff *skb, struct tipc_msg *msg, u32 mtu); bool tipc_msg_make_bundle(struct sk_buff **skb, struct tipc_msg *msg, u32 mtu, u32 dnode); bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos); +int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr, + int pktmax, struct sk_buff_head *frags); int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset, int dsz, 
int mtu, struct sk_buff_head *list); bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); -- 2.13.7 |
From: Jon M. <jon...@er...> - 2019-06-05 17:27:56
|
Both patches acked by me. ///jon > -----Original Message----- > From: Tuong Lien <tuo...@de...> > Sent: 4-Jun-19 01:23 > To: tip...@li...; Jon Maloy > <jon...@er...>; ma...@do...; yin...@wi... > Subject: [PATCH RFC 0/2] tipc: link changeover issues > > This patch series is to resolve some issues found with the current link > changeover mechanism, it also includes an optimization for the link synching. > > Tuong Lien (2): > tipc: optimize link synching mechanism > tipc: fix changeover issues due to large packet > > net/tipc/link.c | 118 > +++++++++++++++++++++++++++++++++++++++++++++++++------- > net/tipc/msg.c | 62 +++++++++++++++++++++++++++++ net/tipc/msg.h | > 28 +++++++++++++- > net/tipc/node.c | 6 ++- > net/tipc/node.h | 6 ++- > 5 files changed, 202 insertions(+), 18 deletions(-) > > -- > 2.13.7 |
From: Ying X. <yin...@wi...> - 2019-06-16 06:53:45
|
> 2) The same scenario above can happen more easily in case the MTU of > the links is set differently or when changing. In that case, as long as > a large message in the failure link's transmq queue was built and > fragmented with its link's MTU > the other link's one, the issue will > happen (there is no need of a link synching in advance). > > 3) The link synching procedure also faces with the same issue but since > the link synching is only started upon receipt of a SYNCH_MSG, dropping > the message will not result in a state deadlock, but it is not expected > as design. > > The 1) & 3) issues are resolved by the previous commit 81e4dd94b214 The same remark as for the previous commit applies: the commit ID might become invalid after it is merged upstream. > ("tipc: optimize link synching mechanism") by generating only a dummy > SYNCH_MSG (i.e. without data) at the link synching, so the size of a > FAILOVER_MSG if any then will never exceed the link's MTU. > /** > + * tipc_msg_fragment - build a fragment skb list for TIPC message > + * > + * @skb: TIPC message skb > + * @hdr: internal msg header to be put on the top of the fragments > + * @pktmax: max size of a fragment incl. the header > + * @frags: returned fragment skb list > + * > + * Returns 0 if the fragmentation is successful, otherwise: -EINVAL > + * or -ENOMEM > + */ > +int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr, > + int pktmax, struct sk_buff_head *frags) > +{ > + int pktno, nof_fragms, dsz, dmax, eat; > + struct tipc_msg *_hdr; > + struct sk_buff *_skb; > + u8 *data; > + > + /* Non-linear buffer? */ > + if (skb_linearize(skb)) > + return -ENOMEM; > + > + data = (u8 *)skb->data; > + dsz = msg_size(buf_msg(skb)); > + dmax = pktmax - INT_H_SIZE; > + > + if (dsz <= dmax || !dmax) > + return -EINVAL; > + > + nof_fragms = dsz / dmax + 1; > + > + for (pktno = 1; pktno <= nof_fragms; pktno++) { > + if (pktno < nof_fragms) > + eat = dmax; > + else > + eat = dsz % dmax; > + > + _skb = tipc_buf_acquire(INT_H_SIZE + eat, GFP_ATOMIC); > + if (!_skb) > + goto error; > + > + skb_orphan(_skb); > + __skb_queue_tail(frags, _skb); > + > + skb_copy_to_linear_data(_skb, hdr, INT_H_SIZE); > + skb_copy_to_linear_data_offset(_skb, INT_H_SIZE, data, eat); > + data += eat; > + > + _hdr = buf_msg(_skb); > + msg_set_fragm_no(_hdr, pktno); > + msg_set_nof_fragms(_hdr, nof_fragms); > + msg_set_size(_hdr, INT_H_SIZE + eat); > + } > + return 0; > + In fact we have similar code in tipc_msg_build(), where we also fragment packets if necessary. In order to eliminate redundant code, I suggest we extract the common code into a separate function and have both tipc_msg_build() and tipc_msg_fragment() call it. |
From: Tuong L. T. <tuo...@de...> - 2019-06-17 08:49:29
|
Hi Ying, Thanks for your comments! Regarding your last statement: yes, when making the patch I noticed that "tipc_msg_build()" and "tipc_msg_fragment()" do a similar task, and I tried to think of a way to combine them, but didn't for these reasons: 1- The "core" functions that copy the data are different, since "tipc_msg_build()" works with user data in the iov buffers, whereas the other works with skb data. Also, the outputs are different: the first function sets the message type in the header, such as "FIRST_FRAGMENT", "FRAGMENT" or "LAST_FRAGMENT", but the other cannot, because that would overwrite the tunnel message's type... which is why I had to use the other fields (fragm_no/nof_fragms) to determine this at the receiving side... 2- I don't want to touch the old code, which could be risky :( BR/Tuong -----Original Message----- From: Ying Xue <yin...@wi...> Sent: Sunday, June 16, 2019 1:42 PM To: Tuong Lien <tuo...@de...>; tip...@li...; jon...@er...; ma...@do... Subject: Re: [PATCH RFC 2/2] tipc: fix changeover issues due to large packet > 2) The same scenario above can happen more easily in case the MTU of > the links is set differently or when changing. In that case, as long as > a large message in the failure link's transmq queue was built and > fragmented with its link's MTU > the other link's one, the issue will > happen (there is no need of a link synching in advance). > > 3) The link synching procedure also faces with the same issue but since > the link synching is only started upon receipt of a SYNCH_MSG, dropping > the message will not result in a state deadlock, but it is not expected > as design. > > The 1) & 3) issues are resolved by the previous commit 81e4dd94b214 This is the same as previous commit. The commit ID might be invalid after it's merged into upstream. > ("tipc: optimize link synching mechanism") by generating only a dummy > SYNCH_MSG (i.e. without data) at the link synching, so the size of a > FAILOVER_MSG if any then will never exceed the link's MTU. > /** > + * tipc_msg_fragment - build a fragment skb list for TIPC message > + * > + * @skb: TIPC message skb > + * @hdr: internal msg header to be put on the top of the fragments > + * @pktmax: max size of a fragment incl. the header > + * @frags: returned fragment skb list > + * > + * Returns 0 if the fragmentation is successful, otherwise: -EINVAL > + * or -ENOMEM > + */ > +int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr, > + int pktmax, struct sk_buff_head *frags) > +{ > + int pktno, nof_fragms, dsz, dmax, eat; > + struct tipc_msg *_hdr; > + struct sk_buff *_skb; > + u8 *data; > + > + /* Non-linear buffer? 
*/ > + if (skb_linearize(skb)) > + return -ENOMEM; > + > + data = (u8 *)skb->data; > + dsz = msg_size(buf_msg(skb)); > + dmax = pktmax - INT_H_SIZE; > + > + if (dsz <= dmax || !dmax) > + return -EINVAL; > + > + nof_fragms = dsz / dmax + 1; > + > + for (pktno = 1; pktno <= nof_fragms; pktno++) { > + if (pktno < nof_fragms) > + eat = dmax; > + else > + eat = dsz % dmax; > + > + _skb = tipc_buf_acquire(INT_H_SIZE + eat, GFP_ATOMIC); > + if (!_skb) > + goto error; > + > + skb_orphan(_skb); > + __skb_queue_tail(frags, _skb); > + > + skb_copy_to_linear_data(_skb, hdr, INT_H_SIZE); > + skb_copy_to_linear_data_offset(_skb, INT_H_SIZE, data, eat); > + data += eat; > + > + _hdr = buf_msg(_skb); > + msg_set_fragm_no(_hdr, pktno); > + msg_set_nof_fragms(_hdr, nof_fragms); > + msg_set_size(_hdr, INT_H_SIZE + eat); > + } > + return 0; > + In fact we have similar code in tipc_msg_build() where we also fragment packet if necessary. In order to eliminate redundant code, I suggest we should extract the common code into a separate function and then tipc_msg_build() and tipc_msg_fragment() call it. |
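Tuong's first point can be made concrete with a small receive-side sketch: because tunnel messages must keep TUNNEL_PROTOCOL/FAILOVER_MSG as their user/type, fragment boundaries have to be recognized from the fragm_no/nof_fragms counters rather than from FIRST_FRAGMENT/LAST_FRAGMENT message types. The helpers below are hypothetical and assume getters matching the patch's msg_set_fragm_no()/msg_set_nof_fragms() setters:

/* Hypothetical receive-side checks; not code from the patch. A fragment
 * is "first" or "last" by its counter position, since the message type
 * field is occupied by the tunnel protocol.
 */
static bool tipc_tnl_frag_is_first(struct tipc_msg *hdr)
{
	return msg_fragm_no(hdr) == 1;
}

static bool tipc_tnl_frag_is_last(struct tipc_msg *hdr)
{
	return msg_fragm_no(hdr) == msg_nof_fragms(hdr);
}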
From: Xue, Y. <Yin...@wi...> - 2019-06-18 12:13:07
|
Hi Tuong,

Thank you for your explanation. It makes sense to me. Please go ahead.

Thanks,
Ying

-----Original Message-----
From: Tuong Lien Tong [mailto:tuo...@de...]
Sent: Monday, June 17, 2019 4:49 PM
To: Xue, Ying; tip...@li...; jon...@er...; ma...@do...
Subject: RE: [PATCH RFC 2/2] tipc: fix changeover issues due to large packet

[...]
|