From: Guo, M. <mi...@in...> - 2004-04-21 03:40:54
|
The TIPC retransmit patch for RM. Index: recvbcast.c =================================================================== RCS file: /cvsroot/tipc/source/unstable/net/tipc/recvbcast.c,v retrieving revision 1.11 retrieving revision 1.14 diff -u -r1.11 -r1.14 --- recvbcast.c 16 Apr 2004 04:03:46 -0000 1.11 +++ recvbcast.c 21 Apr 2004 02:34:41 -0000 1.14 @@ -166,7 +179,7 @@ * Input: ackno: the acked packet seqno * Return: void */ -static void bnode_outqueue_release(int ackno) +void bnode_outqueue_release(int ackno) { struct sk_buff *buf; 	 @@ -295,34 +308,30 @@ exp_seq = owner->last_in_bcast + 1;	 defered_size = owner->deferred_inqueue_sz; gap = owner->gap;	 - dbg("-------recv bcast data seqno %d\n",msg_seqno(msg));	 - if(exp_seq != msg_seqno(msg)) { - dbg("recv bcast data exp_seq is %d\n",exp_seq); - dbg("recv bcast data seqno %d\n",msg_seqno(msg));	 - buf_safe_discard(buf); - return; 	 - } + printk("-------recv bcast data seqno %d\n",msg_seqno(msg));	 + printk("recv bcast data exp_seq is %d\n",exp_seq); + printk("recv bcast data seqno %d\n",msg_seqno(msg));	 	 if (msg_seqno(msg) == exp_seq) { spin_lock_bh(&owner->lock);	 owner->last_in_bcast = exp_seq; spin_unlock_bh(&owner->lock);	 - exp_seq++; if(gap != 0) { recaculate_gap(this); } bcast_port_recv(buf); 	 - //if (match_address_4(msg, tipc_own_addr)) - request_retransmit(this, gap); + if (match_address_4(msg, tipc_own_addr)) + request_retransmit(this, gap); 	 } - /*else if(msg_seqno(msg) < exp_seq) { + else if(msg_seqno(msg) < exp_seq) { dbg("discard duplicate message, seq:%d", msg_seqno(msg)); buf_discard(buf); return;	 	 }else{ + return; if(bcast_deferred_queue_add(this,buf)) { recaculate_gap(this); if(match_address_3(msg, tipc_own_addr) && defered_size == 1 ) {  @@ -331,7 +340,7 @@ request_retransmit(this, gap); } } - }*/	 + 
}	   } /** @@ -344,11 +353,15 @@ static void recv_bcast_restransmit(struct link* this,struct sk_buff *buf) { struct sk_buff *obuf; + struct tipc_msg *msg; uint pos = INT_H_SIZE;	 struct node* owner; - + printk("recv_bcast_retransmit\n"); owner = this->owner; obuf = buf_extract(buf, pos);	 + printk("buf_extract"); + msg = buf_msg(obuf); + printk("the size is %d\n", msg_hdr_sz(msg)); recv_bcast_data(this, obuf); buf_safe_discard(buf); } @@ -370,7 +383,6 @@ int ackno = msg_bcast_ack(buf_msg(buf)); int gap; dbg("---recv bcast state ack no= %d\n ",ackno); -// bnode_outqueue_release(ackno); this->owner->acked_bcast = ackno; btlink = find_blink(this->bearer);	 if (btlink) @@ -450,7 +462,7 @@ for (crs = &nodes;*crs;crs = &(*crs)->next) { if (!in_own_cluster((*crs)->addr))  continue; -// dbg("the crs's acked number %d\n",(*crs) ->acked_bcast); + if ( less((*crs) ->acked_bcast, min_acked_bcast)) min_acked_bcast = (*crs) ->acked_bcast; 	 @@ -490,8 +502,6 @@ recv_bcast_restransmit(this,buf); break; case STATE_MSG: - dbg("BCAST_MSG"); - tipc_dump_head(msg); recv_bcast_state(this,buf); break; 	 default: @@ -519,7 +529,7 @@ if (false == nametbl_self_translate(&mc_head,msg_nametype(msg),\ msg_namelower(msg),msg_nameupper(msg))) { buf_safe_discard(buf); - dbg("get TIPC_ERR_NO_PORT\n"); 	 + printk("get TIPC_ERR_NO_PORT\n"); return TIPC_ERR_NO_PORT; } res = nameseq_deliver(buf,&mc_head); Index: bcast.c =================================================================== RCS file: /cvsroot/tipc/source/unstable/net/tipc/bcast.c,v retrieving revision 1.14 retrieving revision 1.18 diff -u -r1.14 -r1.18 --- bcast.c 16 Apr 2004 04:03:46 -0000 1.14 +++ bcast.c 21 Apr 2004 02:34:41 -0000 1.18 @@ -136,6 +152,9 @@   for (;i<MAX_BEARERS;i++) linkset[i] = NULL; + for(i=0;i<MAX_NODES;i++) + retrans_nodes[i] = 
NULL; + }     @@ -310,7 +329,7 @@ blink->bitmap[i] = 0; INIT_LIST_HEAD(&blink->list); blink->mask = (tipc_addr(tipc_zone(tipc_own_addr),tipc_cluster(tipc_own_addr),0));	 - memset(blink->bitmap, 0, MAX_NODES*sizeof(unsigned long)); + memset(blink->bitmap, 0, sizeof(blink->bitmap)); list_add_tail(&(blink->list),&blink_head); return blink; 	 @@ -366,7 +385,7 @@ buf_set_next(buf,tbuf); node->deferred_in = buf;	 node->deferred_inqueue_sz++; - }else if(msg_seqno(buf_msg(tbuf)) == msg_seqno(buf_msg(buf))){ + }else if(msg_seqno(buf_msg(pbuf)) == msg_seqno(buf_msg(buf))){ buf_safe_discard(buf); return false; }else{ @@ -427,8 +446,8 @@ int i = 0,prev_destnode;	 struct mc_identity* mid; 	 + prev_destnode = 0; list_for_each(pos,mc_head) { - prev_destnode = 0; mid = list_entry(pos,struct mc_identity,list); if (mid != NULL && (prev_destnode != mid->node)){ prev_destnode = mid->node; @@ -795,7 +814,6 @@ void bcast_recv_timeout() { - return; send_timer_ref = 0; check_bcast_outqueue(); timeout_retransmit(); @@ -832,59 +850,59 @@   void datamsghead_dump(struct tipc_msg * p) { - dbg("Version:%d\n", msg_version(p)); - dbg( "User:%d \n", msg_user(p)); - dbg( "Headsize:%d\n", msg_hdr_sz(p)); - dbg( "Message Size:%d\n", msg_size(p)); - dbg( "Link level ack Seq no:%d\n", msg_ack(p)); - dbg( "broadcast/link level seq no:%d\n", msg_bcast_ack(p)); - dbg( "Send Seq:%d\n", msg_seqno(p)); - dbg( "Previsous processor:%d\n", tipc_node(msg_prevnode(p))); - dbg( "Orignate Port:%X\n", msg_origport(p)); - dbg( "Desting Port:%X\n", msg_destport(p)); + printk("Version:%d\n", msg_version(p)); + printk( "User:%d \n", msg_user(p)); + printk( "Headsize:%d\n", msg_hdr_sz(p)); + printk( "Message Size:%d\n", msg_size(p)); + printk( "Link level ack Seq no:%d\n", msg_ack(p)); + printk( "broadcast/link level seq no:%d\n", msg_bcast_ack(p)); + printk( "Send Seq:%d\n", msg_seqno(p)); + printk( "Previsous processor:%d\n", tipc_node(msg_prevnode(p))); + printk( "Orignate 
Port:%X\n", msg_origport(p)); + printk( "Desting Port:%X\n", msg_destport(p)); if (msg_hdr_sz(p) <= 24) return; - dbg( "Message Type:%d\n", msg_type(p)); - dbg( "Error Code:%d\n", msg_errcode(p)); - dbg( "Reroute Count:%d\n", msg_reroute_cnt(p)); - dbg( "Orignate Processor:%d\n", tipc_node(msg_orignode(p))); - dbg( "Destination Processor:%d\n", tipc_node(msg_destnode(p))); + printk( "Message Type:%d\n", msg_type(p)); + printk( "Error Code:%d\n", msg_errcode(p)); + printk( "Reroute Count:%d\n", msg_reroute_cnt(p)); + printk( "Orignate Processor:%d\n", tipc_node(msg_orignode(p))); + printk( "Destination Processor:%d\n", tipc_node(msg_destnode(p))); if (msg_hdr_sz(p) <= 32) return; - dbg( "Port Name Type:%d\n", msg_nametype(p)); - dbg( "Port Name Instance:%d\n", msg_namelower(p)); + printk( "Port Name Type:%d\n", msg_nametype(p)); + printk( "Port Name Instance:%d\n", msg_namelower(p)); if (msg_hdr_sz(p) <= 40) return; - dbg( "Port Name Instance:%d\n", msg_nameupper(p)); + printk( "Port Name Instance:%d\n", msg_nameupper(p)); }     void intermsghead_dump(struct tipc_msg* p) { - dbg( "Version:%d\n", msg_version(p)); - dbg( "User:%d \n", msg_user(p)); - dbg( "Headsize:%d\n", msg_hdr_sz(p)); - dbg( "Broadcast ack no:%d\n", msg_bcast_ack(p)); - dbg( "Message Size:%d\n", msg_size(p)); - dbg( "Link Ack Seq:%d\n", msg_ack(p)); - dbg( "Send Seq:%d\n", msg_seqno(p)); - dbg( "Previsous processor:%d\n",tipc_node(msg_prevnode(p))); - dbg( "importance :%d\n", msg_importance(p)); - dbg( "Link selector :%d\n", msg_link_selector(p)); - dbg( "Message Count :%d\n", msg_msgcnt(p)); - dbg( "Probe:%d\n", msg_probe(p)); - dbg( "Bearer identity:%d\n", msg_bearer_id(p)); - dbg( "Link selector :%d\n", msg_link_selector(p)); + printk( "Version:%d\n", msg_version(p)); + printk( "User:%d \n", msg_user(p)); + printk( "Headsize:%d\n", msg_hdr_sz(p)); + printk( "Broadcast ack no:%d\n", msg_bcast_ack(p)); + printk( "Message Size:%d\n", msg_size(p)); + printk( "Link Ack Seq:%d\n", 
msg_ack(p)); + printk( "Send Seq:%d\n", msg_seqno(p)); + printk( "Previsous processor:%d\n",tipc_node(msg_prevnode(p))); + printk( "importance :%d\n", msg_importance(p)); + printk( "Link selector :%d\n", msg_link_selector(p)); + printk( "Message Count :%d\n", msg_msgcnt(p)); + printk( "Probe:%d\n", msg_probe(p)); + printk( "Bearer identity:%d\n", msg_bearer_id(p)); + printk( "Link selector :%d\n", msg_link_selector(p)); if (msg_hdr_sz(p) < 20) return; - dbg( "Message Type:%d\n", msg_type(p)); - dbg( "Sequence Gap:%d\n", msg_seq_gap(p));   - dbg( "Next Send Packet:%d\n", msg_next_bcast(p)); - dbg( "Next Send Packet:%d\n", msg_next_sent(p)); + printk( "Message Type:%d\n", msg_type(p)); + printk( "Sequence Gap:%d\n", msg_seq_gap(p));   + printk( "Next Send Packet:%d\n", msg_next_bcast(p)); + printk( "Next Send Packet:%d\n", msg_next_sent(p)); if (msg_hdr_sz(p) <= 32 ) return; - dbg( "Orignate Processor:%d\n", tipc_node(msg_orignode(p))); - dbg( "Destination Processor:%d\n", tipc_node(msg_destnode(p))); + printk( "Orignate Processor:%d\n", tipc_node(msg_orignode(p))); + printk( "Destination Processor:%d\n", tipc_node(msg_destnode(p)));   } void tipc_dump_head(struct tipc_msg *pmsg) Index: sendbcast.c =================================================================== RCS file: /cvsroot/tipc/source/unstable/net/tipc/sendbcast.c,v retrieving revision 1.11 retrieving revision 1.12 diff -u -r1.11 -r1.12 --- sendbcast.c 16 Apr 2004 04:03:46 -0000 1.11 +++ sendbcast.c 20 Apr 2004 06:52:54 -0000 1.12 @@ -296,15 +299,9 @@ } 	 for (i = 0; i < blinkset->linkcount ; i++) { -// copybuf = buf_clone(buf); res = blink_send_buf_fast(buf, &(blinkset->blink[i]->link));   - 	 -// if (res<0) -// buf_discard(copybuf);	 } -// buf_discard(buf); 	 - // res = tipc_bsend_buf_fast(buf, &(blinkset->blink[i]->link)); return res; }   
@@ -529,8 +526,8 @@ if (!in_own_cluster((*crs)->addr))  continue; if ( less((*crs) ->acked_bcast,lastseq)){ - count++; retrans_nodes[count] = *crs; + count++; }	 	 } @@ -555,30 +552,49 @@ struct tipc_msg *msg = buf_msg(buf);   printk("==retransmit:%d\n", lastseq); - return; firstseq = msg_seqno(msg); count = retrans_member_count(lastseq); - if (count == 0) - return; -	 - if(count > REPLICA_NODES){ + + if(count > 8){ for(i = 0; i < MAX_BEARERS; i++) { if (linkset[i] != NULL) blinksets = linkset[i]; - for( j = 0;j < blinksets->linkcount;j++) - broadcast_retransmit(&(blinksets->blink[j]->link), buf, lastseq); }	 + for( j = 0;j < blinksets->linkcount;j++) + broadcast_retransmit(&(blinksets->blink[j]->link), buf, lastseq); + 	 } else { for (i=0;i<count;i++){ - pnode = retrans_nodes[i]; + pnode = retrans_nodes[i];	 + if (!likely(pnode) ) + { + printk("pnode is null\n"); + continue; + } + else + printk("pnode is %d\n",pnode->addr); + 	 + if (!likely(msg)){ + printk("bcast_send_retransmits port=%d\n",msg_origport(msg)); + return; + } + else + printk("bcast_send_retransmits port=%d\n",msg_origport(msg)); + read_lock_bh(&net_lock); this = link_lock_select(pnode->addr, msg_origport(msg)); + if (this) + spin_unlock_bh(&this->owner->lock);   + read_unlock_bh(&net_lock); if (unlikely(!this)) + { + printk("no link found\n"); continue; + }	 bcast_link_retransmit(this, buf, lastseq); 	 } - }	 + } 	 	 }   @@ -620,6 +636,50 @@ this->retransm_queue_head = this->retransm_queue_size = 0; }   +void +blink_tunnel(struct link *this,  + struct tipc_msg *tunnel_hdr,  + struct tipc_msg *msg) +{ + struct link *tunnel; + struct sk_buff *buf; + uint length = msg_size(msg); + int res = 0; +	 + tunnel = this->owner->active_links[msg_link_selector(msg)];	 + if (!likely(tunnel)){ + printk("the tunnel is null\n"); + return; + }	 + if (!link_is_up(this)) + return; + msg_set_size(tunnel_hdr, length + 
INT_H_SIZE); + buf = buf_acquire(length + INT_H_SIZE); + msg_set_seqno(tunnel_hdr, mod(tunnel->next_out_no++)); + msg_set_ack(tunnel_hdr, mod(tunnel->next_in_no - 1)); + msg_set_bcast_ack(tunnel_hdr,tunnel->owner->last_in_bcast);  + msg_set_user(tunnel_hdr,BCAST_PROTOCOL);	 + buf_copy_append(buf,0, (unchar *) tunnel_hdr, INT_H_SIZE); + buf_copy_append(buf,INT_H_SIZE,(unchar*)msg, length); + printk("%c->%c:",this->bearer->net_plane,tunnel->bearer->net_plane);	 + printk("the link name is :%s\n" , tunnel->name); + + printk( "the queue_size is %d\n", tunnel->out_queue_size); + printk("the queue limit is %d\n",tunnel->queue_limit[0]); + //assert(tunnel); + if (tunnel){ + if (bearer_send(tunnel->bearer, buf, &tunnel->media_addr)) { + printk("bearer send\n"); + tunnel->stats.retransmitted++; + } else { + printk("bearer scheule\n"); + bearer_schedule(tunnel->bearer, this); + return; + } + }	 + printk("link tunnel--tunnel_hdr\n"); +} + /** * Tunel the buffer in the broadcast packet and retransmit it * Input parameters: this: link @@ -633,18 +693,21 @@   int ackseq; struct tipc_msg tunnel_hdr; - + struct tipc_msg *msg = buf_msg(buf); +	 ackseq = this->owner->last_in_bcast;   msg_init(&tunnel_hdr,BCAST_PROTOCOL, BCAST_MSG, TIPC_OK,INT_H_SIZE, this->addr);   + spin_lock_bh(&bcast_outqueue.lock); msg_set_msgcnt(&tunnel_hdr, 1); for (; less_eq(msg_seqno(buf_msg(buf)), lastseq);buf = buf->next) { if(less_eq(msg_seqno(buf_msg(buf)), ackseq)) continue; + blink_tunnel(this,&tunnel_hdr,buf_msg(buf)); link_tunnel(this, &tunnel_hdr, buf_msg(buf)); /* The link may reset here, if extreme overload */ if(likely(this->owner)) { |