From: Jon M. <ma...@do...> - 2013-10-20 21:28:46
On 09/30/2013 10:38 AM, Ying Xue wrote:
> The 'blocked' flag in struct tipc_bearer is currently protected by
> a big spinlock aggregated into tipc_bearer. We want to reduce all
> dependencies to this lock, as part of our effort to make the locking
> policy simpler and more manageable.
>
> Furthermore, the functions tipc_disc_recv_msg() and disc_send_msg()
> access the flag without taking this lock at all, implying a risk that
> they may occasionally obtain a wrong value and drop sending/receiving
> of discovery messages. Since discovery messages are timer driven and
> best-effort, the latter problem is not fatal, but it is clearly avoidable.
>
> By converting the type of the flag from int to atomic_t we eliminate
> both the spinlock dependency and the current race condition.
>
> We also make two small code changes:
>
> 1) The functions tipc_continue() and bearer_blocked() now become
> one-liners doing a single access to the flag. So we just remove
> those redundant functions.
>
> 2) The initialization of the flag is moved from the enable_media()
> functions in ib_media.c and eth_media.c respectively, to the generic
> tipc_enable_bearer() function, so that it is done in one place only.
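Condensed, the conversion in the diff below amounts to the following (all
names are taken straight from the patch; '-' is the old code, '+' the new):

-	if (tipc_bearer_blocked(l_ptr->b_ptr))              /* read under b_ptr->lock  */
+	if (atomic_read(&l_ptr->b_ptr->blocked))            /* lock-free read          */

-	tipc_continue(eb_ptr->bearer);                       /* clear under b_ptr->lock */
+	atomic_add_unless(&eb_ptr->bearer->blocked, -1, 0);  /* lock-free clear, floor 0 */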
Maybe it is just me, but I still fail to see the benefit (or harm) of this change.
As I understand it, the timelines for the current implementation and for the
one you are suggesting look like this:
Without atomic_t:
------------------------
1: Device driver detects loss of carrier or similar.
2: It issues a call (work_queue?) to all registered users.
3: Until the call has reached the users, they (including TIPC) will continue
to send packets, although the driver can only drop them.
4: The field bearer->blocked is set in one single write on both 32-bit and
64-bit CPUs, since it is one word and a small constant value (1 or 0) is written
(see the sketch right after this list).
5: Other cores happening to work on the same cache line at that moment
will notice nothing; they will continue to send the packet they were working
on, seeing the previous value of the field.
6: The cache line is flushed and reloaded as soon as the next packet is sent or received
on these cores. (I don't think a single 64-byte cache line will stay resident between
two transmissions; there are way too many data fields involved.)
7: The next time the cache line is reloaded it has the correct value of
'blocked', since the cache coherency hardware ensures that
the cache is updated. The next packet is not sent.
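To make step 4 concrete, this is all the write and the (unlocked) read boil
down to with the current int field; the identifiers are taken from the code in
the patch below, with the existing spinlock stripped away, since the argument
here is only about the bare field access:

	/* Writer side (carrier loss path, cf. tipc_block_bearer()): an aligned,
	 * word-sized store of a constant -- a single write on both 32-bit and
	 * 64-bit CPUs, so a reader can never see a torn value, only a briefly
	 * stale one.
	 */
	b_ptr->blocked = 1;

	/* Reader side (transmit path, cf. tipc_link_send_buf()): a plain load
	 * of the same word; the cache coherency hardware delivers the new
	 * value once the line has been invalidated and reloaded.
	 */
	if (likely(!l_ptr->b_ptr->blocked && !link_congested(l_ptr)))
		link_add_to_outqueue(l_ptr, buf, msg);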
With atomic_t:
------------------
1: Device driver detects loss of carrier or similar.
2: It issues a call (work_queue?) to all registered users.
3: Until the call has reached the users, they (including TIPC) will continue
to send packets, although the driver can only drop them.
4: The field bearer->blocked is set as a single, atomic write, which it would be
anyway, just as in the previous case.
5: Other cores happening to read from the same cache line at that moment
will now see the cache line invalidated, so they will have to reread it with the
new, correct value. The 'volatile' keyword ensures that. As I understand it,
GCC even puts memory barriers around volatile accesses, but I fail
to see the relevance here; we don't make any other changes to the
bearer struct anyway, so ordering is not an issue (again, see the sketch after this list).
6: The cache line now has the correct value at the next read. The packet is not sent.
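And for comparison, the reader side after the patch, together with what
atomic_read() roughly expands to on x86 in this kernel generation (the
definition is quoted from memory of arch/x86/include/asm/atomic.h, so treat
it as a sketch rather than gospel):

	/* Reader side after the patch (from link.c in the diff below): */
	if (likely(!atomic_read(&l_ptr->b_ptr->blocked) &&
		   !link_congested(l_ptr)))
		link_add_to_outqueue(l_ptr, buf, msg);

	/* atomic_read() is essentially just a volatile load of the 32-bit
	 * counter field -- no locking involved, roughly:
	 */
	static inline int atomic_read(const atomic_t *v)
	{
		return (*(volatile int *)&(v)->counter);
	}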
What have we won? That TIPC may have time to send one more
packet per core than it would otherwise, but nothing can stop
TIPC and others from sending a small number of packets during the
interval between carrier loss detection and the moment the 'blocked' flag is
updated. Whether the cache line is invalidated immediately or at the next
reload makes a microscopic difference in this context.
Admittedly, the harm done is not big either. The memory
barriers put in by GCC around the volatile read do not cost much.
But why make such a change if we cannot motivate it?
Maybe my view is too simplistic, but then somebody has to explain
what is wrong with this reasoning.
Basically, I don't think we need to protect this field with spinlocks,
memory barriers, or by making it atomic.
Regards
///jon
>
> Signed-off-by: Ying Xue <yin...@wi...>
> ---
> net/tipc/bcast.c | 4 ++--
> net/tipc/bearer.c | 33 ++++-----------------------------
> net/tipc/bearer.h | 5 +----
> net/tipc/discover.c | 5 +++--
> net/tipc/eth_media.c | 9 ++++-----
> net/tipc/ib_media.c | 9 ++++-----
> net/tipc/link.c | 12 ++++++------
> 7 files changed, 24 insertions(+), 53 deletions(-)
>
> diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
> index 716de1a..c78f30c 100644
> --- a/net/tipc/bcast.c
> +++ b/net/tipc/bcast.c
> @@ -615,8 +615,8 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
> if (!p)
> break; /* No more bearers to try */
>
> - if (tipc_bearer_blocked(p)) {
> - if (!s || tipc_bearer_blocked(s))
> + if (atomic_read(&p->blocked)) {
> + if (!s || atomic_read(&s->blocked))
> continue; /* Can't use either bearer */
> b = s;
> }
> diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
> index 3f9707a..17f9824 100644
> --- a/net/tipc/bearer.c
> +++ b/net/tipc/bearer.c
> @@ -275,31 +275,6 @@ void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest)
> tipc_disc_remove_dest(b_ptr->link_req);
> }
>
> -/*
> - * Interrupt enabling new requests after bearer blocking:
> - * See bearer_send().
> - */
> -void tipc_continue(struct tipc_bearer *b)
> -{
> - spin_lock_bh(&b->lock);
> - b->blocked = 0;
> - spin_unlock_bh(&b->lock);
> -}
> -
> -/*
> - * tipc_bearer_blocked - determines if bearer is currently blocked
> - */
> -int tipc_bearer_blocked(struct tipc_bearer *b)
> -{
> - int res;
> -
> - spin_lock_bh(&b->lock);
> - res = b->blocked;
> - spin_unlock_bh(&b->lock);
> -
> - return res;
> -}
> -
> /**
> * tipc_enable_bearer - enable bearer with the given name
> */
> @@ -387,6 +362,8 @@ restart:
>
> b_ptr = &tipc_bearers[bearer_id];
> strcpy(b_ptr->name, name);
> + atomic_set(&b_ptr->blocked, 0);
> + smp_mb();
> res = m_ptr->enable_media(b_ptr);
> if (res) {
> pr_warn("Bearer <%s> rejected, enable failure (%d)\n",
> @@ -429,8 +406,8 @@ int tipc_block_bearer(struct tipc_bearer *b_ptr)
>
> read_lock_bh(&tipc_net_lock);
> pr_info("Blocking bearer <%s>\n", b_ptr->name);
> + atomic_add_unless(&b_ptr->blocked, 1, 1);
> spin_lock_bh(&b_ptr->lock);
> - b_ptr->blocked = 1;
> list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
> struct tipc_node *n_ptr = l_ptr->owner;
>
> @@ -455,8 +432,8 @@ static void bearer_disable(struct tipc_bearer *b_ptr)
> struct tipc_link_req *temp_req;
>
> pr_info("Disabling bearer <%s>\n", b_ptr->name);
> + atomic_add_unless(&b_ptr->blocked, 1, 1);
> spin_lock_bh(&b_ptr->lock);
> - b_ptr->blocked = 1;
> b_ptr->media->disable_media(b_ptr);
> list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
> tipc_link_delete(l_ptr);
> @@ -489,8 +466,6 @@ int tipc_disable_bearer(const char *name)
> return res;
> }
>
> -
> -
> void tipc_bearer_stop(void)
> {
> u32 i;
> diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
> index e5e04be..8695d4a 100644
> --- a/net/tipc/bearer.h
> +++ b/net/tipc/bearer.h
> @@ -130,7 +130,7 @@ struct tipc_media {
> struct tipc_bearer {
> void *usr_handle; /* initalized by media */
> u32 mtu; /* initalized by media */
> - int blocked; /* initalized by media */
> + atomic_t blocked;
> struct tipc_media_addr addr; /* initalized by media */
> char name[TIPC_MAX_BEARER_NAME];
> spinlock_t lock;
> @@ -164,8 +164,6 @@ int tipc_register_media(struct tipc_media *m_ptr);
> void tipc_recv_msg(struct sk_buff *buf, struct tipc_bearer *tb_ptr);
>
> int tipc_block_bearer(struct tipc_bearer *b_ptr);
> -void tipc_continue(struct tipc_bearer *tb_ptr);
> -
> int tipc_enable_bearer(const char *bearer_name, u32 disc_domain, u32 priority);
> int tipc_disable_bearer(const char *name);
>
> @@ -194,7 +192,6 @@ void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest);
> struct tipc_bearer *tipc_bearer_find(const char *name);
> struct tipc_bearer *tipc_bearer_find_interface(const char *if_name);
> struct tipc_media *tipc_media_find(const char *name);
> -int tipc_bearer_blocked(struct tipc_bearer *b_ptr);
> void tipc_bearer_stop(void);
>
> /**
> diff --git a/net/tipc/discover.c b/net/tipc/discover.c
> index ecc758c..d8b49fb 100644
> --- a/net/tipc/discover.c
> +++ b/net/tipc/discover.c
> @@ -239,7 +239,8 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
> /* Accept discovery message & send response, if necessary */
> link_fully_up = link_working_working(link);
>
> - if ((type == DSC_REQ_MSG) && !link_fully_up && !b_ptr->blocked) {
> + if ((type == DSC_REQ_MSG) && !link_fully_up &&
> + !atomic_read(&b_ptr->blocked)) {
> rbuf = tipc_disc_init_msg(DSC_RESP_MSG, orig, b_ptr);
> if (rbuf) {
> tipc_bearer_send(b_ptr, rbuf, &media_addr);
> @@ -293,7 +294,7 @@ void tipc_disc_remove_dest(struct tipc_link_req *req)
> */
> static void disc_send_msg(struct tipc_link_req *req)
> {
> - if (!req->bearer->blocked)
> + if (!atomic_read(&req->bearer->blocked))
> tipc_bearer_send(req->bearer, req->buf, &req->dest);
> }
>
> diff --git a/net/tipc/eth_media.c b/net/tipc/eth_media.c
> index f80d59f..32d01e8 100644
> --- a/net/tipc/eth_media.c
> +++ b/net/tipc/eth_media.c
> @@ -1,7 +1,7 @@
> /*
> * net/tipc/eth_media.c: Ethernet bearer support for TIPC
> *
> - * Copyright (c) 2001-2007, Ericsson AB
> + * Copyright (c) 2001-2007, 2013, Ericsson AB
> * Copyright (c) 2005-2008, 2011-2013, Wind River Systems
> * All rights reserved.
> *
> @@ -199,7 +199,6 @@ static int enable_media(struct tipc_bearer *tb_ptr)
> tb_ptr->bcast_addr.media_id = TIPC_MEDIA_TYPE_ETH;
> tb_ptr->bcast_addr.broadcast = 1;
> tb_ptr->mtu = dev->mtu;
> - tb_ptr->blocked = 0;
> eth_media_addr_set(tb_ptr, &tb_ptr->addr, (char *)dev->dev_addr);
> return 0;
> }
> @@ -263,12 +262,12 @@ static int recv_notification(struct notifier_block *nb, unsigned long evt,
> switch (evt) {
> case NETDEV_CHANGE:
> if (netif_carrier_ok(dev))
> - tipc_continue(eb_ptr->bearer);
> + atomic_add_unless(&eb_ptr->bearer->blocked, -1, 0);
> else
> tipc_block_bearer(eb_ptr->bearer);
> break;
> case NETDEV_UP:
> - tipc_continue(eb_ptr->bearer);
> + atomic_add_unless(&eb_ptr->bearer->blocked, -1, 0);
> break;
> case NETDEV_DOWN:
> tipc_block_bearer(eb_ptr->bearer);
> @@ -276,7 +275,7 @@ static int recv_notification(struct notifier_block *nb, unsigned long evt,
> case NETDEV_CHANGEMTU:
> case NETDEV_CHANGEADDR:
> tipc_block_bearer(eb_ptr->bearer);
> - tipc_continue(eb_ptr->bearer);
> + atomic_add_unless(&eb_ptr->bearer->blocked, -1, 0);
> break;
> case NETDEV_UNREGISTER:
> case NETDEV_CHANGENAME:
> diff --git a/net/tipc/ib_media.c b/net/tipc/ib_media.c
> index c139892..5241a5b 100644
> --- a/net/tipc/ib_media.c
> +++ b/net/tipc/ib_media.c
> @@ -5,7 +5,7 @@
> *
> * Based on eth_media.c, which carries the following copyright notice:
> *
> - * Copyright (c) 2001-2007, Ericsson AB
> + * Copyright (c) 2001-2007, 2013, Ericsson AB
> * Copyright (c) 2005-2008, 2011, Wind River Systems
> * All rights reserved.
> *
> @@ -192,7 +192,6 @@ static int enable_media(struct tipc_bearer *tb_ptr)
> tb_ptr->bcast_addr.media_id = TIPC_MEDIA_TYPE_IB;
> tb_ptr->bcast_addr.broadcast = 1;
> tb_ptr->mtu = dev->mtu;
> - tb_ptr->blocked = 0;
> ib_media_addr_set(tb_ptr, &tb_ptr->addr, (char *)dev->dev_addr);
> return 0;
> }
> @@ -256,12 +255,12 @@ static int recv_notification(struct notifier_block *nb, unsigned long evt,
> switch (evt) {
> case NETDEV_CHANGE:
> if (netif_carrier_ok(dev))
> - tipc_continue(ib_ptr->bearer);
> + atomic_add_unless(&ib_ptr->bearer->blocked, -1, 0);
> else
> tipc_block_bearer(ib_ptr->bearer);
> break;
> case NETDEV_UP:
> - tipc_continue(ib_ptr->bearer);
> + atomic_add_unless(&ib_ptr->bearer->blocked, -1, 0);
> break;
> case NETDEV_DOWN:
> tipc_block_bearer(ib_ptr->bearer);
> @@ -269,7 +268,7 @@ static int recv_notification(struct notifier_block *nb, unsigned long evt,
> case NETDEV_CHANGEMTU:
> case NETDEV_CHANGEADDR:
> tipc_block_bearer(ib_ptr->bearer);
> - tipc_continue(ib_ptr->bearer);
> + atomic_add_unless(&ib_ptr->bearer->blocked, -1, 0);
> break;
> case NETDEV_UNREGISTER:
> case NETDEV_CHANGENAME:
> diff --git a/net/tipc/link.c b/net/tipc/link.c
> index 4d1297e..8c0e9a3 100644
> --- a/net/tipc/link.c
> +++ b/net/tipc/link.c
> @@ -796,7 +796,7 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
> return link_send_long_buf(l_ptr, buf);
>
> /* Packet can be queued or sent. */
> - if (likely(!tipc_bearer_blocked(l_ptr->b_ptr) &&
> + if (likely(!atomic_read(&l_ptr->b_ptr->blocked) &&
> !link_congested(l_ptr))) {
> link_add_to_outqueue(l_ptr, buf, msg);
>
> @@ -963,7 +963,7 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
>
> if (likely(!link_congested(l_ptr))) {
> if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
> - if (likely(!tipc_bearer_blocked(l_ptr->b_ptr))) {
> + if (likely(!atomic_read(&l_ptr->b_ptr->blocked))) {
> link_add_to_outqueue(l_ptr, buf, msg);
> tipc_bearer_send(l_ptr->b_ptr, buf,
> &l_ptr->media_addr);
> @@ -1020,7 +1020,7 @@ exit:
>
> /* Exit if link (or bearer) is congested */
> if (link_congested(l_ptr) ||
> - tipc_bearer_blocked(l_ptr->b_ptr)) {
> + atomic_read(&l_ptr->b_ptr->blocked)) {
> res = link_schedule_port(l_ptr,
> sender->ref, res);
> goto exit;
> @@ -1287,7 +1287,7 @@ void tipc_link_push_queue(struct tipc_link *l_ptr)
> {
> u32 res;
>
> - if (tipc_bearer_blocked(l_ptr->b_ptr))
> + if (atomic_read(&l_ptr->b_ptr->blocked))
> return;
>
> do {
> @@ -1376,7 +1376,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
>
> msg = buf_msg(buf);
>
> - if (tipc_bearer_blocked(l_ptr->b_ptr)) {
> + if (atomic_read(&l_ptr->b_ptr->blocked)) {
> if (l_ptr->retransm_queue_size == 0) {
> l_ptr->retransm_queue_head = msg_seqno(msg);
> l_ptr->retransm_queue_size = retransmits;
> @@ -1870,7 +1870,7 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
> buf->priority = TC_PRIO_CONTROL;
>
> /* Defer message if bearer is already blocked */
> - if (tipc_bearer_blocked(l_ptr->b_ptr)) {
> + if (atomic_read(&l_ptr->b_ptr->blocked)) {
> l_ptr->proto_msg_queue = buf;
> return;
> }