From: Jon M. <jon...@er...> - 2019-12-22 03:00:08
|
As I won't have sufficient time to complete the work I have been doing with introducing GSO during the coming months, I am sending out the patches "as is", in case anybody feels compelled to continue the work.

The code here is probably not very robust, especially regarding failover during full traffic, but it gives a neat improvement when using UDP, so we can now enjoy good performance while avoiding the virtio ring buffer overflows and resets we have been observing earlier. Unfortunately it also brings significantly lower performance across the Ethernet bearer, something we have to resolve somehow.

Finally, if we can make this work in a satisfactory way, it opens the path to introducing host supported GSO, so that full 16k or 64k buffers can be transported VM-to-VM.

Jon Maloy (6):
  tipc: change size of stream message chunks
  Revert "tipc: set default MTU for UDP media"
  tipc: refactor function tipc_link_tnl_prepare()
  tipc: separate packet sequence numbering from skb numbering
  tipc: introduce GSO
  tipc: let stream chunks bypass gso over udp

 include/uapi/linux/tipc_config.h | 5 -
 net/tipc/bcast.c | 16 +-
 net/tipc/bearer.c | 29 +++
 net/tipc/link.c | 273 ++++++++++++--------------
 net/tipc/msg.c | 409 +++++++++++++++++++++++++--------------
 net/tipc/msg.h | 58 +++++-
 net/tipc/node.c | 1 +
 net/tipc/node.h | 6 +-
 net/tipc/socket.c | 21 +-
 net/tipc/udp_media.c | 17 +-
 10 files changed, 510 insertions(+), 325 deletions(-)
-- 2.1.4 |
From: Jon M. <jon...@er...> - 2019-12-22 02:26:23
|
When a stream socket sends a message larger than TIPC_MAX_USER_MSG_SIZE (66k) it subdivides it into "chunks" of that very size. However, this is not the optimal size, as we have seen that throughput tends to decrease for very large messages. A chunk size of 16k gives a more stable, message size independent throughput. Hence we change this value now.

Note however that this value can be changed only for stream sockets, since this is the only socket type which can reassemble the chunks without considering message delimitation at the receiving side.

Signed-off-by: Jon Maloy <jon...@er...>
--- net/tipc/msg.h | 1 + net/tipc/socket.c | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 6d466eb..e4c13f2 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -98,6 +98,7 @@ struct plist; #define MAX_H_SIZE 60 /* Largest possible TIPC header size */ #define MAX_MSG_SIZE (MAX_H_SIZE + TIPC_MAX_USER_MSG_SIZE) +#define TIPC_MSG_CHUNK_SIZE 16384 #define FB_MTU 3744 #define TIPC_MEDIA_INFO_OFFSET 5 diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 41688da..884dad5 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -104,6 +104,7 @@ struct tipc_sock { struct list_head cong_links; struct list_head publications; u32 pub_count; + u32 chunk_size; atomic_t dupl_rcvcnt; u16 conn_timeout; bool probe_unacked; @@ -502,6 +503,10 @@ static int tipc_sk_create(struct net *net, struct socket *sock, sk->sk_write_space = tipc_write_space; sk->sk_destruct = tipc_sock_destruct; tsk->conn_timeout = CONN_TIMEOUT_DEFAULT; + if (sock->type == SOCK_STREAM) + tsk->chunk_size = TIPC_MSG_CHUNK_SIZE; + else + tsk->chunk_size = TIPC_MAX_USER_MSG_SIZE; tsk->group_is_open = true; atomic_set(&tsk->dupl_rcvcnt, 0); @@ -1527,7 +1532,7 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen) tipc_sk_connected(sk))); if (unlikely(rc)) break; - send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE); + send = min_t(size_t, 
dlen - sent, tsk->chunk_size); blocks = tsk->snd_backlog; if (tsk->oneway++ >= 4 && send <= maxnagle) { rc = tipc_msg_append(hdr, m, send, maxnagle, txq); -- 2.1.4 |
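For anybody picking this up, the effect of the patch above on a single stream send can be sketched in user space as follows. This is only an illustration, not kernel code: next_chunk() and chunk_count() are hypothetical helpers that mirror the min_t() expression in __tipc_sendstream(), and the value 66000 for TIPC_MAX_USER_MSG_SIZE is taken from the uapi header as I recall it, so treat it as an assumption.

```c
#include <stddef.h>

/* Assumed value of the uapi constant; not shown in the patch itself */
#define TIPC_MAX_USER_MSG_SIZE 66000u
/* Value introduced by the patch above */
#define TIPC_MSG_CHUNK_SIZE    16384u

/* Hypothetical helper: size of the next chunk, i.e. the smaller of
 * the remaining data and the per-socket chunk size.
 */
size_t next_chunk(size_t dlen, size_t sent, size_t chunk_size)
{
	size_t left = dlen - sent;

	return left < chunk_size ? left : chunk_size;
}

/* Hypothetical helper: number of chunks one send of dlen bytes becomes */
size_t chunk_count(size_t dlen, size_t chunk_size)
{
	size_t sent = 0, n = 0;

	while (sent < dlen) {
		sent += next_chunk(dlen, sent, chunk_size);
		n++;
	}
	return n;
}
```

With these helpers a 100000 byte stream send becomes two chunks at the old size and seven chunks at the new 16k size, the last one carrying the remaining 1696 bytes.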
From: Jon M. <jon...@er...> - 2019-12-22 02:26:29
|
This reverts commit a4dfa72d0acd1c99a160e25c099849ae37ad13fd.

The above change turned out to be too aggressive, as it sometimes overwhelms the virtio ring buffer and leads it to block all transmission for up to 30 seconds. The basic problem is that the link congestion control algorithm only considers the transmission queue length, disregarding the fact that each "packet" in the queue may now correspond to a dozen actual network packets.

We will introduce a better solution to this in a later commit.

Signed-off-by: Jon Maloy <jon...@er...>
--- include/uapi/linux/tipc_config.h | 5 ----- net/tipc/link.c | 4 ++-- net/tipc/udp_media.c | 4 ++-- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/include/uapi/linux/tipc_config.h b/include/uapi/linux/tipc_config.h index 4dfc056..15a3219 100644 --- a/include/uapi/linux/tipc_config.h +++ b/include/uapi/linux/tipc_config.h @@ -185,11 +185,6 @@ #define TIPC_DEF_LINK_WIN 50 #define TIPC_MAX_LINK_WIN 8191 -/* - * Default MTU for UDP media - */ - -#define TIPC_DEF_LINK_UDP_MTU 14000 struct tipc_node_info { __be32 addr; /* network address of node */ diff --git a/net/tipc/link.c b/net/tipc/link.c index 467c53a..2531f94 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -527,7 +527,7 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id, * tipc_link_bc_create - create new link to be used for broadcast * @n: pointer to associated node * @mtu: mtu to be used initially if no peers - * @window: send window to be used + * @window: packet window to be used as base for send window * @inputq: queue to put messages ready for delivery * @namedq: queue to put binding table update messages ready for delivery * @link: return value, pointer to put the created link @@ -2267,7 +2267,7 @@ int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr, l->stats.recv_nacks++; - /* Ignore if peers_snd_nxt goes beyond receive window */ + /* Ignore if peers_snd_nxt goes beyond receive packet window */ if 
(more(peers_snd_nxt, l->rcv_nxt + l->window)) return rc; diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c index d6620ad5..dcc4ba7 100644 --- a/net/tipc/udp_media.c +++ b/net/tipc/udp_media.c @@ -731,7 +731,8 @@ static int tipc_udp_enable(struct net *net, struct tipc_bearer *b, err = -EINVAL; goto err; } - b->mtu = b->media->mtu; + b->mtu = dev->mtu - sizeof(struct iphdr) + - sizeof(struct udphdr); #if IS_ENABLED(CONFIG_IPV6) } else if (local.proto == htons(ETH_P_IPV6)) { udp_conf.family = AF_INET6; @@ -830,7 +831,6 @@ struct tipc_media udp_media_info = { .tolerance = TIPC_DEF_LINK_TOL, .min_win = TIPC_DEF_LINK_WIN, .max_win = TIPC_DEF_LINK_WIN, - .mtu = TIPC_DEF_LINK_UDP_MTU, .type_id = TIPC_MEDIA_TYPE_UDP, .hwaddr_len = 0, .name = "udp" -- 2.1.4 |
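To put rough numbers on the problem described in the revert above, here is a small user-space sketch. It is an illustration under stated assumptions, not kernel code: udp_bearer_mtu() and wire_packets() are hypothetical helpers, the IPv4 header is assumed to carry no options (20 bytes), and fragmentation is assumed to follow the usual IPv4 rule that fragment payloads are multiples of 8 bytes.

```c
#define IPV4_HDR_LEN 20	/* assumed: sizeof(struct iphdr), no options */
#define UDP_HDR_LEN   8	/* sizeof(struct udphdr) */

/* Hypothetical helper mirroring the restored assignment in
 * tipc_udp_enable(): device mtu minus IP and UDP headers.
 */
int udp_bearer_mtu(int dev_mtu)
{
	return dev_mtu - IPV4_HDR_LEN - UDP_HDR_LEN;
}

/* Hypothetical helper: number of IPv4 packets on the wire for one
 * TIPC packet of tipc_len bytes sent as a single UDP datagram.
 */
int wire_packets(int tipc_len, int dev_mtu)
{
	int udp_len = tipc_len + UDP_HDR_LEN;
	int per_frag = dev_mtu - IPV4_HDR_LEN;

	per_frag -= per_frag % 8;	/* fragment payload is 8-byte aligned */
	return (udp_len + per_frag - 1) / per_frag;
}
```

Under these assumptions a 1500 byte device gives a bearer mtu of 1472, while one queued packet at the reverted 14000 byte "emulated mtu" turns into 10 network packets. A send window of TIPC_DEF_LINK_WIN (50) such packets can then put around 500 network packets in flight, which the congestion control never saw.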
From: Jon M. <jon...@er...> - 2019-12-22 02:26:29
|
In the coming commits we need a more generic tunneling prepare function. As a preparation for this, we refactor it in a separate commit. Signed-off-by: Jon Maloy <jon...@er...> --- net/tipc/link.c | 148 +++++++++++++++++++------------------------------------- net/tipc/msg.c | 50 +++++++++++++++++++ net/tipc/msg.h | 4 +- 3 files changed, 104 insertions(+), 98 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 2531f94..f16219c 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1801,132 +1801,86 @@ void tipc_link_create_dummy_tnl_msg(struct tipc_link *l, void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, int mtyp, struct sk_buff_head *xmitq) { + bool enhanced = tnl->peer_caps & TIPC_TUNNEL_ENHANCED; struct sk_buff_head *fdefq = &tnl->failover_deferdq; - struct sk_buff *skb, *tnlskb; - struct tipc_msg *hdr, tnlhdr; - struct sk_buff_head *queue = &l->transmq; - struct sk_buff_head tmpxq, tnlq, frags; - u16 pktlen, pktcnt, seqno = l->snd_nxt; - bool pktcnt_need_update = false; - u16 syncpt; - int rc; + u32 self = tipc_own_addr(l->net); + struct sk_buff_head tnlq, tmpxq; + struct tipc_msg hdr, *_hdr; + u16 inner_seqno, syncpt; + struct sk_buff *skb; + int pktcnt, rc; + int inner_len; if (!tnl) return; + skb_queue_head_init(&tnlq); + skb_queue_head_init(&tmpxq); - __skb_queue_head_init(&tnlq); - /* Link Synching: - * From now on, send only one single ("dummy") SYNCH message - * to peer. The SYNCH message does not contain any data, just - * a header conveying the synch point to the peer. 
+ /* When TUNNEL_ENHANCED is supported, it is sufficient to send a + * single empty SYNCH message to peer, conveying the synch point */ - if (mtyp == SYNCH_MSG && (tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) { - tnlskb = tipc_msg_create(TUNNEL_PROTOCOL, SYNCH_MSG, - INT_H_SIZE, 0, l->addr, - tipc_own_addr(l->net), - 0, 0, 0); - if (!tnlskb) { - pr_warn("%sunable to create dummy SYNCH_MSG\n", - link_co_err); + if (mtyp == SYNCH_MSG && enhanced) { + + skb = tipc_msg_create(TUNNEL_PROTOCOL, SYNCH_MSG, INT_H_SIZE, 0, + l->addr, tipc_own_addr(l->net), 0, 0, 0); + if (!skb) { + pr_warn("%sfailed to create SYNCH_MSG\n", link_co_err); return; } - - hdr = buf_msg(tnlskb); + _hdr = buf_msg(skb); syncpt = l->snd_nxt + skb_queue_len(&l->backlogq) - 1; - msg_set_syncpt(hdr, syncpt); - msg_set_bearer_id(hdr, l->peer_bearer_id); - __skb_queue_tail(&tnlq, tnlskb); + msg_set_syncpt(_hdr, syncpt); + msg_set_bearer_id(_hdr, l->peer_bearer_id); + __skb_queue_tail(&tnlq, skb); tipc_link_xmit(tnl, &tnlq, xmitq); return; } - - __skb_queue_head_init(&tmpxq); - __skb_queue_head_init(&frags); - /* At least one packet required for safe algorithm => add dummy */ - skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_DIRECT_MSG, - BASIC_H_SIZE, 0, l->addr, tipc_own_addr(l->net), - 0, 0, TIPC_ERR_NO_PORT); + /* At least one packet is required for safe algorithm => add dummy */ + skb = tipc_msg_create(TIPC_LOW_IMPORTANCE, TIPC_CONN_MSG, BASIC_H_SIZE, + 0, l->addr, self, 0, 0, TIPC_ERR_NO_PORT); if (!skb) { pr_warn("%sunable to create tunnel packet\n", link_co_err); return; } - __skb_queue_tail(&tnlq, skb); + skb_queue_tail(&tnlq, skb); tipc_link_xmit(l, &tnlq, &tmpxq); __skb_queue_purge(&tmpxq); - /* Initialize reusable tunnel packet header */ - tipc_msg_init(tipc_own_addr(l->net), &tnlhdr, TUNNEL_PROTOCOL, - mtyp, INT_H_SIZE, l->addr); + /* Number of packets to report depends on if transmitq is linear */ if (mtyp == SYNCH_MSG) pktcnt = l->snd_nxt - buf_seqno(skb_peek(&l->transmq)); else pktcnt = 
skb_queue_len(&l->transmq); pktcnt += skb_queue_len(&l->backlogq); - msg_set_msgcnt(&tnlhdr, pktcnt); - msg_set_bearer_id(&tnlhdr, l->peer_bearer_id); -tnl: - /* Wrap each packet into a tunnel packet */ - skb_queue_walk(queue, skb) { - hdr = buf_msg(skb); - if (queue == &l->backlogq) - msg_set_seqno(hdr, seqno++); - pktlen = msg_size(hdr); - - /* Tunnel link MTU is not large enough? This could be - * due to: - * 1) Link MTU has just changed or set differently; - * 2) Or FAILOVER on the top of a SYNCH message - * - * The 2nd case should not happen if peer supports - * TIPC_TUNNEL_ENHANCED - */ - if (pktlen > tnl->mtu - INT_H_SIZE) { - if (mtyp == FAILOVER_MSG && - (tnl->peer_caps & TIPC_TUNNEL_ENHANCED)) { - rc = tipc_msg_fragment(skb, &tnlhdr, tnl->mtu, - &frags); - if (rc) { - pr_warn("%sunable to frag msg: rc %d\n", - link_co_err, rc); - return; - } - pktcnt += skb_queue_len(&frags) - 1; - pktcnt_need_update = true; - skb_queue_splice_tail_init(&frags, &tnlq); - continue; - } - /* Unluckily, peer doesn't have TIPC_TUNNEL_ENHANCED - * => Just warn it and return! 
- */ - pr_warn_ratelimited("%stoo large msg <%d, %d>: %d!\n", - link_co_err, msg_user(hdr), - msg_type(hdr), msg_size(hdr)); - return; - } - msg_set_size(&tnlhdr, pktlen + INT_H_SIZE); - tnlskb = tipc_buf_acquire(pktlen + INT_H_SIZE, GFP_ATOMIC); - if (!tnlskb) { - pr_warn("%sunable to send packet\n", link_co_err); - return; - } - skb_copy_to_linear_data(tnlskb, &tnlhdr, INT_H_SIZE); - skb_copy_to_linear_data_offset(tnlskb, INT_H_SIZE, hdr, pktlen); - __skb_queue_tail(&tnlq, tnlskb); + /* Copy all buffers into a temporary queue */ + inner_seqno = l->snd_nxt; + skb_queue_walk(&l->backlogq, skb) { + msg_set_seqno(buf_msg(skb), inner_seqno++); + } + tipc_skb_queue_copy(&l->transmq, &tnlq); + tipc_skb_queue_copy(&l->backlogq, &tnlq); + + /* Fragment the buffers if applicable */ + rc = tipc_skb_queue_fragment(&tnlq, tnl->mtu, &pktcnt, enhanced, mtyp); + if (rc) { + pr_warn("%sunable to frag msg: rc %d\n", link_co_err, rc); + __skb_queue_purge(&tnlq); + return; } - if (queue != &l->backlogq) { - queue = &l->backlogq; - goto tnl; + /* Create reusable tunnel header and prepend to packets */ + tipc_msg_init(self, &hdr, TUNNEL_PROTOCOL, mtyp, INT_H_SIZE, l->addr); + msg_set_msgcnt(&hdr, pktcnt); + msg_set_bearer_id(&hdr, l->peer_bearer_id); + skb_queue_walk(&tnlq, skb) { + inner_len = msg_size(buf_msg(skb)); + skb_push(skb, INT_H_SIZE); + msg_set_size(&hdr, inner_len + INT_H_SIZE); + skb_copy_to_linear_data(skb, &hdr, INT_H_SIZE); } - - if (pktcnt_need_update) - skb_queue_walk(&tnlq, skb) { - hdr = buf_msg(skb); - msg_set_msgcnt(hdr, pktcnt); - } - tipc_link_xmit(tnl, &tnlq, xmitq); + /* Prepare for receiving failover packets, if any */ if (mtyp == FAILOVER_MSG) { tnl->drop_point = l->rcv_nxt; tnl->failover_reasm_skb = l->reasm_buf; diff --git a/net/tipc/msg.c b/net/tipc/msg.c index 0d515d2..812334d 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -860,3 +860,53 @@ void tipc_skb_reject(struct net *net, int err, struct sk_buff *skb, if (tipc_msg_reverse(tipc_own_addr(net), 
&skb, err)) __skb_queue_tail(xmitq, skb); } + +void tipc_skb_queue_copy(struct sk_buff_head *from, + struct sk_buff_head *to) +{ + struct sk_buff *skb, *__skb; + + skb_queue_walk(from, skb) { + __skb = pskb_copy(skb, GFP_ATOMIC); + if (!__skb) + break; + __skb_queue_tail(to, __skb); + } +} + +/* tipc_skb_queue_fragment(): Fragment tunnel packets if applicable + * Two cases: + * 1) Tunnel link MTU has just changed or is set differently + * 2) FAILOVER on top of a SYNCH message. + */ +int tipc_skb_queue_fragment(struct sk_buff_head *skbq, int pktmax, + int *pktcnt, bool frag_supp, int mtyp) +{ + struct sk_buff_head frags, tmpq; + struct tipc_msg *hdr; + struct sk_buff *skb; + int rc = 0; + + __skb_queue_head_init(&frags); + __skb_queue_head_init(&tmpq); + skb_queue_splice_tail_init(skbq, &tmpq); + + skb_queue_walk(&tmpq, skb) { + hdr = buf_msg(skb); + if (msg_size(hdr) <= pktmax - INT_H_SIZE) + continue; + if (mtyp == FAILOVER_MSG && !frag_supp) { + rc = -1; + goto exit; + } + rc = tipc_msg_fragment(skb, hdr, pktmax, &frags); + if (rc) + goto exit; + *pktcnt += skb_queue_len(&frags) - 1; + skb_queue_splice_tail_init(&frags, &tmpq); + } +exit: + __skb_queue_purge(&frags); + skb_queue_splice_tail_init(&tmpq, skbq); + return rc; +} diff --git a/net/tipc/msg.h b/net/tipc/msg.h index e4c13f2..2197f64 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -1125,7 +1125,9 @@ bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg, void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, struct sk_buff *skb); bool tipc_msg_skb_clone(struct sk_buff_head *msg, struct sk_buff_head *cpy); - +void tipc_skb_queue_copy(struct sk_buff_head *from, struct sk_buff_head *to); +int tipc_skb_queue_fragment(struct sk_buff_head *skbq, int pktmax, + int *pktcnt, bool frag_supp, int mtyp); static inline u16 buf_seqno(struct sk_buff *skb) { return msg_seqno(buf_msg(skb)); -- 2.1.4 |
From: Jon M. <jon...@er...> - 2019-12-22 02:26:29
|
As a preparation for the introduction of GSO we need to eliminate the assumption that there is a one-to-one relation between queued/sent sk_buffs and sequence-numbered packets. A GSO prepared buffer may in the future represent many message fragments, each of which has its own packet sequence number.

We therefore prepare message buffers of type FIRST_FRAGMENT so that they may contain a complete message, comprising all its potential fragments, not only the first one. We do this by adding a "packet count" field and a corresponding "last_seqno()" function to the header of such messages, and adapt the link transmission and reception code to handle them correctly.

Note that we don't actually introduce multi-packet buffers in this commit, only some of the logic necessary to handle such packets.

Signed-off-by: Jon Maloy <jon...@er...>
--- net/tipc/link.c | 72 +++++++++++++++++++++++++++++++-------------------------- net/tipc/msg.c | 25 ++++++++++---------- net/tipc/msg.h | 18 +++++++++++++++ 3 files changed, 70 insertions(+), 45 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index f16219c..e205347 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -158,6 +158,7 @@ struct tipc_link { /* Sending */ struct sk_buff_head transmq; struct sk_buff_head backlogq; + u16 transmq_len; struct { u16 len; u16 limit; @@ -942,6 +943,7 @@ void tipc_link_reset(struct tipc_link *l) l->reasm_buf = NULL; l->reasm_tnlmsg = NULL; l->failover_reasm_skb = NULL; + l->transmq_len = 0; l->rcv_unacked = 0; l->snd_nxt = 1; l->rcv_nxt = 1; @@ -975,11 +977,11 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; u16 ack = l->rcv_nxt - 1; u16 seqno = l->snd_nxt; - int pkt_cnt = skb_queue_len(list); int imp = msg_importance(hdr); unsigned int mss = tipc_link_mss(l); unsigned int cwin = l->window; unsigned int mtu = l->mtu; + unsigned int pktcnt; bool new_bundle; int rc = 0; @@ -990,7 +992,6 @@ int tipc_link_xmit(struct 
tipc_link *l, struct sk_buff_head *list, __skb_queue_purge(list); return -EMSGSIZE; } - /* Allow oversubscription of one data msg per source at congestion */ if (unlikely(l->backlog[imp].len >= l->backlog[imp].limit)) { if (imp == TIPC_SYSTEM_IMPORTANCE) { @@ -1000,15 +1001,15 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, rc = link_schedule_user(l, hdr); } - if (pkt_cnt > 1) { - l->stats.sent_fragmented++; - l->stats.sent_fragments += pkt_cnt; - } - /* Prepare each packet for sending, and add to relevant queue: */ while ((skb = __skb_dequeue(list))) { - if (likely(skb_queue_len(transmq) < cwin)) { - hdr = buf_msg(skb); + hdr = buf_msg(skb); + pktcnt = msg_pktcnt(hdr); + if (msg_user(hdr) == MSG_FRAGMENTER) { + l->stats.sent_fragmented++; + l->stats.sent_fragments += skb_shinfo(skb)->gso_segs; + } + if (likely(l->transmq_len < cwin)) { msg_set_seqno(hdr, seqno); msg_set_ack(hdr, ack); msg_set_bcast_ack(hdr, bc_ack); @@ -1019,14 +1020,13 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, return -ENOBUFS; } __skb_queue_tail(transmq, skb); - /* next retransmit attempt */ - if (link_is_bc_sndlink(l)) - TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM; + l->transmq_len += pktcnt; __skb_queue_tail(xmitq, _skb); TIPC_SKB_CB(skb)->ackers = l->ackers; + if (link_is_bc_sndlink(l)) + TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM; l->rcv_unacked = 0; - l->stats.sent_pkts++; - seqno++; + seqno += pktcnt; continue; } if (tipc_msg_try_bundle(l->backlog[imp].target_bskb, &skb, @@ -1046,10 +1046,10 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, continue; } l->backlog[imp].target_bskb = NULL; - l->backlog[imp].len += (1 + skb_queue_len(list)); + l->backlog[imp].len += pktcnt; __skb_queue_tail(backlogq, skb); - skb_queue_splice_tail_init(list, backlogq); } + l->stats.sent_pkts += mod(seqno - l->snd_nxt); l->snd_nxt = seqno; return rc; } @@ -1118,23 +1118,22 @@ static void tipc_link_advance_backlog(struct tipc_link *l, 
__skb_dequeue(&l->backlogq); hdr = buf_msg(skb); imp = msg_importance(hdr); - l->backlog[imp].len--; + l->backlog[imp].len -= msg_pktcnt(hdr); if (unlikely(skb == l->backlog[imp].target_bskb)) l->backlog[imp].target_bskb = NULL; __skb_queue_tail(&l->transmq, skb); - /* next retransmit attempt */ + l->transmq_len += msg_pktcnt(hdr); if (link_is_bc_sndlink(l)) TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM; - __skb_queue_tail(xmitq, _skb); TIPC_SKB_CB(skb)->ackers = l->ackers; msg_set_seqno(hdr, seqno); msg_set_ack(hdr, ack); msg_set_bcast_ack(hdr, bc_ack); l->rcv_unacked = 0; - l->stats.sent_pkts++; - seqno++; + seqno += msg_pktcnt(hdr); } + l->stats.sent_pkts += mod(seqno - l->snd_nxt); l->snd_nxt = seqno; } @@ -1202,8 +1201,6 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r, u16 from, u16 to, struct sk_buff_head *xmitq) { struct sk_buff *_skb, *skb = skb_peek(&l->transmq); - u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; - u16 ack = l->rcv_nxt - 1; int retransmitted = 0; struct tipc_msg *hdr; int rc = 0; @@ -1230,9 +1227,6 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r, _skb = pskb_copy(skb, GFP_ATOMIC); if (!_skb) return 0; - hdr = buf_msg(_skb); - msg_set_ack(hdr, ack); - msg_set_bcast_ack(hdr, bc_ack); _skb->priority = TC_PRIO_CONTROL; __skb_queue_tail(xmitq, _skb); l->stats.retransmitted++; @@ -1406,13 +1400,18 @@ static int tipc_link_release_pkts(struct tipc_link *l, u16 acked) { int released = 0; struct sk_buff *skb, *tmp; + struct tipc_msg *hdr; + int pktcnt; skb_queue_walk_safe(&l->transmq, skb, tmp) { - if (more(buf_seqno(skb), acked)) + hdr = buf_msg(skb); + pktcnt = msg_pktcnt(hdr); + if (more(msg_last_seqno(hdr), acked)) break; __skb_unlink(skb, &l->transmq); kfree_skb(skb); - released++; + l->transmq_len -= pktcnt; + released += pktcnt; } return released; } @@ -1486,17 +1485,20 @@ static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap, bool passed = false; u16 released = 0; u16 
seqno, n = 0; + int pktcnt; int rc = 0; skb_queue_walk_safe(&l->transmq, skb, tmp) { - seqno = buf_seqno(skb); + seqno = msg_last_seqno(buf_msg(skb)); next_gap_ack: if (less_eq(seqno, acked)) { /* release skb */ + pktcnt = msg_pktcnt(buf_msg(skb)); __skb_unlink(skb, &l->transmq); kfree_skb(skb); - released++; + l->transmq_len -= pktcnt; + released += pktcnt; } else if (less_eq(seqno, acked + gap)) { /* First, check if repeated retrans failures occurs? */ if (!passed && link_retransmit_failure(l, l, &rc)) @@ -1622,7 +1624,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *defq = &l->deferdq; struct tipc_msg *hdr = buf_msg(skb); u16 seqno, rcv_nxt, win_lim; - int released = 0; + int pktcnt, released = 0; int rc = 0; /* Verify and update link state */ @@ -1635,6 +1637,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, do { hdr = buf_msg(skb); seqno = msg_seqno(hdr); + pktcnt = msg_pktcnt(hdr); rcv_nxt = l->rcv_nxt; win_lim = rcv_nxt + TIPC_MAX_LINK_WIN; @@ -1661,14 +1664,15 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, } /* Deliver packet */ - l->rcv_nxt++; + l->rcv_nxt += pktcnt; l->stats.recv_pkts++; + l->rcv_unacked += pktcnt; if (unlikely(msg_user(hdr) == TUNNEL_PROTOCOL)) rc |= tipc_link_tnl_rcv(l, skb, l->inputq); else if (!tipc_data_input(l, skb, l->inputq)) rc |= tipc_link_input(l, skb, l->inputq, &l->reasm_buf); - if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) + if (unlikely(l->rcv_unacked >= TIPC_MIN_LINK_WIN)) rc |= tipc_link_build_state_msg(l, xmitq); if (unlikely(rc & ~TIPC_LINK_SND_STATE)) break; @@ -1813,6 +1817,7 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, if (!tnl) return; + skb_queue_head_init(&tnlq); skb_queue_head_init(&tmpxq); @@ -2286,6 +2291,7 @@ void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, break; if (!--TIPC_SKB_CB(skb)->ackers) { __skb_unlink(skb, &snd_l->transmq); + snd_l->transmq_len -= msg_pktcnt(buf_msg(skb)); kfree_skb(skb); 
} } diff --git a/net/tipc/msg.c b/net/tipc/msg.c index 812334d..a70d8a9 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -425,6 +425,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset, FIRST_FRAGMENT, INT_H_SIZE, msg_destnode(mhdr)); msg_set_size(&pkthdr, pktmax); msg_set_fragm_no(&pkthdr, pktno); + msg_set_pktcnt(&pkthdr, 1); msg_set_importance(&pkthdr, msg_importance(mhdr)); /* Prepare first fragment */ @@ -828,27 +829,27 @@ bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg, * @seqno: sequence number of buffer to add * @skb: buffer to add */ -void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, +void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 first, struct sk_buff *skb) { + u16 last = msg_last_seqno(buf_msg(skb)); struct sk_buff *_skb, *tmp; + struct tipc_msg *_hdr; + u16 _first, _last; - if (skb_queue_empty(list) || less(seqno, buf_seqno(skb_peek(list)))) { + if (skb_queue_empty(list) || less(last, buf_seqno(skb_peek(list)))) { __skb_queue_head(list, skb); return; } - - if (more(seqno, buf_seqno(skb_peek_tail(list)))) { - __skb_queue_tail(list, skb); - return; - } - - skb_queue_walk_safe(list, _skb, tmp) { - if (more(seqno, buf_seqno(_skb))) + skb_queue_reverse_walk_safe(list, _skb, tmp) { + _hdr = buf_msg(_skb); + _first = msg_seqno(_hdr); + _last = msg_last_seqno(_hdr); + if (less(last, _first)) continue; - if (seqno == buf_seqno(_skb)) + if (!less(first, _first) && !more(last, _last)) break; - __skb_queue_before(list, _skb, skb); + __skb_queue_after(list, _skb, skb); return; } kfree_skb(skb); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 2197f64..1b5c8c8 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -709,6 +709,24 @@ static inline void msg_set_node_capabilities(struct tipc_msg *m, u32 n) msg_set_bits(m, 1, 15, 0x1fff, n); } +static inline u16 msg_pktcnt(struct tipc_msg *m) +{ + if (likely(msg_user(m) != MSG_FRAGMENTER || + msg_type(m) != FIRST_FRAGMENT)) + return 1; + return msg_bits(m, 
1, 23, 0x3f); +} + +static inline void msg_set_pktcnt(struct tipc_msg *m, u32 n) +{ + msg_set_bits(m, 1, 23, 0x3f, n); +} + +static inline u16 msg_last_seqno(struct tipc_msg *m) +{ + return msg_seqno(m) + msg_pktcnt(m) - 1; +} + /* * Word 2 */ -- 2.1.4 |
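The accounting introduced by the patch above can be sketched outside the kernel like this. It is a simplified model, not the real code: struct buf_hdr, last_seqno(), seq_more() and release_pkts() are hypothetical stand-ins for the tipc_msg header, msg_last_seqno(), more() and tipc_link_release_pkts(), and the wrap-safe comparison is written the way I understand TIPC's 16-bit sequence arithmetic to work, which is an assumption. Also note that the packet count field in the patch is 6 bits wide, so one buffer can represent at most 63 packets.

```c
#include <stdint.h>
#include <stdbool.h>

/* Hypothetical stand-in for the relevant tipc_msg header fields */
struct buf_hdr {
	uint16_t seqno;		/* sequence number of first packet */
	uint16_t pktcnt;	/* number of packets in this buffer */
};

/* Last sequence number covered by the buffer, modulo 2^16 */
uint16_t last_seqno(const struct buf_hdr *h)
{
	return (uint16_t)(h->seqno + h->pktcnt - 1);
}

/* Wrap-safe "left is after right" in 16-bit sequence space (assumed) */
bool seq_more(uint16_t left, uint16_t right)
{
	return (uint16_t)(right - left) >= 32768u;
}

/* Walk the transmit queue from the head and release every buffer
 * whose last packet is covered by 'acked'. Returns the number of
 * released packets and stores the number of released buffers.
 */
int release_pkts(const struct buf_hdr *q, int n, uint16_t acked,
		 int *bufs_released)
{
	int released = 0;
	int i;

	for (i = 0; i < n; i++) {
		if (seq_more(last_seqno(&q[i]), acked))
			break;
		released += q[i].pktcnt;
	}
	*bufs_released = i;
	return released;
}
```

The point of the model is that a buffer holding eleven packets is released only when its last sequence number has been acknowledged, and that it then counts as eleven released packets rather than one, also across the 16-bit wrap.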
From: Jon M. <jon...@er...> - 2019-12-22 02:26:34
|
After experimenting with GSO and GRO we have found that they give very little performance improvement. However, the previously introduced GSO framework can be leveraged to significantly improve throughput across TIPC/UDP/IP anyway.

We have found that we can disable the GSO callback for messages smaller than 16k, and instead let them be transported as is by the UDP/IP bearer. By doing this, we obtain a 45% increase of max throughput for large messages, getting close to the values we had with the earlier solution with an "emulated mtu".

In contrast to that solution, we now use the real mtu and the real number of outstanding network packets as base for the link congestion control. Furthermore, if an initial 16k GSO chunk, corresponding to eleven 1500 byte network packets, is lost, the algorithm will retransmit the individual network packets, not the whole message or stream chunk.

This solution seems to be sufficiently robust to work well even in lossy networks, and does not overwhelm and reset the virtio ring buffer as we have seen in some cases before.
Signed-off-by: Jon Maloy <jon...@er...> --- net/tipc/link.c | 11 +++++++++-- net/tipc/msg.c | 9 +++++++-- net/tipc/msg.h | 23 +++++++++++++++++++---- net/tipc/node.h | 6 ++++-- net/tipc/udp_media.c | 13 +++++++++++-- 5 files changed, 50 insertions(+), 12 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index 8455fd1..7f85165 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1000,6 +1000,8 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, if (msg_user(hdr) == MSG_FRAGMENTER) { l->stats.sent_fragmented++; l->stats.sent_fragments += skb_shinfo(skb)->gso_segs; + TIPC_SKB_CB(skb)->peer_gso_support = + !!(l->peer_caps & TIPC_GSO_SUPPORT); } if (likely(l->transmq_len < cwin)) { msg_set_seqno(hdr, seqno); @@ -1104,6 +1106,8 @@ static void tipc_link_advance_backlog(struct tipc_link *l, skb = skb_peek(&l->backlogq); if (!skb) break; + TIPC_SKB_CB(skb)->peer_gso_support = + !!(l->peer_caps & TIPC_GSO_SUPPORT); _skb = skb_clone(skb, GFP_ATOMIC); if (!_skb) break; @@ -1193,6 +1197,7 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r, u16 from, u16 to, struct sk_buff_head *xmitq) { struct sk_buff *_skb, *skb = skb_peek(&l->transmq); + int mss = tipc_link_mtu(l) - 2 * INT_H_SIZE; int retransmitted = 0; struct tipc_msg *hdr; int rc = 0; @@ -1218,7 +1223,7 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r, TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM; if (msg_user(hdr) == MSG_FRAGMENTER) { skb->priority = TC_PRIO_CONTROL; - tipc_skb_segment(skb, 0, from, to, xmitq); + tipc_skb_segment(skb, 0, from, to, mss, xmitq); continue; } _skb = __pskb_copy(skb, LL_MAX_HEADER + MIN_H_SIZE, GFP_ATOMIC); @@ -1474,6 +1479,7 @@ static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap, struct tipc_gap_ack_blks *ga, struct sk_buff_head *xmitq) { + int mss = tipc_link_mtu(l) - 2 * INT_H_SIZE; struct sk_buff *skb, *_skb, *tmp; struct tipc_msg *hdr, *_hdr; u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; 
@@ -1509,7 +1515,8 @@ static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap, TIPC_SKB_CB(skb)->nxt_retr = TIPC_UC_RETR_TIME; if (msg_user(hdr) == MSG_FRAGMENTER) { skb->priority = TC_PRIO_CONTROL; - tipc_skb_segment(skb, 0, acked, acked + gap, xmitq); + tipc_skb_segment(skb, 0, acked, acked + gap, + mss, xmitq); l->stats.retransmitted++; retransmitted = true; continue; diff --git a/net/tipc/msg.c b/net/tipc/msg.c index 3c36ba2..83c2f17 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -152,6 +152,12 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf) if (unlikely(skb_unclone(frag, GFP_ATOMIC))) goto err; head = *headbuf = frag; + if (msg_size(buf_msg(frag)) == msg_data_sz(msg)) { + *buf = head; + TIPC_SKB_CB(head)->tail = NULL; + *headbuf = NULL; + return 1; + } *buf = NULL; TIPC_SKB_CB(head)->tail = NULL; if (skb_is_nonlinear(head)) { @@ -868,7 +874,7 @@ int tipc_skb_queue_fragment(struct sk_buff_head *skbq, int pktmax, } int tipc_skb_segment(struct sk_buff *skb, int tnl_hlen, u16 from, - u16 to, struct sk_buff_head *segs) + u16 to, int mss, struct sk_buff_head *segs) { struct skb_shared_info *shinfo = skb_shinfo(skb); unsigned char *tnl_hdr = skb->data - tnl_hlen; @@ -877,7 +883,6 @@ int tipc_skb_segment(struct sk_buff *skb, int tnl_hlen, u16 from, int frag_pos = frag->bv_offset; struct tipc_msg *seg_hdr = NULL; skb_frag_t *seg_frag = NULL; - int mss = shinfo->gso_size; struct sk_buff *seg = NULL; unsigned int hlen = msg_hdr_sz(hdr); unsigned int left = msg_data_sz(hdr); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 8391581..9fb02b5 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -103,6 +103,7 @@ struct plist; #define TIPC_MEDIA_INFO_OFFSET 5 struct tipc_skb_cb { +//<<<<<<< HEAD union { struct { struct sk_buff *tail; @@ -134,12 +135,25 @@ struct tipc_skb_cb { }; u8 flags; }; - u8 reserved; + u8 reserved:7; + bool peer_gso_support:1; #ifdef CONFIG_TIPC_CRYPTO void *crypto_ctx; #endif } __packed; - +#if 0 
+======= + u32 bytes_read; + u32 orig_member; + struct sk_buff *tail; + unsigned long nxt_retr; + u16 chain_imp; + u16 ackers; + u16 peer_caps; + u8 validated; +}; +>>>>>>> tipc: let stream chunks bypass gso over udp +#endif #define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0])) struct tipc_msg { @@ -1147,15 +1161,16 @@ void tipc_skb_queue_copy(struct sk_buff_head *from, struct sk_buff_head *to); int tipc_skb_queue_fragment(struct sk_buff_head *skbq, int pktmax, int *pktcnt, bool frag_supp, int mtyp); int tipc_skb_segment(struct sk_buff *skb, int tnl_hlen, u16 from, u16 to, - struct sk_buff_head *segs); + int mss, struct sk_buff_head *segs); static inline int tipc_skb_segment_all(struct sk_buff *skb, int tnl_hlen, struct sk_buff_head *segs) { u16 from = msg_seqno(buf_msg(skb)); u16 to = from + skb_shinfo(skb)->gso_segs - 1; + int mss = skb_shinfo(skb)->gso_size; - return tipc_skb_segment(skb, tnl_hlen, from, to, segs); + return tipc_skb_segment(skb, tnl_hlen, from, to, mss, segs); } static inline u16 buf_seqno(struct sk_buff *skb) diff --git a/net/tipc/node.h b/net/tipc/node.h index a6803b4..591165c 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -55,7 +55,8 @@ enum { TIPC_MCAST_RBCTL = (1 << 7), TIPC_GAP_ACK_BLOCK = (1 << 8), TIPC_TUNNEL_ENHANCED = (1 << 9), - TIPC_NAGLE = (1 << 10) + TIPC_NAGLE = (1 << 10), + TIPC_GSO_SUPPORT = (1 << 11) }; #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \ @@ -68,7 +69,8 @@ enum { TIPC_MCAST_RBCTL | \ TIPC_GAP_ACK_BLOCK | \ TIPC_TUNNEL_ENHANCED | \ - TIPC_NAGLE) + TIPC_NAGLE | \ + TIPC_GSO_SUPPORT) #define INVALID_BEARER_ID -1 diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c index 974d260..210e754 100644 --- a/net/tipc/udp_media.c +++ b/net/tipc/udp_media.c @@ -52,6 +52,7 @@ #include "bearer.h" #include "netlink.h" #include "msg.h" +#include "node.h" /* IANA assigned UDP port */ #define UDP_PORT_DEFAULT 6118 @@ -181,8 +182,16 @@ static int tipc_udp_xmit(struct net *net, struct sk_buff *skb, } 
dst_cache_set_ip4(cache, &rt->dst, fl.saddr); } - if (msg_user(buf_msg(skb)) == MSG_FRAGMENTER) - skb_shinfo(skb)->gso_type = SKB_GSO_UDP_TUNNEL; + /* IP layer does fragm/defrag better than GSO/GRO layer */ + if (msg_user(buf_msg(skb)) == MSG_FRAGMENTER) { + if (skb->len <= TIPC_MSG_CHUNK_SIZE + INT_H_SIZE * 2 && + TIPC_SKB_CB(skb)->peer_gso_support) { + skb_shinfo(skb)->gso_size = 0; + skb_shinfo(skb)->gso_segs = 0; + } else { + skb_shinfo(skb)->gso_type = SKB_GSO_UDP_TUNNEL; + } + } skb->dev = rt->dst.dev; ttl = ip4_dst_hoplimit(&rt->dst); udp_tunnel_xmit_skb(rt, ub->ubsock->sk, skb, src->ipv4.s_addr, -- 2.1.4 |
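The branch this patch adds to tipc_udp_xmit() can be read as a single predicate: a MSG_FRAGMENTER buffer skips GSO (and is left to IP fragmentation) only when it is no larger than one stream chunk plus two internal headers and the peer advertises GSO support. Below is a minimal userspace sketch of that test. The helper name tipc_udp_bypass_gso() is hypothetical, and INT_H_SIZE is assumed to be 40 since its definition is not part of this diff; TIPC_MSG_CHUNK_SIZE is the 16384 introduced earlier in the series.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TIPC_MSG_CHUNK_SIZE 16384 /* from "tipc: change size of stream message chunks" */
#define INT_H_SIZE 40             /* assumed value; not shown in this diff */

/* Hypothetical helper mirroring the condition in tipc_udp_xmit():
 * returns true when gso_size/gso_segs would be cleared so that the
 * IP layer fragments the buffer, false when SKB_GSO_UDP_TUNNEL is set.
 */
static bool tipc_udp_bypass_gso(size_t skb_len, bool peer_gso_support)
{
	return skb_len <= TIPC_MSG_CHUNK_SIZE + INT_H_SIZE * 2 &&
	       peer_gso_support;
}

/* Small self-check of the boundary: 16384 + 2 * 40 = 16464 bytes. */
static void tipc_udp_bypass_gso_selfcheck(void)
{
	assert(tipc_udp_bypass_gso(16464, true));
	assert(!tipc_udp_bypass_gso(16465, true));
	assert(!tipc_udp_bypass_gso(16464, false));
}
```

Under these assumptions, a full 16k stream chunk with its inner and outer headers lands just inside the bypass limit, which appears to be the case the patch title refers to.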
From: Jon M. <jon...@er...> - 2019-12-22 03:00:33
|
We introduce a generic GSO solution that works well with both TIPC/UDP and L2 bearers. This code does not by itself give any performance improvements, but it is a very useful framework for achieving such improvements in later commits. Signed-off-by: Jon Maloy <jon...@er...> --- net/tipc/bcast.c | 16 +-- net/tipc/bearer.c | 29 +++++ net/tipc/link.c | 44 ++++--- net/tipc/msg.c | 331 ++++++++++++++++++++++++++++++--------------------- net/tipc/msg.h | 18 ++- net/tipc/node.c | 1 + net/tipc/socket.c | 14 +-- net/tipc/udp_media.c | 4 +- 8 files changed, 286 insertions(+), 171 deletions(-) diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c index 42e01e9..c5654d6 100644 --- a/net/tipc/bcast.c +++ b/net/tipc/bcast.c @@ -360,14 +360,14 @@ static int tipc_mcast_send_sync(struct net *net, struct sk_buff *skb, /* tipc_mcast_xmit - deliver message to indicated destination nodes * and to identified node local sockets * @net: the applicable net namespace - * @pkts: chain of buffers containing message + * @xmitq: list containing message buffer * @method: send method to be used * @dests: destination nodes for message. * @cong_link_cnt: returns number of encountered congested destination links * Consumes buffer chain.
* Returns 0 if success, otherwise errno */ -int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, +int tipc_mcast_xmit(struct net *net, struct sk_buff_head *xmitq, struct tipc_mc_method *method, struct tipc_nlist *dests, u16 *cong_link_cnt) { @@ -380,8 +380,8 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, skb_queue_head_init(&inputq); __skb_queue_head_init(&localq); - /* Clone packets before they are consumed by next call */ - if (dests->local && !tipc_msg_reassemble(pkts, &localq)) { + /* Clone message before it is consumed by xmit call */ + if (dests->local && !tipc_msg_clone(xmitq, &localq)) { rc = -ENOMEM; goto exit; } @@ -389,7 +389,7 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, if (dests->remote) { tipc_bcast_select_xmit_method(net, dests->remote, method); - skb = skb_peek(pkts); + skb = skb_peek(xmitq); hdr = buf_msg(skb); if (msg_user(hdr) == MSG_FRAGMENTER) hdr = msg_inner_hdr(hdr); @@ -401,9 +401,9 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, dests, cong_link_cnt); if (method->rcast) - rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt); + rc = tipc_rcast_xmit(net, xmitq, dests, cong_link_cnt); else - rc = tipc_bcast_xmit(net, pkts, cong_link_cnt); + rc = tipc_bcast_xmit(net, xmitq, cong_link_cnt); } if (dests->local) { @@ -412,7 +412,7 @@ int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, } exit: /* This queue should normally be empty by now */ - __skb_queue_purge(pkts); + __skb_queue_purge(xmitq); return rc; } diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c index 34ca7b7..9477e31 100644 --- a/net/tipc/bearer.c +++ b/net/tipc/bearer.c @@ -229,6 +229,19 @@ void tipc_bearer_remove_dest(struct net *net, u32 bearer_id, u32 dest) rcu_read_unlock(); } +static struct sk_buff *tipc_gso_segment(struct sk_buff *skb, + netdev_features_t features) +{ + int tnl_hlen = skb->inner_mac_header - SKB_GSO_CB(skb)->mac_offset; + struct sk_buff_head segs; + + 
__skb_queue_head_init(&segs); + if (!tipc_skb_segment_all(skb, tnl_hlen, &segs)) + return ERR_PTR(-EINVAL); + skb_peek_tail(&segs)->next = NULL; + return __skb_peek(&segs); +} + /** * tipc_enable_bearer - enable bearer with the given name */ @@ -475,6 +488,9 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *skb, skb_reset_network_header(skb); skb->dev = dev; skb->protocol = htons(ETH_P_TIPC); + skb_set_inner_protocol(skb, htons(ETH_P_TIPC)); + if (msg_user(buf_msg(skb)) == MSG_FRAGMENTER) + skb_shinfo(skb)->gso_type = SKB_GSO_DODGY; dev_hard_header(skb, dev, ETH_P_TIPC, dest->value, dev->dev_addr, skb->len); dev_queue_xmit(skb); @@ -686,13 +702,26 @@ static struct notifier_block notifier = { .priority = 0, }; +static struct packet_offload tipc_offload = { + .type = htons(ETH_P_TIPC), + .priority = 0, + .callbacks = { + .gso_segment = tipc_gso_segment, + .gro_receive = NULL, + .gro_complete = NULL, + }, + .list = {0,}, +}; + int tipc_bearer_setup(void) { + dev_add_offload(&tipc_offload); return register_netdevice_notifier(&notifier); } void tipc_bearer_cleanup(void) { + dev_remove_offload(&tipc_offload); unregister_netdevice_notifier(&notifier); } diff --git a/net/tipc/link.c b/net/tipc/link.c index e205347..8455fd1 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -980,18 +980,10 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, int imp = msg_importance(hdr); unsigned int mss = tipc_link_mss(l); unsigned int cwin = l->window; - unsigned int mtu = l->mtu; unsigned int pktcnt; bool new_bundle; int rc = 0; - if (unlikely(msg_size(hdr) > mtu)) { - pr_warn("Too large msg, purging xmit list %d %d %d %d %d!\n", - skb_queue_len(list), msg_user(hdr), - msg_type(hdr), msg_size(hdr), mtu); - __skb_queue_purge(list); - return -EMSGSIZE; - } /* Allow oversubscription of one data msg per source at congestion */ if (unlikely(l->backlog[imp].len >= l->backlog[imp].limit)) { if (imp == TIPC_SYSTEM_IMPORTANCE) { @@ -1224,7 +1216,12 @@ static int
tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r, if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr)) continue; TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM; - _skb = pskb_copy(skb, GFP_ATOMIC); + if (msg_user(hdr) == MSG_FRAGMENTER) { + skb->priority = TC_PRIO_CONTROL; + tipc_skb_segment(skb, 0, from, to, xmitq); + continue; + } + _skb = __pskb_copy(skb, LL_MAX_HEADER + MIN_H_SIZE, GFP_ATOMIC); if (!_skb) return 0; _skb->priority = TC_PRIO_CONTROL; @@ -1478,28 +1475,29 @@ static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap, struct sk_buff_head *xmitq) { struct sk_buff *skb, *_skb, *tmp; - struct tipc_msg *hdr; + struct tipc_msg *hdr, *_hdr; u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; bool retransmitted = false; u16 ack = l->rcv_nxt - 1; bool passed = false; + u16 first, last, n = 0; u16 released = 0; - u16 seqno, n = 0; int pktcnt; int rc = 0; skb_queue_walk_safe(&l->transmq, skb, tmp) { - seqno = msg_last_seqno(buf_msg(skb)); - + hdr = buf_msg(skb); + first = msg_seqno(hdr); + last = msg_last_seqno(hdr); next_gap_ack: - if (less_eq(seqno, acked)) { + if (!more(last, acked)) { /* release skb */ pktcnt = msg_pktcnt(buf_msg(skb)); __skb_unlink(skb, &l->transmq); kfree_skb(skb); l->transmq_len -= pktcnt; released += pktcnt; - } else if (less_eq(seqno, acked + gap)) { + } else if (!more(first, acked) && !more(acked + gap, last)) { /* First, check if repeated retrans failures occurs? 
*/ if (!passed && link_retransmit_failure(l, l, &rc)) return rc; @@ -1509,12 +1507,20 @@ static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap, if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr)) continue; TIPC_SKB_CB(skb)->nxt_retr = TIPC_UC_RETR_TIME; - _skb = pskb_copy(skb, GFP_ATOMIC); + if (msg_user(hdr) == MSG_FRAGMENTER) { + skb->priority = TC_PRIO_CONTROL; + tipc_skb_segment(skb, 0, acked, acked + gap, xmitq); + l->stats.retransmitted++; + retransmitted = true; + continue; + } + _skb = __pskb_copy(skb, LL_MAX_HEADER + MIN_H_SIZE, + GFP_ATOMIC); if (!_skb) continue; - hdr = buf_msg(_skb); - msg_set_ack(hdr, ack); - msg_set_bcast_ack(hdr, bc_ack); + _hdr = buf_msg(_skb); + msg_set_ack(_hdr, ack); + msg_set_bcast_ack(_hdr, bc_ack); _skb->priority = TC_PRIO_CONTROL; __skb_queue_tail(xmitq, _skb); l->stats.retransmitted++; diff --git a/net/tipc/msg.c b/net/tipc/msg.c index a70d8a9..3c36ba2 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -141,7 +141,6 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf) if (!frag) goto err; - msg = buf_msg(frag); fragid = msg_type(msg); frag->next = NULL; @@ -167,7 +166,6 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf) if (!head) goto err; - if (skb_try_coalesce(head, frag, &headstolen, &delta)) { kfree_skb_partial(frag, headstolen); } else { @@ -368,115 +366,90 @@ int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr, /** * tipc_msg_build - create buffer chain containing specified header and data - * @mhdr: Message header, to be prepended to data + * @hdr: TIPC message header, to be prepended to data * @m: User message - * @dsz: Total length of user data - * @pktmax: Max packet size that can be used - * @list: Buffer or chain of buffers to be returned to caller + * @dlen: Total length of user data + * @mtu: Max packet size that can be used + * @xmitq: Buffer or chain of buffers to be returned to caller * * Note that the recursive call we are 
making here is safe, since it can * logically go only one further level down. * * Returns message data size or errno: -ENOMEM, -EFAULT */ -int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset, - int dsz, int pktmax, struct sk_buff_head *list) +int tipc_msg_build(struct tipc_msg *hdr, struct msghdr *m, int dlen, + int mtu, struct sk_buff_head *list) { - int mhsz = msg_hdr_sz(mhdr); - struct tipc_msg pkthdr; - int msz = mhsz + dsz; - int pktrem = pktmax; - struct sk_buff *skb; - int drem = dsz; - int pktno = 1; - char *pktpos; - int pktsz; - int rc; - - msg_set_size(mhdr, msz); - - /* No fragmentation needed? */ - if (likely(msz <= pktmax)) { - skb = tipc_buf_acquire(msz, GFP_KERNEL); - - /* Fall back to smaller MTU if node local message */ - if (unlikely(!skb)) { - if (pktmax != MAX_MSG_SIZE) - return -ENOMEM; - rc = tipc_msg_build(mhdr, m, offset, dsz, FB_MTU, list); - if (rc != dsz) - return rc; - if (tipc_msg_assemble(list)) - return dsz; - return -ENOMEM; - } - skb_orphan(skb); - __skb_queue_tail(list, skb); - skb_copy_to_linear_data(skb, mhdr, mhsz); - pktpos = skb->data + mhsz; - if (copy_from_iter_full(pktpos, dsz, &m->msg_iter)) - return dsz; - rc = -EFAULT; - goto error; + struct sk_buff *skb = NULL; + int hlen = msg_hdr_sz(hdr); + struct tipc_msg *seghdr; + int mlen = hlen + dlen; + unsigned char *pos; + skb_frag_t *frag; + int left = dlen; + int copy, rc, i; + int mss; + + msg_set_size(hdr, mlen); + + /* This also covers node internal messages */ + if (mlen <= mtu) + skb = alloc_skb_fclone(BUF_HEADROOM + mlen, GFP_KERNEL); + + if (!skb) { + skb = alloc_skb_with_frags(BUF_HEADROOM + hlen, dlen, + PAGE_ALLOC_COSTLY_ORDER, + &rc, GFP_KERNEL); + if (!skb) + goto error; } - /* Prepare reusable fragment header */ - tipc_msg_init(msg_prevnode(mhdr), &pkthdr, MSG_FRAGMENTER, - FIRST_FRAGMENT, INT_H_SIZE, msg_destnode(mhdr)); - msg_set_size(&pkthdr, pktmax); - msg_set_fragm_no(&pkthdr, pktno); - msg_set_pktcnt(&pkthdr, 1); - 
msg_set_importance(&pkthdr, msg_importance(mhdr)); - - /* Prepare first fragment */ - skb = tipc_buf_acquire(pktmax, GFP_KERNEL); - if (!skb) - return -ENOMEM; - skb_orphan(skb); __skb_queue_tail(list, skb); - pktpos = skb->data; - skb_copy_to_linear_data(skb, &pkthdr, INT_H_SIZE); - pktpos += INT_H_SIZE; - pktrem -= INT_H_SIZE; - skb_copy_to_linear_data_offset(skb, INT_H_SIZE, mhdr, mhsz); - pktpos += mhsz; - pktrem -= mhsz; - - do { - if (drem < pktrem) - pktrem = drem; - - if (!copy_from_iter_full(pktpos, pktrem, &m->msg_iter)) { - rc = -EFAULT; + skb_reserve(skb, BUF_HEADROOM); + skb_copy_to_linear_data(skb, hdr, hlen); + skb->len = hlen; + skb->tail += hlen; + + if (skb_tailroom(skb) >= dlen) { + pos = skb->data + hlen; + if (!copy_from_iter_full(pos, dlen, &m->msg_iter)) goto error; - } - drem -= pktrem; - - if (!drem) - break; + skb->len += dlen; + skb->tail += dlen; + return dlen; + } - /* Prepare new fragment: */ - if (drem < (pktmax - INT_H_SIZE)) - pktsz = drem + INT_H_SIZE; - else - pktsz = pktmax; - skb = tipc_buf_acquire(pktsz, GFP_KERNEL); - if (!skb) { - rc = -ENOMEM; + for (copy = 0, i = 0; left; left -= copy, i++) { + frag = &skb_shinfo(skb)->frags[i]; + pos = page_address(frag->bv_page) + frag->bv_offset; + copy = skb_frag_size(frag) < left ? 
skb_frag_size(frag) : left; + if (!copy_from_iter_full(pos, copy, &m->msg_iter)) goto error; - } - skb_orphan(skb); - __skb_queue_tail(list, skb); - msg_set_type(&pkthdr, FRAGMENT); - msg_set_size(&pkthdr, pktsz); - msg_set_fragm_no(&pkthdr, ++pktno); - skb_copy_to_linear_data(skb, &pkthdr, INT_H_SIZE); - pktpos = skb->data + INT_H_SIZE; - pktrem = pktsz - INT_H_SIZE; - - } while (1); - msg_set_type(buf_msg(skb), LAST_FRAGMENT); - return dsz; + skb->len += copy; + skb->data_len += copy; + } + + if (mlen <= mtu) + return dlen; + + /* Add outer header and prepare buffer for GSO */ + skb_push(skb, INT_H_SIZE); + seghdr = buf_msg(skb); + tipc_msg_init(msg_prevnode(hdr), seghdr, MSG_FRAGMENTER, + FIRST_FRAGMENT, INT_H_SIZE, msg_destnode(hdr)); + msg_set_size(seghdr, INT_H_SIZE + mlen); + msg_set_fragm_no(seghdr, 1); + msg_set_importance(seghdr, msg_importance(hdr)); + skb_reset_inner_mac_header(skb); + skb_reset_inner_network_header(skb); + skb_set_inner_transport_header(skb, INT_H_SIZE); + mss = mtu - INT_H_SIZE; + skb_shinfo(skb)->gso_size = mss; + skb_shinfo(skb)->gso_segs = mlen / mss + !!(mlen % mss); + msg_set_pktcnt(seghdr, skb_shinfo(skb)->gso_segs); + skb->ip_summed = CHECKSUM_UNNECESSARY; + skb->encapsulation = 1; + return dlen; error: __skb_queue_purge(list); __skb_queue_head_init(list); @@ -632,14 +605,13 @@ bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos) * Replaces consumed buffer with new one when successful * Returns true if success, otherwise false */ + bool tipc_msg_reverse(u32 own_node, struct sk_buff **skb, int err) { struct sk_buff *_skb = *skb; struct tipc_msg *_hdr, *hdr; int hlen, dlen; - if (skb_linearize(_skb)) - goto exit; _hdr = buf_msg(_skb); dlen = min_t(uint, msg_data_sz(_hdr), MAX_FORWARD_SIZE); hlen = msg_hdr_sz(_hdr); @@ -661,8 +633,11 @@ bool tipc_msg_reverse(u32 own_node, struct sk_buff **skb, int err) *skb = tipc_buf_acquire(hlen + dlen, GFP_ATOMIC); if (!*skb) goto exit; + memcpy((*skb)->data, _skb->data, 
msg_hdr_sz(_hdr)); memcpy((*skb)->data + hlen, msg_data(_hdr), dlen); + hdr = buf_msg(*skb); + msg_set_hdr_sz(hdr, hlen); /* Build reverse header in new buffer */ hdr = buf_msg(*skb); @@ -767,44 +742,20 @@ bool tipc_msg_assemble(struct sk_buff_head *list) return false; } -/* tipc_msg_reassemble() - clone a buffer chain of fragments and - * reassemble the clones into one message +/* tipc_msg_clone() - clone a multicast message in 'list' for local destination + * sockets, strip off fragment header and add clone to rcvq */ -bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq) +bool tipc_msg_clone(struct sk_buff_head *list, struct sk_buff_head *rcvq) { - struct sk_buff *skb, *_skb; - struct sk_buff *frag = NULL; - struct sk_buff *head = NULL; - int hdr_len; - - /* Copy header if single buffer */ - if (skb_queue_len(list) == 1) { - skb = skb_peek(list); - hdr_len = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb)); - _skb = __pskb_copy(skb, hdr_len, GFP_ATOMIC); - if (!_skb) - return false; - __skb_queue_tail(rcvq, _skb); - return true; - } + struct sk_buff *skb; - /* Clone all fragments and reassemble */ - skb_queue_walk(list, skb) { - frag = skb_clone(skb, GFP_ATOMIC); - if (!frag) - goto error; - frag->next = NULL; - if (tipc_buf_append(&head, &frag)) - break; - if (!head) - goto error; - } - __skb_queue_tail(rcvq, frag); + skb = skb_clone(skb_peek(list), GFP_KERNEL); + if (!skb) + return false; + __skb_queue_tail(rcvq, skb); + if (msg_user(buf_msg(skb)) == MSG_FRAGMENTER) + skb_pull(skb, INT_H_SIZE); return true; -error: - pr_warn("Failed do clone local mcast rcv buffer\n"); - kfree_skb(head); - return false; } bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg, @@ -868,6 +819,10 @@ void tipc_skb_queue_copy(struct sk_buff_head *from, struct sk_buff *skb, *__skb; skb_queue_walk(from, skb) { + if (msg_user(buf_msg(skb)) == MSG_FRAGMENTER) { + tipc_skb_segment_all(skb, 0, to); + continue; + } __skb = pskb_copy(skb, GFP_ATOMIC); if (!__skb) 
break; @@ -911,3 +866,113 @@ int tipc_skb_queue_fragment(struct sk_buff_head *skbq, int pktmax, skb_queue_splice_tail_init(&tmpq, skbq); return rc; } + +int tipc_skb_segment(struct sk_buff *skb, int tnl_hlen, u16 from, + u16 to, struct sk_buff_head *segs) +{ + struct skb_shared_info *shinfo = skb_shinfo(skb); + unsigned char *tnl_hdr = skb->data - tnl_hlen; + struct tipc_msg *hdr = buf_msg(skb); + skb_frag_t *frag = &shinfo->frags[0]; + int frag_pos = frag->bv_offset; + struct tipc_msg *seg_hdr = NULL; + skb_frag_t *seg_frag = NULL; + int mss = shinfo->gso_size; + struct sk_buff *seg = NULL; + unsigned int hlen = msg_hdr_sz(hdr); + unsigned int left = msg_data_sz(hdr); + unsigned int mtyp, ihlen, dlen; + unsigned int seg_left = mss; + u16 seqno = msg_seqno(hdr); + int fragno = 0; + int segno = 1; + + if (more(from, msg_last_seqno(hdr))) + return 0; + + if (less(from, seqno)) + from = seqno; + + if (more(to, msg_last_seqno(hdr))) + to = msg_last_seqno(hdr); + + while (!more(seqno, to)) { + /* Calculate but don't add segments until seqno == from */ + if (!less(seqno, from)) { + if (seqno == from) { + mtyp = FIRST_FRAGMENT; + ihlen = msg_hdr_sz(msg_inner_hdr(hdr)); + } else { + mtyp = FRAGMENT; + ihlen = 0; + } + seg = alloc_skb(BUF_HEADROOM + ihlen, GFP_ATOMIC); + if (!seg) + goto exit; + __skb_queue_tail(segs, seg); + seg->priority = skb->priority; + skb_reserve(seg, BUF_HEADROOM - (tnl_hlen + hlen)); + skb_reset_mac_header(seg); + skb_copy_to_linear_data(seg, tnl_hdr, + tnl_hlen + hlen + ihlen); + skb_put(seg, tnl_hlen + hlen + ihlen); + skb_pull(seg, tnl_hlen); + skb_reset_inner_mac_header(seg); + skb_reset_inner_network_header(seg); + skb_reset_inner_transport_header(seg); + seg->ip_summed = CHECKSUM_UNNECESSARY; + skb_reset_network_header(seg); + skb_reset_transport_header(seg); + seg->dev = skb->dev; + seg->protocol = skb->protocol; + skb_set_inner_protocol(seg, htons(ETH_P_TIPC)); + seg_hdr = buf_msg(seg); + seg_left = left < mss ? 
left : mss; + seg_left -= ihlen; + left -= ihlen; + msg_set_size(seg_hdr, hlen + ihlen + seg_left); + msg_set_fragm_no(seg_hdr, segno); + msg_set_seqno(seg_hdr, seqno); + msg_set_type(seg_hdr, mtyp); + msg_set_pktcnt(seg_hdr, 1); + seg_frag = &skb_shinfo(seg)->frags[0]; + + /* MAC and INET GSO behave differently regarding this */ + if (tnl_hlen == sizeof(struct ethhdr)) + skb_push(seg, tnl_hlen); + } + while (seg_left) { + dlen = min(seg_left, skb_frag_size(frag) - frag_pos); + if (seg) { + page_ref_inc(frag->bv_page); + seg_frag->bv_page = frag->bv_page; + skb_shinfo(seg)->nr_frags++; + seg_frag->bv_len = dlen; + seg_frag->bv_offset = frag_pos; + seg->len += dlen; + seg->data_len += dlen; + seg->truesize += dlen; + } + frag_pos += dlen; + seg_left -= dlen; + left -= dlen; + if (frag_pos < skb_frag_size(frag)) + continue; + if (++fragno > shinfo->nr_frags) { + pr_warn("msg_segment leaving prematurely\n"); + goto exit; + } + frag++; + frag_pos = frag->bv_offset; + seg_frag++; + }; + segno++; + seqno++; + seg = NULL; + } + + if (!left && seg_hdr) + msg_set_type(seg_hdr, LAST_FRAGMENT); +exit: + return skb_queue_len(segs); +} diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 1b5c8c8..8391581 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -1131,13 +1131,13 @@ bool tipc_msg_try_bundle(struct sk_buff *tskb, struct sk_buff **skb, u32 mss, bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos); int tipc_msg_fragment(struct sk_buff *skb, const struct tipc_msg *hdr, int pktmax, struct sk_buff_head *frags); -int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, - int offset, int dsz, int mtu, struct sk_buff_head *list); +int tipc_msg_build(struct tipc_msg *hdr, struct msghdr *m, int dlen, + int mtu, struct sk_buff_head *list); int tipc_msg_append(struct tipc_msg *hdr, struct msghdr *m, int dlen, int mss, struct sk_buff_head *txq); bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); bool tipc_msg_assemble(struct 
sk_buff_head *list); -bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq); +bool tipc_msg_clone(struct sk_buff_head *list, struct sk_buff_head *rcvq); bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg, struct sk_buff_head *cpy); void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, @@ -1146,6 +1146,18 @@ bool tipc_msg_skb_clone(struct sk_buff_head *msg, struct sk_buff_head *cpy); void tipc_skb_queue_copy(struct sk_buff_head *from, struct sk_buff_head *to); int tipc_skb_queue_fragment(struct sk_buff_head *skbq, int pktmax, int *pktcnt, bool frag_supp, int mtyp); +int tipc_skb_segment(struct sk_buff *skb, int tnl_hlen, u16 from, u16 to, + struct sk_buff_head *segs); + +static inline int tipc_skb_segment_all(struct sk_buff *skb, int tnl_hlen, + struct sk_buff_head *segs) +{ + u16 from = msg_seqno(buf_msg(skb)); + u16 to = from + skb_shinfo(skb)->gso_segs - 1; + + return tipc_skb_segment(skb, tnl_hlen, from, to, segs); +} + static inline u16 buf_seqno(struct sk_buff *skb) { return msg_seqno(buf_msg(skb)); diff --git a/net/tipc/node.c b/net/tipc/node.c index 99b28b6..4d23942 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -1865,6 +1865,7 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb, int usr = msg_user(hdr); int mtyp = msg_type(hdr); u16 oseqno = msg_seqno(hdr); + u16 iseqno = msg_seqno(msg_inner_hdr(hdr)); u16 exp_pkts = msg_msgcnt(hdr); u16 rcv_nxt, syncpt, dlv_nxt, inputq_len; int state = n->state; diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 884dad5..5dc26f5 100644 --- a/net/tipc/socket.c +++ b/net/tipc/socket.c @@ -844,8 +844,8 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, msg_set_nameupper(hdr, seq->upper); /* Build message as chain of buffers */ - __skb_queue_head_init(&pkts); - rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts); + skb_queue_head_init(&pkts); + rc = tipc_msg_build(hdr, msg, dlen, mtu, &pkts); /* Send message if build 
was successful */ if (unlikely(rc == dlen)) { @@ -888,9 +888,9 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk, msg_set_grp_bc_seqno(hdr, bc_snd_nxt); /* Build message as chain of buffers */ - __skb_queue_head_init(&pkts); + skb_queue_head_init(&pkts); mtu = tipc_node_get_mtu(net, dnode, tsk->portid, false); - rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); + rc = tipc_msg_build(hdr, m, dlen, mtu, &pkts); if (unlikely(rc != dlen)) return rc; @@ -1094,7 +1094,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m, /* Build message as chain of buffers */ __skb_queue_head_init(&pkts); - rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); + rc = tipc_msg_build(hdr, m, dlen, mtu, &pkts); if (unlikely(rc != dlen)) return rc; @@ -1452,7 +1452,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen) __skb_queue_head_init(&pkts); mtu = tipc_node_get_mtu(net, dnode, tsk->portid, false); - rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); + rc = tipc_msg_build(hdr, m, dlen, mtu, &pkts); if (unlikely(rc != dlen)) return rc; if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue))) { @@ -1546,7 +1546,7 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen) } tsk->expect_ack = true; } else { - rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq); + rc = tipc_msg_build(hdr, m, send, maxpkt, txq); if (unlikely(rc != send)) break; blocks += tsk_inc(tsk, send + MIN_H_SIZE); diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c index dcc4ba7..974d260 100644 --- a/net/tipc/udp_media.c +++ b/net/tipc/udp_media.c @@ -181,7 +181,9 @@ static int tipc_udp_xmit(struct net *net, struct sk_buff *skb, } dst_cache_set_ip4(cache, &rt->dst, fl.saddr); } - + if (msg_user(buf_msg(skb)) == MSG_FRAGMENTER) + skb_shinfo(skb)->gso_type = SKB_GSO_UDP_TUNNEL; + skb->dev = rt->dst.dev; ttl = ip4_dst_hoplimit(&rt->dst); udp_tunnel_xmit_skb(rt, ub->ubsock->sk, skb, src->ipv4.s_addr, 
dst->ipv4.s_addr, 0, ttl, 0, src->port, -- 2.1.4 |
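The segment accounting in this patch is compact enough to be easy to misread, so here is a rough userspace sketch of it. It reproduces the arithmetic tipc_msg_build() applies when a message exceeds the MTU (mss = mtu - INT_H_SIZE, gso_segs = mlen / mss rounded up) and the u16 range that tipc_skb_segment_all() derives from it. The function names tipc_gso_seg_count() and tipc_gso_last_seqno() are hypothetical, and INT_H_SIZE is assumed to be 40 because its definition is not shown in this diff.

```c
#include <assert.h>
#include <stdint.h>

#define INT_H_SIZE 40 /* assumed value; not shown in this diff */

/* Hypothetical helper: number of GSO segments for a message of mlen
 * bytes (inner header plus data), as computed in tipc_msg_build():
 *   mss      = mtu - INT_H_SIZE
 *   gso_segs = mlen / mss + !!(mlen % mss)
 */
static unsigned int tipc_gso_seg_count(unsigned int mlen, unsigned int mtu)
{
	unsigned int mss;

	assert(mtu > INT_H_SIZE); /* otherwise mss would be zero or wrap */
	mss = mtu - INT_H_SIZE;
	return mlen / mss + !!(mlen % mss);
}

/* Hypothetical helper: last sequence number covered by the buffer, as in
 * tipc_skb_segment_all(): to = from + gso_segs - 1, wrapping as u16.
 */
static uint16_t tipc_gso_last_seqno(uint16_t from, unsigned int segs)
{
	return (uint16_t)(from + segs - 1);
}
```

With an MTU of 1500 the sketch gives an mss of 1460, so a 16408-byte message is carried in 12 segments; if its first sequence number is 65530, the last one wraps around to 5. That wrap is presumably why the patch compares sequence numbers with less()/more() rather than plain relational operators.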