From: Ying X. <yin...@wi...> - 2013-10-25 02:14:16
On 10/21/2013 05:14 AM, Jon Maloy wrote:
> On 09/30/2013 10:38 AM, Ying Xue wrote:
>> The 'blocked' flag in struct tipc_bearer is currently protected by
>> a big spinlock aggregated into tipc_bearer. We want to reduce all
>> dependencies to this lock, as part of our effort to make the locking
>> policy simpler and more manageable.
>>
>> Furthermore, the functions tipc_disc_recv_msg() and disc_send_msg()
>> access the flag without taking this lock at all, implying a risk that
>> they may occasionally obtain a wrong value and drop sending/receiving
>> of discovery messages. Since discovery messages are timer driven and
>> best-effort, the latter problem is not fatal, but it is clearly avoidable.
>>
>> By converting the type of the flag from int to atomic_t we eliminate
>> both the spinlock dependency and the current race condition.
>>
>> We also make two small code changes:
>>
>> 1) The functions tipc_continue() and bearer_blocked() now become
>> one-liners doing a single access to the flag. So we just remove
>> those redundant functions.
>>
>> 2) The initialization of the flag is moved from the enable_media()
>> functions in ib_media.c and eth_media.c respectively, to the generic
>> tipc_enable_bearer() function, so that it is done in one place only.
>
> Maybe it is just me, but I still fail to see the benefit (or harm) of this change.
>
> As I understand it, the time line for the current implementation, and the
> one you are suggesting, look like this:
>
> Without atomic_t:
> ------------------------
> 1: Device driver detects loss of carrier or similar.
> 2: It issues a call (work_queue?) to all registered users.
> 3: Until the call has reached the users, they (including TIPC) will continue
> to send packets, although the driver can only drop them.
> 4: The field bearer->blocked is set as one single write both on 32-bit and
> 64-bit CPUs, since it is one word, and a small constant value (1 or 0) that is written.
> 5: Other cores happening to work on the same cache line at that moment
> will notice nothing; they will continue to send the packet they were working
> on, seeing the previous value of the field.
> 6: The cache line is flushed and reloaded as soon as next packet is sent or received
> on these cores. (I don't think a single 64-byte cache line will remain between
> two transmissions. There are way too many data fields involved.)
> 7: The next time the cache line is reloaded it has the correct value of
> 'blocked', since the cache coherency HW support ensures that
> the cache is updated. Next packet is not sent.
>
>
> With atomic_t:
> ------------------
> 1: Device driver detects loss of carrier or similar.
> 2: It issues a call (work_queue?) to all registered users.
> 3: Until the call has reached the users, they (including TIPC) will continue
> to send packets, although the driver can only drop them.
> 4: The field bearer->blocked is set as a single, atomic write, which it would be
> anyway, just as in the previous case.
> 5: Other cores happening to read on the same cache line at that moment
> will now see the cache line invalidated, so they will have to reread it with the
> new, correct value. The 'volatile' keyword ensures that. As I understand
> GCC even puts in memory barriers around volatile variables, but I fail to
> see the relevance here; we don't make any other changes to the
> bearer struct anyway, so ordering is not an issue.
> 6: The cache line now has the correct read value. Packet is not sent.
>
> What have we won? That TIPC may have time to send one more
> packet per core than it would otherwise, but nothing can stop
> TIPC and others from sending a small number of packets during the
> interval between carrier loss detection and until the 'blocked' flag is
> updated. The difference it makes if the cache line is invalidated immediately
> or at next reload is microscopic in this context.
>
> Admittedly, the harm done is not big either. The memory
> barriers put in by GCC around the volatile read do not cost much.
> But why make such a change if we cannot motivate it?
>
> Maybe my view is too simplistic, but then somebody has to explain
> what is wrong with this reasoning.
> Basically, I don't think we need to protect this field with either
> spinlocks or memory barriers, or by making it atomic.
>
First of all, thank you for such detailed comments. In fact, the issues you
raise often confuse me too, so I spent some time trying to understand the
cache system of modern processors as well as the effects of the various
Linux memory barrier primitives on cache coherence. That is why I am late
in responding. The subject is so complex that I am afraid I still don't
entirely understand it, even after several days of research. Anyway, I
will still try to answer your questions :)

Nowadays most modern processors implement a cache coherence protocol
which ensures that all CPUs maintain a consistent view of data. The best
known coherence protocol is MESI. For example, when the 'blocked'
variable is modified by core 0, the new value is at first only written to
core 0's local cache. The other cores snoop the change on the internal
high-speed bus and invalidate their local cache line where 'blocked'
resides. When the other cores later read 'blocked', they get the
up-to-date value from core 0's cache line. The whole process is
guaranteed by MESI. So why do we still need memory barriers if the
processor HW is cache coherent? Because these CPUs cannot ensure that the
order of memory accesses performed on core 0 is the same as the order
observed on other cores, even though they guarantee that the data is
consistent. But in our case there is no dependency between 'blocked' and
any other data, so memory barriers seem unnecessary for us.
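
To make the difference between coherence and ordering concrete, here is a
minimal userspace sketch using C11 atomics rather than the kernel
primitives (smp_wmb()/smp_rmb()/smp_mb()). The names msg_box, box_publish(),
box_try_consume() and flag_is_set() are made up for this illustration and
do not exist in TIPC. Ordering matters when a flag publishes other data; a
lone flag such as 'blocked' guards nothing else, so a relaxed read is
enough in this model.

```c
#include <assert.h>
#include <stdatomic.h>

/* Hypothetical example type: a payload guarded by a 'ready' flag. */
struct msg_box {
	int payload;        /* plain data, written before the flag */
	atomic_int ready;   /* flag that publishes the payload */
};

/* Writer: the release store keeps the payload write ordered before the flag. */
static void box_publish(struct msg_box *b, int value)
{
	b->payload = value;
	atomic_store_explicit(&b->ready, 1, memory_order_release);
}

/* Reader: returns 1 and fills *out if the payload has been published. */
static int box_try_consume(struct msg_box *b, int *out)
{
	/* The acquire load pairs with the release store in box_publish(). */
	if (!atomic_load_explicit(&b->ready, memory_order_acquire))
		return 0;
	*out = b->payload;
	return 1;
}

/* A lone flag guards no other data, so no ordering is requested here. */
static int flag_is_set(atomic_int *flag)
{
	return atomic_load_explicit(flag, memory_order_relaxed) != 0;
}
```

In this sketch, dropping the release/acquire pair could let a reader see
ready == 1 together with an old payload on a weakly ordered CPU, even
though every individual cache line is coherent. Nothing comparable can go
wrong for flag_is_set(), because there is no second location to be ordered
against.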

Unfortunately, life is not that easy! There are still some old CPUs that
lack cache coherence mechanisms, meaning that manually flushing the cache
becomes mandatory for them. In this situation, without memory barriers,
the rate of reading stale data might rise significantly. Even if many
packets are involved (i.e., many memory areas are accessed via the CPU's
local cache), in the extreme worst case this does not guarantee that the
cache line holding the stale 'blocked' is evicted within a short time.
Therefore, it becomes unpredictable when the stale 'blocked' value will be
updated, which may be unacceptable.

If we search for barrier.h files in the kernel source tree, we get the
following results:
*/net-next-2.6$ find ./ -name barrier.h
./arch/metag/include/asm/barrier.h
./arch/sparc/include/asm/barrier.h
./arch/h8300/include/asm/barrier.h
./arch/x86/um/asm/barrier.h
./arch/x86/include/asm/barrier.h
./arch/unicore32/include/asm/barrier.h
./arch/xtensa/include/asm/barrier.h
./arch/frv/include/asm/barrier.h
./arch/m68k/include/asm/barrier.h
./arch/hexagon/include/asm/barrier.h
./arch/mips/include/asm/barrier.h
./arch/arm64/include/asm/barrier.h
./arch/parisc/include/asm/barrier.h
./arch/s390/include/asm/barrier.h
./arch/ia64/include/asm/barrier.h
./arch/m32r/include/asm/barrier.h
./arch/powerpc/include/asm/barrier.h
./arch/alpha/include/asm/barrier.h
./arch/arm/include/asm/barrier.h
./arch/avr32/include/asm/barrier.h
./arch/sh/include/asm/barrier.h
./arch/microblaze/include/asm/barrier.h
./arch/mn10300/include/asm/barrier.h
./arch/tile/include/asm/barrier.h
./arch/arc/include/asm/barrier.h
./arch/score/include/asm/barrier.h
./arch/blackfin/include/asm/barrier.h
./arch/cris/include/asm/barrier.h
./include/asm-generic/barrier.h

Different architectures have different implementations. Even within the
same architecture, different HW versions have different implementations.
If we take a closer look at them, we find that many implementations
forcibly flush/sync the cache in their memory barrier operations.

Besides the effect on the CPU cache, memory barriers can also suppress
compiler optimizations. On that point, let me state my understanding of
the 'volatile' keyword. Please imagine the scenario below:

        CPU 0                                   CPU 1
        -----                                   -----
T0:     read blocked (blocked is 0 now)
T1:                                             blocked = 1
T2:     read blocked (blocked is 0)

If 'blocked' is not declared volatile, the compiler is free to optimize
the accesses to it. At T0, CPU 0 reads 'blocked' from main memory/cache
and loads it into one of its registers. At T1, 'blocked' is set to 1 by
CPU 1. But as CPU 0 is unaware of the change, it gets the value from its
register instead of from main memory or cache when it reads 'blocked'
again at T2. We don't know when the value in the register will be
overwritten by other values, which means we don't know when CPU 0 will
read the true value. This may also be unacceptable for us.
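
The register-caching scenario can be sketched in plain C. This is only an
illustration: READ_FLAG() and wait_until_blocked() are hypothetical names,
written in the spirit of the kernel's ACCESS_ONCE() macro, and are not part
of the patch. The volatile cast forces the compiler to emit a real load on
every iteration instead of testing a copy it loaded once before the loop.

```c
#include <assert.h>

/* Force a fresh load from memory on every use (volatile access). */
#define READ_FLAG(x) (*(volatile int *)&(x))

/*
 * Hypothetical helper: spin until *blocked becomes non-zero or until
 * max_spins iterations have passed. Returns the number of iterations
 * spent waiting, or -1 on timeout.
 */
static long wait_until_blocked(int *blocked, long max_spins)
{
	long spins;

	for (spins = 0; spins < max_spins; spins++) {
		/*
		 * With a plain "if (*blocked)" the compiler may read the
		 * flag once and keep testing a register copy, which is
		 * the T0/T2 situation described above.
		 */
		if (READ_FLAG(*blocked))
			return spins;
	}
	return -1;
}
```

Note that the volatile access only constrains the compiler. It adds no
CPU memory barrier and says nothing about ordering against other
variables.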

In summary, I believe that most of the time everything works fine even
without memory barriers. But to be more cautious, I still prefer to apply
some restrictions to ensure that 'blocked' is always in sync.

Of course, my understanding above may be incomplete or incorrect.
If so, please correct me.

Regards,
Ying
> Regards
> ///jon
>
>
>>
>> Signed-off-by: Ying Xue <yin...@wi...>
>> ---
>> net/tipc/bcast.c | 4 ++--
>> net/tipc/bearer.c | 33 ++++-----------------------------
>> net/tipc/bearer.h | 5 +----
>> net/tipc/discover.c | 5 +++--
>> net/tipc/eth_media.c | 9 ++++-----
>> net/tipc/ib_media.c | 9 ++++-----
>> net/tipc/link.c | 12 ++++++------
>> 7 files changed, 24 insertions(+), 53 deletions(-)
>>
>> diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
>> index 716de1a..c78f30c 100644
>> --- a/net/tipc/bcast.c
>> +++ b/net/tipc/bcast.c
>> @@ -615,8 +615,8 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
>> if (!p)
>> break; /* No more bearers to try */
>>
>> - if (tipc_bearer_blocked(p)) {
>> - if (!s || tipc_bearer_blocked(s))
>> + if (atomic_read(&p->blocked)) {
>> + if (!s || atomic_read(&s->blocked))
>> continue; /* Can't use either bearer */
>> b = s;
>> }
>> diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
>> index 3f9707a..17f9824 100644
>> --- a/net/tipc/bearer.c
>> +++ b/net/tipc/bearer.c
>> @@ -275,31 +275,6 @@ void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest)
>> tipc_disc_remove_dest(b_ptr->link_req);
>> }
>>
>> -/*
>> - * Interrupt enabling new requests after bearer blocking:
>> - * See bearer_send().
>> - */
>> -void tipc_continue(struct tipc_bearer *b)
>> -{
>> - spin_lock_bh(&b->lock);
>> - b->blocked = 0;
>> - spin_unlock_bh(&b->lock);
>> -}
>> -
>> -/*
>> - * tipc_bearer_blocked - determines if bearer is currently blocked
>> - */
>> -int tipc_bearer_blocked(struct tipc_bearer *b)
>> -{
>> - int res;
>> -
>> - spin_lock_bh(&b->lock);
>> - res = b->blocked;
>> - spin_unlock_bh(&b->lock);
>> -
>> - return res;
>> -}
>> -
>> /**
>> * tipc_enable_bearer - enable bearer with the given name
>> */
>> @@ -387,6 +362,8 @@ restart:
>>
>> b_ptr = &tipc_bearers[bearer_id];
>> strcpy(b_ptr->name, name);
>> + atomic_set(&b_ptr->blocked, 0);
>> + smp_mb();
>> res = m_ptr->enable_media(b_ptr);
>> if (res) {
>> pr_warn("Bearer <%s> rejected, enable failure (%d)\n",
>> @@ -429,8 +406,8 @@ int tipc_block_bearer(struct tipc_bearer *b_ptr)
>>
>> read_lock_bh(&tipc_net_lock);
>> pr_info("Blocking bearer <%s>\n", b_ptr->name);
>> + atomic_add_unless(&b_ptr->blocked, 1, 1);
>> spin_lock_bh(&b_ptr->lock);
>> - b_ptr->blocked = 1;
>> list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
>> struct tipc_node *n_ptr = l_ptr->owner;
>>
>> @@ -455,8 +432,8 @@ static void bearer_disable(struct tipc_bearer *b_ptr)
>> struct tipc_link_req *temp_req;
>>
>> pr_info("Disabling bearer <%s>\n", b_ptr->name);
>> + atomic_add_unless(&b_ptr->blocked, 1, 1);
>> spin_lock_bh(&b_ptr->lock);
>> - b_ptr->blocked = 1;
>> b_ptr->media->disable_media(b_ptr);
>> list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
>> tipc_link_delete(l_ptr);
>> @@ -489,8 +466,6 @@ int tipc_disable_bearer(const char *name)
>> return res;
>> }
>>
>> -
>> -
>> void tipc_bearer_stop(void)
>> {
>> u32 i;
>> diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
>> index e5e04be..8695d4a 100644
>> --- a/net/tipc/bearer.h
>> +++ b/net/tipc/bearer.h
>> @@ -130,7 +130,7 @@ struct tipc_media {
>> struct tipc_bearer {
>> void *usr_handle; /* initalized by media */
>> u32 mtu; /* initalized by media */
>> - int blocked; /* initalized by media */
>> + atomic_t blocked;
>> struct tipc_media_addr addr; /* initalized by media */
>> char name[TIPC_MAX_BEARER_NAME];
>> spinlock_t lock;
>> @@ -164,8 +164,6 @@ int tipc_register_media(struct tipc_media *m_ptr);
>> void tipc_recv_msg(struct sk_buff *buf, struct tipc_bearer *tb_ptr);
>>
>> int tipc_block_bearer(struct tipc_bearer *b_ptr);
>> -void tipc_continue(struct tipc_bearer *tb_ptr);
>> -
>> int tipc_enable_bearer(const char *bearer_name, u32 disc_domain, u32 priority);
>> int tipc_disable_bearer(const char *name);
>>
>> @@ -194,7 +192,6 @@ void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest);
>> struct tipc_bearer *tipc_bearer_find(const char *name);
>> struct tipc_bearer *tipc_bearer_find_interface(const char *if_name);
>> struct tipc_media *tipc_media_find(const char *name);
>> -int tipc_bearer_blocked(struct tipc_bearer *b_ptr);
>> void tipc_bearer_stop(void);
>>
>> /**
>> diff --git a/net/tipc/discover.c b/net/tipc/discover.c
>> index ecc758c..d8b49fb 100644
>> --- a/net/tipc/discover.c
>> +++ b/net/tipc/discover.c
>> @@ -239,7 +239,8 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
>> /* Accept discovery message & send response, if necessary */
>> link_fully_up = link_working_working(link);
>>
>> - if ((type == DSC_REQ_MSG) && !link_fully_up && !b_ptr->blocked) {
>> + if ((type == DSC_REQ_MSG) && !link_fully_up &&
>> + !atomic_read(&b_ptr->blocked)) {
>> rbuf = tipc_disc_init_msg(DSC_RESP_MSG, orig, b_ptr);
>> if (rbuf) {
>> tipc_bearer_send(b_ptr, rbuf, &media_addr);
>> @@ -293,7 +294,7 @@ void tipc_disc_remove_dest(struct tipc_link_req *req)
>> */
>> static void disc_send_msg(struct tipc_link_req *req)
>> {
>> - if (!req->bearer->blocked)
>> + if (!atomic_read(&req->bearer->blocked))
>> tipc_bearer_send(req->bearer, req->buf, &req->dest);
>> }
>>
>> diff --git a/net/tipc/eth_media.c b/net/tipc/eth_media.c
>> index f80d59f..32d01e8 100644
>> --- a/net/tipc/eth_media.c
>> +++ b/net/tipc/eth_media.c
>> @@ -1,7 +1,7 @@
>> /*
>> * net/tipc/eth_media.c: Ethernet bearer support for TIPC
>> *
>> - * Copyright (c) 2001-2007, Ericsson AB
>> + * Copyright (c) 2001-2007, 2013, Ericsson AB
>> * Copyright (c) 2005-2008, 2011-2013, Wind River Systems
>> * All rights reserved.
>> *
>> @@ -199,7 +199,6 @@ static int enable_media(struct tipc_bearer *tb_ptr)
>> tb_ptr->bcast_addr.media_id = TIPC_MEDIA_TYPE_ETH;
>> tb_ptr->bcast_addr.broadcast = 1;
>> tb_ptr->mtu = dev->mtu;
>> - tb_ptr->blocked = 0;
>> eth_media_addr_set(tb_ptr, &tb_ptr->addr, (char *)dev->dev_addr);
>> return 0;
>> }
>> @@ -263,12 +262,12 @@ static int recv_notification(struct notifier_block *nb, unsigned long evt,
>> switch (evt) {
>> case NETDEV_CHANGE:
>> if (netif_carrier_ok(dev))
>> - tipc_continue(eb_ptr->bearer);
>> + atomic_add_unless(&eb_ptr->bearer->blocked, -1, 0);
>> else
>> tipc_block_bearer(eb_ptr->bearer);
>> break;
>> case NETDEV_UP:
>> - tipc_continue(eb_ptr->bearer);
>> + atomic_add_unless(&eb_ptr->bearer->blocked, -1, 0);
>> break;
>> case NETDEV_DOWN:
>> tipc_block_bearer(eb_ptr->bearer);
>> @@ -276,7 +275,7 @@ static int recv_notification(struct notifier_block *nb, unsigned long evt,
>> case NETDEV_CHANGEMTU:
>> case NETDEV_CHANGEADDR:
>> tipc_block_bearer(eb_ptr->bearer);
>> - tipc_continue(eb_ptr->bearer);
>> + atomic_add_unless(&eb_ptr->bearer->blocked, -1, 0);
>> break;
>> case NETDEV_UNREGISTER:
>> case NETDEV_CHANGENAME:
>> diff --git a/net/tipc/ib_media.c b/net/tipc/ib_media.c
>> index c139892..5241a5b 100644
>> --- a/net/tipc/ib_media.c
>> +++ b/net/tipc/ib_media.c
>> @@ -5,7 +5,7 @@
>> *
>> * Based on eth_media.c, which carries the following copyright notice:
>> *
>> - * Copyright (c) 2001-2007, Ericsson AB
>> + * Copyright (c) 2001-2007, 2013, Ericsson AB
>> * Copyright (c) 2005-2008, 2011, Wind River Systems
>> * All rights reserved.
>> *
>> @@ -192,7 +192,6 @@ static int enable_media(struct tipc_bearer *tb_ptr)
>> tb_ptr->bcast_addr.media_id = TIPC_MEDIA_TYPE_IB;
>> tb_ptr->bcast_addr.broadcast = 1;
>> tb_ptr->mtu = dev->mtu;
>> - tb_ptr->blocked = 0;
>> ib_media_addr_set(tb_ptr, &tb_ptr->addr, (char *)dev->dev_addr);
>> return 0;
>> }
>> @@ -256,12 +255,12 @@ static int recv_notification(struct notifier_block *nb, unsigned long evt,
>> switch (evt) {
>> case NETDEV_CHANGE:
>> if (netif_carrier_ok(dev))
>> - tipc_continue(ib_ptr->bearer);
>> + atomic_add_unless(&ib_ptr->bearer->blocked, -1, 0);
>> else
>> tipc_block_bearer(ib_ptr->bearer);
>> break;
>> case NETDEV_UP:
>> - tipc_continue(ib_ptr->bearer);
>> + atomic_add_unless(&ib_ptr->bearer->blocked, -1, 0);
>> break;
>> case NETDEV_DOWN:
>> tipc_block_bearer(ib_ptr->bearer);
>> @@ -269,7 +268,7 @@ static int recv_notification(struct notifier_block *nb, unsigned long evt,
>> case NETDEV_CHANGEMTU:
>> case NETDEV_CHANGEADDR:
>> tipc_block_bearer(ib_ptr->bearer);
>> - tipc_continue(ib_ptr->bearer);
>> + atomic_add_unless(&ib_ptr->bearer->blocked, -1, 0);
>> break;
>> case NETDEV_UNREGISTER:
>> case NETDEV_CHANGENAME:
>> diff --git a/net/tipc/link.c b/net/tipc/link.c
>> index 4d1297e..8c0e9a3 100644
>> --- a/net/tipc/link.c
>> +++ b/net/tipc/link.c
>> @@ -796,7 +796,7 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
>> return link_send_long_buf(l_ptr, buf);
>>
>> /* Packet can be queued or sent. */
>> - if (likely(!tipc_bearer_blocked(l_ptr->b_ptr) &&
>> + if (likely(!atomic_read(&l_ptr->b_ptr->blocked) &&
>> !link_congested(l_ptr))) {
>> link_add_to_outqueue(l_ptr, buf, msg);
>>
>> @@ -963,7 +963,7 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
>>
>> if (likely(!link_congested(l_ptr))) {
>> if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
>> - if (likely(!tipc_bearer_blocked(l_ptr->b_ptr))) {
>> + if (likely(!atomic_read(&l_ptr->b_ptr->blocked))) {
>> link_add_to_outqueue(l_ptr, buf, msg);
>> tipc_bearer_send(l_ptr->b_ptr, buf,
>> &l_ptr->media_addr);
>> @@ -1020,7 +1020,7 @@ exit:
>>
>> /* Exit if link (or bearer) is congested */
>> if (link_congested(l_ptr) ||
>> - tipc_bearer_blocked(l_ptr->b_ptr)) {
>> + atomic_read(&l_ptr->b_ptr->blocked)) {
>> res = link_schedule_port(l_ptr,
>> sender->ref, res);
>> goto exit;
>> @@ -1287,7 +1287,7 @@ void tipc_link_push_queue(struct tipc_link *l_ptr)
>> {
>> u32 res;
>>
>> - if (tipc_bearer_blocked(l_ptr->b_ptr))
>> + if (atomic_read(&l_ptr->b_ptr->blocked))
>> return;
>>
>> do {
>> @@ -1376,7 +1376,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
>>
>> msg = buf_msg(buf);
>>
>> - if (tipc_bearer_blocked(l_ptr->b_ptr)) {
>> + if (atomic_read(&l_ptr->b_ptr->blocked)) {
>> if (l_ptr->retransm_queue_size == 0) {
>> l_ptr->retransm_queue_head = msg_seqno(msg);
>> l_ptr->retransm_queue_size = retransmits;
>> @@ -1870,7 +1870,7 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
>> buf->priority = TC_PRIO_CONTROL;
>>
>> /* Defer message if bearer is already blocked */
>> - if (tipc_bearer_blocked(l_ptr->b_ptr)) {
>> + if (atomic_read(&l_ptr->b_ptr->blocked)) {
>> l_ptr->proto_msg_queue = buf;
>> return;
>> }
>
>
>
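
For readers unfamiliar with atomic_add_unless(), which the quoted patch
uses to set and clear the flag: it adds a value to an atomic variable
unless the variable already holds a given value, and reports whether the
addition happened. Below is a rough userspace approximation built on C11
compare-and-swap. It is a sketch of the semantics only, not the kernel
implementation, and the bearer_block()/bearer_unblock()/bearer_is_blocked()
wrappers are hypothetical names that merely mirror the calls in the diff.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/*
 * Userspace approximation of atomic_add_unless(v, a, u):
 * add 'a' to *v unless *v already equals 'u'.
 * Returns true if the addition was performed.
 */
static bool add_unless(atomic_int *v, int a, int u)
{
	int old = atomic_load(v);

	while (old != u) {
		/* On failure, 'old' is refreshed with the current value. */
		if (atomic_compare_exchange_weak(v, &old, old + a))
			return true;
	}
	return false;
}

/* Mirrors atomic_add_unless(&b->blocked, 1, 1): 0 -> 1, 1 stays 1. */
static void bearer_block(atomic_int *blocked)
{
	add_unless(blocked, 1, 1);
}

/* Mirrors atomic_add_unless(&b->blocked, -1, 0): 1 -> 0, 0 stays 0. */
static void bearer_unblock(atomic_int *blocked)
{
	add_unless(blocked, -1, 0);
}

/* Mirrors atomic_read(&b->blocked). */
static bool bearer_is_blocked(atomic_int *blocked)
{
	return atomic_load(blocked) != 0;
}
```

As long as the flag only ever holds 0 or 1, repeated block or unblock
calls are idempotent in this model, so the value cannot drift outside
that range.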