From: Mark H. <ma...@os...> - 2004-04-15 17:58:50
|
Daniel McNeil and I were playing with multicast in the unstable view and found that it wasn't working for us. We spent some time debugging and got it working (at least in our simple tests). I have enclosed the patch that we came up with for review. We have run the client/server test and it continues to function so I don't think that we broke anything with the change. Please take a look and let us know what you think. Thanks, Mark and Daniel. --- /home/markh/views/tipc/cvs/source/unstable/net/tipc/bcast.h 2004-03-30 07:11:38.000000000 -0800 +++ bcast.h 2004-04-14 14:56:08.000000000 -0700 @@ -123,7 +123,7 @@ int tipc_bsend_buf(struct sk_buff *buf, struct list_head *mc_head); int tipc_send_buf2nameseq(tipc_ref_t ref,struct tipc_name_seq *dest,void *buf, uint size); int tipc_forward_buf2nameseq(tipc_ref_t ref,struct tipc_name_seq *name, - void *buf,struct tipc_portid const *orig,uint size, + struct sk_buff *buf,struct tipc_portid const *orig,uint size, uint importance,struct list_head *mc_head); --- /home/markh/views/tipc/cvs/source/unstable/net/tipc/reg.h 2004-02-16 15:00:02.000000000 -0800 +++ reg.h 2004-04-15 10:08:28.000000000 -0700 @@ -91,8 +91,19 @@ static inline void * ref_lock_deref(uint ref) { - uint index = ref & ref_table.index_mask; - struct reference *r = &ref_table.entries[index]; + uint index; + struct reference *r; + + index = ref & ref_table.index_mask; + + /* + * Zero is not a valid index + */ + if (!index) { + printk("tipc ref_lock_deref: ref is zero\n"); + return 0; + } + r = &ref_table.entries[index]; spin_lock_bh(&r->lock); if (likely(r->data.reference == ref)) return r->object; --- /home/markh/views/tipc/cvs/source/unstable/net/tipc/sendbcast.c 2004-03-24 08:58:32.000000000 -0800 +++ sendbcast.c 2004-04-15 10:22:54.000000000 -0700 @@ -140,7 +140,7 @@ */ int tipc_forward_buf2nameseq(tipc_ref_t ref, struct tipc_name_seq *name, - void *buf, + struct sk_buff *buf, struct tipc_portid const *orig, uint size, uint importance,struct list_head *mc_head) 
@@ -156,19 +156,19 @@ m = &this->publ.phdr; if (importance <= 3) msg_set_importance(m,importance); - + prev_destnode = 0; list_for_each(pos,mc_head) { - prev_destnode = 0; mid = list_entry(pos,struct mc_identity,list); if (mid != NULL && (prev_destnode != mid->node)){ prev_destnode = mid->node; - copybuf = buf_acquire(msg_size(m)); - memcpy(copybuf,buf,msg_size(m)); + copybuf = buf_clone(buf); msg_set_destnode(buf_msg(copybuf), mid ->node); - if (likely(mid ->node != tipc_own_addr)) + if (likely(mid ->node != tipc_own_addr)) { res = tipc_send_buf_fast(copybuf,mid->node); - else + } + else { res = bcast_port_recv(copybuf); + } } } return res; @@ -242,6 +242,7 @@ if (!this) return TIPC_FAILURE; + INIT_LIST_HEAD(&mc_head); nametbl_mc_translate(&mc_head, seq->type,seq->lower,seq->upper); tipc_ownidentity(ref,&orig); @@ -255,7 +256,6 @@ res = msg_build(hdr,msg,scnt,TIPC_MAX_MSG_SIZE,&b); if (!b) return TIPC_FAILURE; - count = count_mc_member(&mc_head); if (count <= REPLICA_NODES){ --- /home/markh/views/tipc/cvs/source/unstable/net/tipc/bcast.c 2004-03-30 07:11:38.000000000 -0800 +++ bcast.c 2004-04-15 10:24:09.000000000 -0700 @@ -383,8 +383,8 @@ int i = 0,prev_destnode; struct mc_identity* mid; + prev_destnode = 0; list_for_each(pos,mc_head) { - prev_destnode = 0; mid = list_entry(pos,struct mc_identity,list); if (mid != NULL && (prev_destnode != mid->node)){ prev_destnode = mid->node; @@ -433,6 +433,7 @@ if (mc_list == NULL) return false; + INIT_LIST_HEAD(&mc_list->list); mc_list->port = destport; mc_list->node = destnode; list_add_tail(&mc_list->list,list_head); @@ -492,15 +493,14 @@ void free_mclist(struct list_head *list_head) { struct mc_identity* mid; - struct list_head *pos; + struct list_head *pos, *n; - list_for_each(pos,list_head) { + list_for_each_safe(pos, n, list_head) { mid = list_entry(pos, struct mc_identity, list); list_del(&mid->list); kfree(mid); } - list_del(list_head); } static struct list_head point = LIST_HEAD_INIT(point); --- 
/home/markh/views/tipc/cvs/source/unstable/net/tipc/link.c 2004-03-11 07:32:51.000000000 -0800 +++ link.c 2004-04-15 10:24:34.000000000 -0700 @@ -1609,7 +1609,11 @@ if (likely(msg_is_dest(msg,tipc_own_addr))){ if (likely(msg_isdata(msg))) { spin_unlock_bh(&this->owner->lock); - port_recv_msg(buf); + if (msg_type(msg) == TIPC_MCAST_MSG) { + bcast_port_recv(buf); + } else { + port_recv_msg(buf); + } continue; } link_recv_non_data_msg(this, buf); --- /home/markh/views/tipc/cvs/source/unstable/net/tipc/cfg.c 2004-02-16 15:00:01.000000000 -0800 +++ cfg.c 2004-04-15 10:27:00.000000000 -0700 @@ -393,7 +393,9 @@ void cfg_link_event(tipc_net_addr_t addr,char* name,int up) { +#ifdef INTER_CLUSTER_COMM struct _zone* z = zone_find(addr); +#endif if (in_own_cluster(addr)) return; --- /home/markh/views/tipc/cvs/source/unstable/net/tipc/recvbcast.c 2004-03-30 07:11:39.000000000 -0800 +++ recvbcast.c 2004-04-15 10:31:18.000000000 -0700 @@ -326,10 +326,10 @@ struct list_head *pos; struct mc_identity *mid; - int res; + int res = TIPC_OK; list_for_each(pos,mc_head) { - mid = list_entry(pos, struct mc_identity, list); + mid = list_entry(pos, struct mc_identity, list); if(mid && mid ->node == tipc_own_addr) { struct port* port =(struct port*) ref_deref(mid ->port); struct sk_buff *copymsg = buf_clone(buf); --- /home/markh/views/tipc/cvs/source/unstable/net/tipc/name_table.c 2004-03-30 07:11:39.000000000 -0800 +++ name_table.c 2004-04-15 10:32:13.000000000 -0700 @@ -299,7 +299,7 @@ /* Insert a publication: */ publ = publ_create(type, lower, upper, port, node, scope, key); - dbg("inserting publ %x, node = %x publ->node = %x, subscr->node\n", + dbg("inserting publ %x, node = %x publ->node = %x, subscr->node %x\n", publ,node,publ->node,publ->subscr.node); if (!sseq->zone_list) sseq->zone_list = publ->zone_list.next = publ; @@ -309,10 +309,11 @@ } if (in_own_cluster(node)){ - if (!sseq->cluster_list) + if (!sseq->cluster_list) { sseq->cluster_list = publ->cluster_list.next = publ; - else{ - 
publ->cluster_list.next = sseq->cluster_list->cluster_list.next; + } else{ + publ->cluster_list.next = + sseq->cluster_list->cluster_list.next; sseq->cluster_list->cluster_list.next = publ; } } @@ -465,7 +466,7 @@ struct sub_seq *sseq = this->sseqs; if (!sseq) return 0; - dbg("nameseq_av: ff = %u, sseq = %x, &&this->sseqs[this->first_free = %x\n", + dbg("nameseq_av: ff = %u, sseq = %x, &this->sseqs[this]->first_free = %x\n", this->first_free,sseq,&this->sseqs[this->first_free]); for (;sseq != &this->sseqs[this->first_free];sseq++) { if ((sseq->lower >= lower) && (sseq->lower <= upper)) @@ -707,10 +708,14 @@ if (high_seq < low_seq) goto not_found; + + if (high_seq >= seq->first_free) + high_seq = seq->first_free - 1; spin_lock_bh(&seq->lock); i = low_seq; + for (i = low_seq ; i <= high_seq; i++) { @@ -732,14 +737,15 @@ if (destport) { - if ( false == mc_identity_create(mc_head,destport,destnode)) + if ( false == mc_identity_create(mc_head,destport,destnode)) { goto found; + } } } if (list_empty(mc_head)) { - spin_unlock_bh(&seq->lock); - goto not_found; + spin_unlock_bh(&seq->lock); + goto not_found; } found: spin_unlock_bh(&seq->lock); @@ -783,16 +789,18 @@ if (high_seq < low_seq) goto not_found; + if (high_seq >= seq->first_free) + high_seq = seq->first_free - 1; + spin_lock_bh(&seq->lock); i = low_seq; - + for (i = low_seq ; i <= high_seq; i++) { publ = seq->sseqs[i].node_list; if(!publ) { - spin_unlock_bh(&seq->lock); - goto not_found; + continue; } destnode = publ->node; destport = publ->ref; @@ -804,9 +812,10 @@ } } } - if (list_empty(mc_head)) + if (list_empty(mc_head)) { spin_unlock_bh(&seq->lock); goto not_found; + } spin_unlock_bh(&seq->lock); read_unlock_bh(&nametbl_lock); return true; -- Mark Haverkamp <ma...@os...> |
From: Jon M. <jon...@er...> - 2004-04-15 19:20:43
|
It looks ok to me, but I think we should let Guo and Ling have a say first. The only thing I hesitate about is the test for (!index) in "ref_lock_deref". This is one of the most time-critical functions in TIPC. Is it _really_ necessary to do this test, except for pure debugging purposes? An index of zero should yield the right return value, zero, and is hence no different from other invalid indexes, which also return zero. => I would prefer that this test is omitted. /Jon Mark Haverkamp wrote: >Daniel McNeil and I were playing with multicast in the unstable view and >found that it wasn't working for us. We spent some time debugging and >got it working (at least in our simple tests). I have enclosed the >patch that we came up with for review. We have run the client/server >test and it continues to function so I don't think that we broke >anything with the change. Please take a look and let us know what you >think. > >Thanks, >Mark and Daniel. > > >--- /home/markh/views/tipc/cvs/source/unstable/net/tipc/bcast.h 2004-03-30 07:11:38.000000000 -0800 >+++ bcast.h 2004-04-14 14:56:08.000000000 -0700 >@@ -123,7 +123,7 @@ > int tipc_bsend_buf(struct sk_buff *buf, struct list_head *mc_head); > int tipc_send_buf2nameseq(tipc_ref_t ref,struct tipc_name_seq *dest,void *buf, uint size); > int tipc_forward_buf2nameseq(tipc_ref_t ref,struct tipc_name_seq *name, >- void *buf,struct tipc_portid const *orig,uint size, >+ struct sk_buff *buf,struct tipc_portid const *orig,uint size, > uint importance,struct list_head *mc_head); > > >--- /home/markh/views/tipc/cvs/source/unstable/net/tipc/reg.h 2004-02-16 15:00:02.000000000 -0800 >+++ reg.h 2004-04-15 10:08:28.000000000 -0700 >@@ -91,8 +91,19 @@ > static inline void * > ref_lock_deref(uint ref) > { >- uint index = ref & ref_table.index_mask; >- struct reference *r = &ref_table.entries[index]; >+ uint index; >+ struct reference *r; >+ >+ index = ref & ref_table.index_mask; >+ >+ /* >+ * Zero is not a valid index >+ */ >+ if (!index) { 
>+ printk("tipc ref_lock_deref: ref is zero\n"); >+ return 0; >+ } >+ r = &ref_table.entries[index]; > spin_lock_bh(&r->lock); > if (likely(r->data.reference == ref)) > return r->object; >--- /home/markh/views/tipc/cvs/source/unstable/net/tipc/sendbcast.c 2004-03-24 08:58:32.000000000 -0800 >+++ sendbcast.c 2004-04-15 10:22:54.000000000 -0700 >@@ -140,7 +140,7 @@ > */ > int tipc_forward_buf2nameseq(tipc_ref_t ref, > struct tipc_name_seq *name, >- void *buf, >+ struct sk_buff *buf, > struct tipc_portid const *orig, > uint size, > uint importance,struct list_head *mc_head) >@@ -156,19 +156,19 @@ > m = &this->publ.phdr; > if (importance <= 3) > msg_set_importance(m,importance); >- >+ prev_destnode = 0; > list_for_each(pos,mc_head) { >- prev_destnode = 0; > mid = list_entry(pos,struct mc_identity,list); > if (mid != NULL && (prev_destnode != mid->node)){ > prev_destnode = mid->node; >- copybuf = buf_acquire(msg_size(m)); >- memcpy(copybuf,buf,msg_size(m)); >+ copybuf = buf_clone(buf); > msg_set_destnode(buf_msg(copybuf), mid ->node); >- if (likely(mid ->node != tipc_own_addr)) >+ if (likely(mid ->node != tipc_own_addr)) { > res = tipc_send_buf_fast(copybuf,mid->node); >- else >+ } >+ else { > res = bcast_port_recv(copybuf); >+ } > } > } > return res; >@@ -242,6 +242,7 @@ > > if (!this) > return TIPC_FAILURE; >+ > INIT_LIST_HEAD(&mc_head); > nametbl_mc_translate(&mc_head, seq->type,seq->lower,seq->upper); > tipc_ownidentity(ref,&orig); >@@ -255,7 +256,6 @@ > res = msg_build(hdr,msg,scnt,TIPC_MAX_MSG_SIZE,&b); > if (!b) > return TIPC_FAILURE; >- > count = count_mc_member(&mc_head); > > if (count <= REPLICA_NODES){ >--- /home/markh/views/tipc/cvs/source/unstable/net/tipc/bcast.c 2004-03-30 07:11:38.000000000 -0800 >+++ bcast.c 2004-04-15 10:24:09.000000000 -0700 >@@ -383,8 +383,8 @@ > int i = 0,prev_destnode; > struct mc_identity* mid; > >+ prev_destnode = 0; > list_for_each(pos,mc_head) { >- prev_destnode = 0; > mid = list_entry(pos,struct mc_identity,list); > if (mid != 
NULL && (prev_destnode != mid->node)){ > prev_destnode = mid->node; >@@ -433,6 +433,7 @@ > if (mc_list == NULL) > return false; > >+ INIT_LIST_HEAD(&mc_list->list); > mc_list->port = destport; > mc_list->node = destnode; > list_add_tail(&mc_list->list,list_head); >@@ -492,15 +493,14 @@ > void free_mclist(struct list_head *list_head) > { > struct mc_identity* mid; >- struct list_head *pos; >+ struct list_head *pos, *n; > >- list_for_each(pos,list_head) { >+ list_for_each_safe(pos, n, list_head) { > mid = list_entry(pos, struct mc_identity, list); > list_del(&mid->list); > kfree(mid); > > } >- list_del(list_head); > } > > static struct list_head point = LIST_HEAD_INIT(point); >--- /home/markh/views/tipc/cvs/source/unstable/net/tipc/link.c 2004-03-11 07:32:51.000000000 -0800 >+++ link.c 2004-04-15 10:24:34.000000000 -0700 >@@ -1609,7 +1609,11 @@ > if (likely(msg_is_dest(msg,tipc_own_addr))){ > if (likely(msg_isdata(msg))) { > spin_unlock_bh(&this->owner->lock); >- port_recv_msg(buf); >+ if (msg_type(msg) == TIPC_MCAST_MSG) { >+ bcast_port_recv(buf); >+ } else { >+ port_recv_msg(buf); >+ } > continue; > } > link_recv_non_data_msg(this, buf); >--- /home/markh/views/tipc/cvs/source/unstable/net/tipc/cfg.c 2004-02-16 15:00:01.000000000 -0800 >+++ cfg.c 2004-04-15 10:27:00.000000000 -0700 >@@ -393,7 +393,9 @@ > > void cfg_link_event(tipc_net_addr_t addr,char* name,int up) > { >+#ifdef INTER_CLUSTER_COMM > struct _zone* z = zone_find(addr); >+#endif > > if (in_own_cluster(addr)) > return; >--- /home/markh/views/tipc/cvs/source/unstable/net/tipc/recvbcast.c 2004-03-30 07:11:39.000000000 -0800 >+++ recvbcast.c 2004-04-15 10:31:18.000000000 -0700 >@@ -326,10 +326,10 @@ > > struct list_head *pos; > struct mc_identity *mid; >- int res; >+ int res = TIPC_OK; > > list_for_each(pos,mc_head) { >- mid = list_entry(pos, struct mc_identity, list); >+ mid = list_entry(pos, struct mc_identity, list); > if(mid && mid ->node == tipc_own_addr) { > struct port* port =(struct port*) 
ref_deref(mid ->port); > struct sk_buff *copymsg = buf_clone(buf); >--- /home/markh/views/tipc/cvs/source/unstable/net/tipc/name_table.c 2004-03-30 07:11:39.000000000 -0800 >+++ name_table.c 2004-04-15 10:32:13.000000000 -0700 >@@ -299,7 +299,7 @@ > /* Insert a publication: */ > > publ = publ_create(type, lower, upper, port, node, scope, key); >- dbg("inserting publ %x, node = %x publ->node = %x, subscr->node\n", >+ dbg("inserting publ %x, node = %x publ->node = %x, subscr->node %x\n", > publ,node,publ->node,publ->subscr.node); > if (!sseq->zone_list) > sseq->zone_list = publ->zone_list.next = publ; >@@ -309,10 +309,11 @@ > } > > if (in_own_cluster(node)){ >- if (!sseq->cluster_list) >+ if (!sseq->cluster_list) { > sseq->cluster_list = publ->cluster_list.next = publ; >- else{ >- publ->cluster_list.next = sseq->cluster_list->cluster_list.next; >+ } else{ >+ publ->cluster_list.next = >+ sseq->cluster_list->cluster_list.next; > sseq->cluster_list->cluster_list.next = publ; > } > } >@@ -465,7 +466,7 @@ > struct sub_seq *sseq = this->sseqs; > if (!sseq) > return 0; >- dbg("nameseq_av: ff = %u, sseq = %x, &&this->sseqs[this->first_free = %x\n", >+ dbg("nameseq_av: ff = %u, sseq = %x, &this->sseqs[this]->first_free = %x\n", > this->first_free,sseq,&this->sseqs[this->first_free]); > for (;sseq != &this->sseqs[this->first_free];sseq++) { > if ((sseq->lower >= lower) && (sseq->lower <= upper)) >@@ -707,10 +708,14 @@ > > if (high_seq < low_seq) > goto not_found; >+ >+ if (high_seq >= seq->first_free) >+ high_seq = seq->first_free - 1; > > spin_lock_bh(&seq->lock); > > i = low_seq; >+ > > for (i = low_seq ; i <= high_seq; i++) > { >@@ -732,14 +737,15 @@ > > if (destport) > { >- if ( false == mc_identity_create(mc_head,destport,destnode)) >+ if ( false == mc_identity_create(mc_head,destport,destnode)) { > goto found; >+ } > } > } > if (list_empty(mc_head)) > { >- spin_unlock_bh(&seq->lock); >- goto not_found; >+ spin_unlock_bh(&seq->lock); >+ goto not_found; > } > found: > 
spin_unlock_bh(&seq->lock); >@@ -783,16 +789,18 @@ > if (high_seq < low_seq) > goto not_found; > >+ if (high_seq >= seq->first_free) >+ high_seq = seq->first_free - 1; >+ > spin_lock_bh(&seq->lock); > > i = low_seq; >- >+ > for (i = low_seq ; i <= high_seq; i++) > { > publ = seq->sseqs[i].node_list; > if(!publ) { >- spin_unlock_bh(&seq->lock); >- goto not_found; >+ continue; > } > destnode = publ->node; > destport = publ->ref; >@@ -804,9 +812,10 @@ > } > } > } >- if (list_empty(mc_head)) >+ if (list_empty(mc_head)) { > spin_unlock_bh(&seq->lock); > goto not_found; >+ } > spin_unlock_bh(&seq->lock); > read_unlock_bh(&nametbl_lock); > return true; > > > |
From: Mark H. <ma...@os...> - 2004-04-15 20:44:43
|
On Thu, 2004-04-15 at 12:20, Jon Maloy wrote: > It looks ok to me, but I think we should let Guo and Ling have a > say first. I was hoping that they look at it. > > The only thing I hesitate about is the test for (!index) in > "ref_lock_deref". > This is one of the most time-critical functions in TIPC. > Is it _really_ necessary to do this test, except for pure > debugging purposes ? An index of zero should yield the right > return value, -zero, and is hence no different from other invalid > indexes, which also return zero. > => I would prefer that this test is omitted. OK, the problem was that the lock in table[0] wasn't initialized causing the spinlock code to BUG(). I initialized the lock and took out the index check. See attached Mark. Index: bcast.h =================================================================== RCS file: /cvsroot/tipc/source/unstable/net/tipc/bcast.h,v retrieving revision 1.9 diff -u -r1.9 bcast.h --- bcast.h 30 Mar 2004 03:33:16 -0000 1.9 +++ bcast.h 15 Apr 2004 20:42:45 -0000 @@ -123,7 +123,7 @@ int tipc_bsend_buf(struct sk_buff *buf, struct list_head *mc_head); int tipc_send_buf2nameseq(tipc_ref_t ref,struct tipc_name_seq *dest,void *buf, uint size); int tipc_forward_buf2nameseq(tipc_ref_t ref,struct tipc_name_seq *name, - void *buf,struct tipc_portid const *orig,uint size, + struct sk_buff *buf,struct tipc_portid const *orig,uint size, uint importance,struct list_head *mc_head); Index: reg.c =================================================================== RCS file: /cvsroot/tipc/source/unstable/net/tipc/reg.c,v retrieving revision 1.6 diff -u -r1.6 reg.c --- reg.c 16 Feb 2004 23:00:02 -0000 1.6 +++ reg.c 15 Apr 2004 20:42:45 -0000 @@ -100,6 +100,7 @@ table = (struct reference *) k_malloc(sizeof (struct reference) * sz); table[0].object = 0; table[0].data.reference = ~0u; + table[0].lock = SPIN_LOCK_UNLOCKED; for (i = 1; i < sz - 1; i++) { table[i].object = 0; table[i].lock = SPIN_LOCK_UNLOCKED; Index: sendbcast.c 
=================================================================== RCS file: /cvsroot/tipc/source/unstable/net/tipc/sendbcast.c,v retrieving revision 1.10 diff -u -r1.10 sendbcast.c --- sendbcast.c 16 Mar 2004 07:16:07 -0000 1.10 +++ sendbcast.c 15 Apr 2004 20:42:45 -0000 @@ -140,7 +140,7 @@ */ int tipc_forward_buf2nameseq(tipc_ref_t ref, struct tipc_name_seq *name, - void *buf, + struct sk_buff *buf, struct tipc_portid const *orig, uint size, uint importance,struct list_head *mc_head) @@ -156,19 +156,19 @@ m = &this->publ.phdr; if (importance <= 3) msg_set_importance(m,importance); - + prev_destnode = 0; list_for_each(pos,mc_head) { - prev_destnode = 0; mid = list_entry(pos,struct mc_identity,list); if (mid != NULL && (prev_destnode != mid->node)){ prev_destnode = mid->node; - copybuf = buf_acquire(msg_size(m)); - memcpy(copybuf,buf,msg_size(m)); + copybuf = buf_clone(buf); msg_set_destnode(buf_msg(copybuf), mid ->node); - if (likely(mid ->node != tipc_own_addr)) + if (likely(mid ->node != tipc_own_addr)) { res = tipc_send_buf_fast(copybuf,mid->node); - else + } + else { res = bcast_port_recv(copybuf); + } } } return res; @@ -242,6 +242,7 @@ if (!this) return TIPC_FAILURE; + INIT_LIST_HEAD(&mc_head); nametbl_mc_translate(&mc_head, seq->type,seq->lower,seq->upper); tipc_ownidentity(ref,&orig); @@ -255,7 +256,6 @@ res = msg_build(hdr,msg,scnt,TIPC_MAX_MSG_SIZE,&b); if (!b) return TIPC_FAILURE; - count = count_mc_member(&mc_head); if (count <= REPLICA_NODES){ Index: bcast.c =================================================================== RCS file: /cvsroot/tipc/source/unstable/net/tipc/bcast.c,v retrieving revision 1.13 diff -u -r1.13 bcast.c --- bcast.c 30 Mar 2004 03:33:16 -0000 1.13 +++ bcast.c 15 Apr 2004 20:42:45 -0000 @@ -383,8 +383,8 @@ int i = 0,prev_destnode; struct mc_identity* mid; + prev_destnode = 0; list_for_each(pos,mc_head) { - prev_destnode = 0; mid = list_entry(pos,struct mc_identity,list); if (mid != NULL && (prev_destnode != mid->node)){ 
prev_destnode = mid->node; @@ -433,6 +433,7 @@ if (mc_list == NULL) return false; + INIT_LIST_HEAD(&mc_list->list); mc_list->port = destport; mc_list->node = destnode; list_add_tail(&mc_list->list,list_head); @@ -492,15 +493,14 @@ void free_mclist(struct list_head *list_head) { struct mc_identity* mid; - struct list_head *pos; + struct list_head *pos, *n; - list_for_each(pos,list_head) { + list_for_each_safe(pos, n, list_head) { mid = list_entry(pos, struct mc_identity, list); list_del(&mid->list); kfree(mid); } - list_del(list_head); } static struct list_head point = LIST_HEAD_INIT(point); Index: link.c =================================================================== RCS file: /cvsroot/tipc/source/unstable/net/tipc/link.c,v retrieving revision 1.12 diff -u -r1.12 link.c --- link.c 15 Apr 2004 17:44:13 -0000 1.12 +++ link.c 15 Apr 2004 20:42:46 -0000 @@ -1612,7 +1612,11 @@ if (likely(msg_is_dest(msg,tipc_own_addr))){ if (likely(msg_isdata(msg))) { spin_unlock_bh(&this->owner->lock); - port_recv_msg(buf); + if (msg_type(msg) == TIPC_MCAST_MSG) { + bcast_port_recv(buf); + } else { + port_recv_msg(buf); + } continue; } link_recv_non_data_msg(this, buf); Index: cfg.c =================================================================== RCS file: /cvsroot/tipc/source/unstable/net/tipc/cfg.c,v retrieving revision 1.6 diff -u -r1.6 cfg.c --- cfg.c 16 Feb 2004 23:00:01 -0000 1.6 +++ cfg.c 15 Apr 2004 20:42:47 -0000 @@ -393,7 +393,9 @@ void cfg_link_event(tipc_net_addr_t addr,char* name,int up) { +#ifdef INTER_CLUSTER_COMM struct _zone* z = zone_find(addr); +#endif if (in_own_cluster(addr)) return; Index: recvbcast.c =================================================================== RCS file: /cvsroot/tipc/source/unstable/net/tipc/recvbcast.c,v retrieving revision 1.10 diff -u -r1.10 recvbcast.c --- recvbcast.c 30 Mar 2004 03:33:16 -0000 1.10 +++ recvbcast.c 15 Apr 2004 20:42:47 -0000 @@ -326,10 +326,10 @@ struct list_head *pos; struct mc_identity *mid; - int res; + int 
res = TIPC_OK; list_for_each(pos,mc_head) { - mid = list_entry(pos, struct mc_identity, list); + mid = list_entry(pos, struct mc_identity, list); if(mid && mid ->node == tipc_own_addr) { struct port* port =(struct port*) ref_deref(mid ->port); struct sk_buff *copymsg = buf_clone(buf); Index: name_table.c =================================================================== RCS file: /cvsroot/tipc/source/unstable/net/tipc/name_table.c,v retrieving revision 1.7 diff -u -r1.7 name_table.c --- name_table.c 30 Mar 2004 03:33:16 -0000 1.7 +++ name_table.c 15 Apr 2004 20:42:47 -0000 @@ -299,7 +299,7 @@ /* Insert a publication: */ publ = publ_create(type, lower, upper, port, node, scope, key); - dbg("inserting publ %x, node = %x publ->node = %x, subscr->node\n", + dbg("inserting publ %x, node = %x publ->node = %x, subscr->node %x\n", publ,node,publ->node,publ->subscr.node); if (!sseq->zone_list) sseq->zone_list = publ->zone_list.next = publ; @@ -309,10 +309,11 @@ } if (in_own_cluster(node)){ - if (!sseq->cluster_list) + if (!sseq->cluster_list) { sseq->cluster_list = publ->cluster_list.next = publ; - else{ - publ->cluster_list.next = sseq->cluster_list->cluster_list.next; + } else{ + publ->cluster_list.next = + sseq->cluster_list->cluster_list.next; sseq->cluster_list->cluster_list.next = publ; } } @@ -465,7 +466,7 @@ struct sub_seq *sseq = this->sseqs; if (!sseq) return 0; - dbg("nameseq_av: ff = %u, sseq = %x, &&this->sseqs[this->first_free = %x\n", + dbg("nameseq_av: ff = %u, sseq = %x, &this->sseqs[this]->first_free = %x\n", this->first_free,sseq,&this->sseqs[this->first_free]); for (;sseq != &this->sseqs[this->first_free];sseq++) { if ((sseq->lower >= lower) && (sseq->lower <= upper)) @@ -707,10 +708,14 @@ if (high_seq < low_seq) goto not_found; + + if (high_seq >= seq->first_free) + high_seq = seq->first_free - 1; spin_lock_bh(&seq->lock); i = low_seq; + for (i = low_seq ; i <= high_seq; i++) { @@ -732,14 +737,15 @@ if (destport) { - if ( false == 
mc_identity_create(mc_head,destport,destnode)) + if ( false == mc_identity_create(mc_head,destport,destnode)) { goto found; + } } } if (list_empty(mc_head)) { - spin_unlock_bh(&seq->lock); - goto not_found; + spin_unlock_bh(&seq->lock); + goto not_found; } found: spin_unlock_bh(&seq->lock); @@ -783,16 +789,18 @@ if (high_seq < low_seq) goto not_found; + if (high_seq >= seq->first_free) + high_seq = seq->first_free - 1; + spin_lock_bh(&seq->lock); i = low_seq; - + for (i = low_seq ; i <= high_seq; i++) { publ = seq->sseqs[i].node_list; if(!publ) { - spin_unlock_bh(&seq->lock); - goto not_found; + continue; } destnode = publ->node; destport = publ->ref; @@ -804,9 +812,10 @@ } } } - if (list_empty(mc_head)) + if (list_empty(mc_head)) { spin_unlock_bh(&seq->lock); goto not_found; + } spin_unlock_bh(&seq->lock); read_unlock_bh(&nametbl_lock); return true; -- Mark Haverkamp <ma...@os...> |
From: Jon M. <jon...@er...> - 2004-04-15 21:54:58
|
Good. I hope we will soon have some comments from Guo and Ling about this and about their progress relating to the broadcast protocol in general. Cheers /jon Mark Haverkamp wrote: On Thu, 2004-04-15 at 12:20, Jon Maloy wrote: It looks ok to me, but I think we should let Guo and Ling have a say first. I was hoping that they look at it. The only thing I hesitate about is the test for (!index) in "ref_lock_deref". This is one of the most time-critical functions in TIPC. Is it _really_ necessary to do this test, except for pure debugging purposes ? An index of zero should yield the right return value, -zero, and is hence no different from other invalid indexes, which also return zero. => I would prefer that this test is omitted. OK, the problem was that the lock in table[0] wasn't initialized causing the spinlock code to BUG(). I initialized the lock and took out the index check. See attached Mark. Index: bcast.h =================================================================== RCS file: /cvsroot/tipc/source/unstable/net/tipc/bcast.h,v retrieving revision 1.9 diff -u -r1.9 bcast.h --- bcast.h 30 Mar 2004 03:33:16 -0000 1.9 +++ bcast.h 15 Apr 2004 20:42:45 -0000 @@ -123,7 +123,7 @@ int tipc_bsend_buf(struct sk_buff *buf, struct list_head *mc_head); int tipc_send_buf2nameseq(tipc_ref_t ref,struct tipc_name_seq *dest,void *buf, uint size); int tipc_forward_buf2nameseq(tipc_ref_t ref,struct tipc_name_seq *name, - void *buf,struct tipc_portid const *orig,uint size, + struct sk_buff *buf,struct tipc_portid const *orig,uint size, uint importance,struct list_head *mc_head); Index: reg.c =================================================================== RCS file: /cvsroot/tipc/source/unstable/net/tipc/reg.c,v retrieving revision 1.6 diff -u -r1.6 reg.c --- reg.c 16 Feb 2004 23:00:02 -0000 1.6 +++ reg.c 15 Apr 2004 20:42:45 -0000 @@ -100,6 +100,7 @@ table = (struct reference *) k_malloc(sizeof (struct reference) * sz); table[0].object = 0; table[0].data.reference = ~0u; + 
table[0].lock = SPIN_LOCK_UNLOCKED; for (i = 1; i < sz - 1; i++) { table[i].object = 0; table[i].lock = SPIN_LOCK_UNLOCKED; Index: sendbcast.c =================================================================== RCS file: /cvsroot/tipc/source/unstable/net/tipc/sendbcast.c,v retrieving revision 1.10 diff -u -r1.10 sendbcast.c --- sendbcast.c 16 Mar 2004 07:16:07 -0000 1.10 +++ sendbcast.c 15 Apr 2004 20:42:45 -0000 @@ -140,7 +140,7 @@ */ int tipc_forward_buf2nameseq(tipc_ref_t ref, struct tipc_name_seq *name, - void *buf, + struct sk_buff *buf, struct tipc_portid const *orig, uint size, uint importance,struct list_head *mc_head) @@ -156,19 +156,19 @@ m = &this->publ.phdr; if (importance <= 3) msg_set_importance(m,importance); - + prev_destnode = 0; list_for_each(pos,mc_head) { - prev_destnode = 0; mid = list_entry(pos,struct mc_identity,list); if (mid != NULL && (prev_destnode != mid->node)){ prev_destnode = mid->node; - copybuf = buf_acquire(msg_size(m)); - memcpy(copybuf,buf,msg_size(m)); + copybuf = buf_clone(buf); msg_set_destnode(buf_msg(copybuf), mid ->node); - if (likely(mid ->node != tipc_own_addr)) + if (likely(mid ->node != tipc_own_addr)) { res = tipc_send_buf_fast(copybuf,mid->node); - else + } + else { res = bcast_port_recv(copybuf); + } } } return res; @@ -242,6 +242,7 @@ if (!this) return TIPC_FAILURE; + INIT_LIST_HEAD(&mc_head); nametbl_mc_translate(&mc_head, seq->type,seq->lower,seq->upper); tipc_ownidentity(ref,&orig); @@ -255,7 +256,6 @@ res = msg_build(hdr,msg,scnt,TIPC_MAX_MSG_SIZE,&b); if (!b) return TIPC_FAILURE; - count = count_mc_member(&mc_head); if (count <= REPLICA_NODES){ Index: bcast.c =================================================================== RCS file: /cvsroot/tipc/source/unstable/net/tipc/bcast.c,v retrieving revision 1.13 diff -u -r1.13 bcast.c --- bcast.c 30 Mar 2004 03:33:16 -0000 1.13 +++ bcast.c 15 Apr 2004 20:42:45 -0000 @@ -383,8 +383,8 @@ int i = 0,prev_destnode; struct mc_identity* mid; + prev_destnode = 0; 
list_for_each(pos,mc_head) { - prev_destnode = 0; mid = list_entry(pos,struct mc_identity,list); if (mid != NULL && (prev_destnode != mid->node)){ prev_destnode = mid->node; @@ -433,6 +433,7 @@ if (mc_list == NULL) return false; + INIT_LIST_HEAD(&mc_list->list); mc_list->port = destport; mc_list->node = destnode; list_add_tail(&mc_list->list,list_head); @@ -492,15 +493,14 @@ void free_mclist(struct list_head *list_head) { struct mc_identity* mid; - struct list_head *pos; + struct list_head *pos, *n; - list_for_each(pos,list_head) { + list_for_each_safe(pos, n, list_head) { mid = list_entry(pos, struct mc_identity, list); list_del(&mid->list); kfree(mid); } - list_del(list_head); } static struct list_head point = LIST_HEAD_INIT(point); Index: link.c =================================================================== RCS file: /cvsroot/tipc/source/unstable/net/tipc/link.c,v retrieving revision 1.12 diff -u -r1.12 link.c --- link.c 15 Apr 2004 17:44:13 -0000 1.12 +++ link.c 15 Apr 2004 20:42:46 -0000 @@ -1612,7 +1612,11 @@ if (likely(msg_is_dest(msg,tipc_own_addr))){ if (likely(msg_isdata(msg))) { spin_unlock_bh(&this->owner->lock); - port_recv_msg(buf); + if (msg_type(msg) == TIPC_MCAST_MSG) { + bcast_port_recv(buf); + } else { + port_recv_msg(buf); + } continue; } link_recv_non_data_msg(this, buf); Index: cfg.c =================================================================== RCS file: /cvsroot/tipc/source/unstable/net/tipc/cfg.c,v retrieving revision 1.6 diff -u -r1.6 cfg.c --- cfg.c 16 Feb 2004 23:00:01 -0000 1.6 +++ cfg.c 15 Apr 2004 20:42:47 -0000 @@ -393,7 +393,9 @@ void cfg_link_event(tipc_net_addr_t addr,char* name,int up) { +#ifdef INTER_CLUSTER_COMM struct _zone* z = zone_find(addr); +#endif if (in_own_cluster(addr)) return; Index: recvbcast.c =================================================================== RCS file: /cvsroot/tipc/source/unstable/net/tipc/recvbcast.c,v retrieving revision 1.10 diff -u -r1.10 recvbcast.c --- recvbcast.c 30 Mar 2004 
03:33:16 -0000 1.10 +++ recvbcast.c 15 Apr 2004 20:42:47 -0000 @@ -326,10 +326,10 @@ struct list_head *pos; struct mc_identity *mid; - int res; + int res = TIPC_OK; list_for_each(pos,mc_head) { - mid = list_entry(pos, struct mc_identity, list); + mid = list_entry(pos, struct mc_identity, list); if(mid && mid ->node == tipc_own_addr) { struct port* port =(struct port*) ref_deref(mid ->port); struct sk_buff *copymsg = buf_clone(buf); Index: name_table.c =================================================================== RCS file: /cvsroot/tipc/source/unstable/net/tipc/name_table.c,v retrieving revision 1.7 diff -u -r1.7 name_table.c --- name_table.c 30 Mar 2004 03:33:16 -0000 1.7 +++ name_table.c 15 Apr 2004 20:42:47 -0000 @@ -299,7 +299,7 @@ /* Insert a publication: */ publ = publ_create(type, lower, upper, port, node, scope, key); - dbg("inserting publ %x, node = %x publ->node = %x, subscr->node\n", + dbg("inserting publ %x, node = %x publ->node = %x, subscr->node %x\n", publ,node,publ->node,publ->subscr.node); if (!sseq->zone_list) sseq->zone_list = publ->zone_list.next = publ; @@ -309,10 +309,11 @@ } if (in_own_cluster(node)){ - if (!sseq->cluster_list) + if (!sseq->cluster_list) { sseq->cluster_list = publ->cluster_list.next = publ; - else{ - publ->cluster_list.next = sseq->cluster_list->cluster_list.next; + } else{ + publ->cluster_list.next = + sseq->cluster_list->cluster_list.next; sseq->cluster_list->cluster_list.next = publ; } } @@ -465,7 +466,7 @@ struct sub_seq *sseq = this->sseqs; if (!sseq) return 0; - dbg("nameseq_av: ff = %u, sseq = %x, &&this->sseqs[this->first_free = %x\n", + dbg("nameseq_av: ff = %u, sseq = %x, &this->sseqs[this]->first_free = %x\n", this->first_free,sseq,&this->sseqs[this->first_free]); for (;sseq != &this->sseqs[this->first_free];sseq++) { if ((sseq->lower >= lower) && (sseq->lower <= upper)) @@ -707,10 +708,14 @@ if (high_seq < low_seq) goto not_found; + + if (high_seq >= seq->first_free) + high_seq = seq->first_free - 1; 
spin_lock_bh(&seq->lock); i = low_seq; + for (i = low_seq ; i <= high_seq; i++) { @@ -732,14 +737,15 @@ if (destport) { - if ( false == mc_identity_create(mc_head,destport,destnode)) + if ( false == mc_identity_create(mc_head,destport,destnode)) { goto found; + } } } if (list_empty(mc_head)) { - spin_unlock_bh(&seq->lock); - goto not_found; + spin_unlock_bh(&seq->lock); + goto not_found; } found: spin_unlock_bh(&seq->lock); @@ -783,16 +789,18 @@ if (high_seq < low_seq) goto not_found; + if (high_seq >= seq->first_free) + high_seq = seq->first_free - 1; + spin_lock_bh(&seq->lock); i = low_seq; - + for (i = low_seq ; i <= high_seq; i++) { publ = seq->sseqs[i].node_list; if(!publ) { - spin_unlock_bh(&seq->lock); - goto not_found; + continue; } destnode = publ->node; destport = publ->ref; @@ -804,9 +812,10 @@ } } } - if (list_empty(mc_head)) + if (list_empty(mc_head)) { spin_unlock_bh(&seq->lock); goto not_found; + } spin_unlock_bh(&seq->lock); read_unlock_bh(&nametbl_lock); return true; |