[Slimp3-checkins] CVS: slimp3/server/lib/SliMP3 Stream.pm,1.12,1.13
Brought to you by:
blackketter,
slimdevices
From: Sean A. <sli...@us...> - 2002-03-31 10:50:11
|
Update of /cvsroot/slimp3/slimp3/server/lib/SliMP3 In directory usw-pr-cvs1:/tmp/cvs-serv5537/lib/SliMP3 Modified Files: Stream.pm Log Message: retransmit logic fixed - there should only be one timeout for the stream; misc optimizations; tested okay with 2% packet loss. Index: Stream.pm =================================================================== RCS file: /cvsroot/slimp3/slimp3/server/lib/SliMP3/Stream.pm,v retrieving revision 1.12 retrieving revision 1.13 diff -C2 -d -r1.12 -r1.13 *** Stream.pm 30 Mar 2002 08:14:20 -0000 1.12 --- Stream.pm 31 Mar 2002 10:50:07 -0000 1.13 *************** *** 34,44 **** my %bytesSent; # - bytes sent in this stream my %bytesDecoded; - my %expectedAck; my %usage; # buffer usage my %seq; # the next sequence number to send ! my %packetsInFlight; # reference to hash of pktInfoStructs for each packet in flight to this client, by seq my %numInFlight; # number of packets in flight my %bytesInFlight; # total bytes in flight my %cwnd; # current cwnd my %prevRptr; # the last rptr we got from the client --- 34,44 ---- my %bytesSent; # - bytes sent in this stream my %bytesDecoded; my %usage; # buffer usage my %seq; # the next sequence number to send ! my %packetsInFlight; # reference to array of pktInfoStructs for each packet in flight to this client, by seq my %numInFlight; # number of packets in flight my %bytesInFlight; # total bytes in flight my %cwnd; # current cwnd + my %retransmitQueue; # ref to queue of packets needing to be retransmitted my %prevRptr; # the last rptr we got from the client *************** *** 50,54 **** my ($client) = @_; ! printf("rtt %5.1f | wptr %.5d | rptr %.5d | rate %6.2f | avgrate %6.2f | usage %.2d | cwnd %.2d\n", $client->RTT * 1000, $curWptr{$client}, --- 50,55 ---- my ($client) = @_; ! printf("rtt %5.1f | wptr %.5d | rptr %.5d | rate %6.2f | avgrate %6.2f | usage %.2d | cwnd %.2d". ! 
" | inFlight %.2d | \n", $client->RTT * 1000, $curWptr{$client}, *************** *** 57,61 **** $bytesDecoded{$client} * 8 / 1000 / (time() - $startTime{$client}), $usage{$client}, ! $cwnd{$client} ); } --- 58,63 ---- $bytesDecoded{$client} * 8 / 1000 / (time() - $startTime{$client}), $usage{$client}, ! $cwnd{$client}, ! $numInFlight{$client}, ); } *************** *** 70,74 **** len => '$', txtime => '$', - retx_timer => '$', # reference to the retransmit timer ]); --- 72,75 ---- *************** *** 134,138 **** } $usage{$client}=0; ! delete($packetsInFlight{$client}); $numInFlight{$client}=0; $bytesInFlight{$client}=0; --- 135,139 ---- } $usage{$client}=0; ! $packetsInFlight{$client} = []; $numInFlight{$client}=0; $bytesInFlight{$client}=0; *************** *** 232,245 **** # ! # Retransmit a timed out packet ! sub retransmit { ! my ($client, $pkt) = @_; ! print "retransmit\n"; ! my $chunkref=$pkt->chunkref; # FIXME always pass chunk by reference ! &sendStreamPkt($client, $$chunkref, $pkt->wptr); } --- 233,253 ---- # ! # Retransmit timed out packets ! sub timeout { ! my ($client) = @_; ! print "timeout with these packets in flight:\n"; ! my $pkt; ! foreach $pkt (@{$packetsInFlight{$client}}) { ! print " seq: ".$pkt->seq.", wptr: $pkt->wptr\n"; ! } ! push @{$retransmitQueue{$client}}, @{$packetsInFlight{$client}}; ! $packetsInFlight{$client}=[]; ! $numInFlight{$client}=0; ! $bytesInFlight{$client}=0; # FIXME always pass chunk by reference ! &sendNextChunk($client); } *************** *** 250,263 **** my ( $client, # the client ! $chunk, # the chunk of data. can be '', must be even length ! $wptr, # write offset ) = @_; ! $::d_pv && print "sending stream packet, wptr = $wptr, state = $streamState{$client}\n"; ! ! if (length($chunk)%2 != 0) { ! warn "odd chunk length"; ! $chunk.=' '; ! } my $control; --- 258,265 ---- my ( $client, # the client ! $pkt, ) = @_; ! 
$::d_pv && print "sending stream packet, wptr = ".$pkt->wptr.", state = $streamState{$client}\n"; my $control; *************** *** 279,314 **** $control. # control code ' '. # n/a: client's time stamp goes here, in the response ! pack('C', int($wptr/256)). # wptr high ! pack('C', $wptr%256). # wptr low ' '. # n/a: rptr response ! pack('C', int($seq{$client}/256)). # sequence number ! pack('C', $seq{$client}%256). ' '. # reserved ! $chunk; # stream data $client->udpsock->send($udpdata, 0, $client->paddr); - - my $len=length($chunk); - - my $pkt = pktInfoStruct->new(); - $pkt->seq($seq{$client}); - $pkt->txtime(time()); - $pkt->wptr($wptr); - $pkt->len($len); - $pkt->chunkref(\$chunk); - - # - # set the retransmit timer for this pkt - $pkt->retx_timer(SliMP3::Timers::setTimer($client, time+$TIMEOUT, \&retransmit, $pkt)); - - $packetsInFlight{$client}{$seq{$client}} = $pkt; - $bytesInFlight{$client}+=$len; - $numInFlight{$client}++; - - $bytesSent{$client}+=$len; - $expectedAck{$client} = $wptr; - $seq{$client} = plus ($seq{$client}, 1); } --- 281,295 ---- $control. # control code ' '. # n/a: client's time stamp goes here, in the response ! pack('C', int($pkt->wptr/256)). # wptr high ! pack('C', $pkt->wptr%256). # wptr low ' '. # n/a: rptr response ! pack('C', int($pkt->seq/256)). # sequence number ! pack('C', $pkt->seq%256). ' '. # reserved ! ${$pkt->chunkref}; # stream data $client->udpsock->send($udpdata, 0, $client->paddr); } *************** *** 321,346 **** $::d_pv && print "sendNextChunk\n"; ! # while ($numInFlight{$client} < $cwnd{$client}) { ! ! if ($usage{$client} > 90) { ! # if client's buffer is full, poll it every 100ms until there's room ! if ($streamState{$client} eq 'play') { ! SliMP3::Timers::setTimer($client, time + .1, \&sendEmptyChunk); ! } ! return; } my $chunk; $client->mp3filehandle()->read($chunk, $MAXCHUNK); $::d_pv && print "read ".length($chunk)." 
bytes from file handle\n"; - if (($usage{$client} > 25) && ($streamState{$client} eq 'buffering')) { ! $streamState{$client}='play'; $startTime{$client}=time(); } ! &sendStreamPkt($client, $chunk, $curWptr{$client}); ! $curWptr{$client}= plus($curWptr{$client}, (length($chunk)/2) % 2**16); ! # } } --- 302,353 ---- $::d_pv && print "sendNextChunk\n"; ! if ($usage{$client} > 90) { ! # if client's buffer is full, poll it every 100ms until there's room ! if ($streamState{$client} eq 'play') { ! SliMP3::Timers::setTimer($client, time + .1, \&sendEmptyChunk); } + return; + } + my $pkt; + if (!($pkt = shift(@{$retransmitQueue{$client}}))) { my $chunk; $client->mp3filehandle()->read($chunk, $MAXCHUNK); + if (length($chunk)%2 != 0) { + warn "odd chunk length"; + $chunk.=' '; + } $::d_pv && print "read ".length($chunk)." bytes from file handle\n"; if (($usage{$client} > 25) && ($streamState{$client} eq 'buffering')) { ! $streamState{$client}='play'; $startTime{$client}=time(); } ! ! my $len=length($chunk); ! ! $pkt = pktInfoStruct->new(); ! $pkt->seq($seq{$client}); ! $pkt->txtime(time()); ! $pkt->wptr($curWptr{$client}); ! $pkt->len($len); ! $pkt->chunkref(\$chunk); ! ! $curWptr{$client}= plus($curWptr{$client}, ($len/2) % 2**16); ! ! $seq{$client} = plus ($seq{$client}, 1); ! } ! ! ! &sendStreamPkt($client, $pkt); ! ! push @{$packetsInFlight{$client}}, $pkt; ! $bytesInFlight{$client}+=$pkt->len; ! $numInFlight{$client}++; ! $bytesSent{$client}+=$pkt->len; ! ! # ! # restart the timeout ! SliMP3::Timers::setTimer($client, time+$TIMEOUT, \&timeout); } *************** *** 354,358 **** $::d_pv && print "sendEmptyChunk\n"; ! &sendStreamPkt($client, '', $curWptr{$client}); } --- 361,379 ---- $::d_pv && print "sendEmptyChunk\n"; ! my $pkt = pktInfoStruct->new(); ! ! $pkt->seq($seq{$client}); ! $pkt->txtime(time()); ! $pkt->wptr($curWptr{$client}); ! $pkt->len(0); ! $pkt->chunkref(\''); ! ! &sendStreamPkt($client, $pkt); ! ! push @{$packetsInFlight{$client}}, $pkt; ! 
$numInFlight{$client}++; ! $seq{$client} = plus ($seq{$client}, 1); ! ! SliMP3::Timers::setTimer($client, time+$TIMEOUT, \&timeout); } *************** *** 362,378 **** sub gotAck { my ($client, $wptr, $rptr, $seq) = @_; my $now = time(); ! if (!defined($packetsInFlight{$client}{$seq})) { $::d_pv && print 'ignoring ack from previous stream: '; return; ! } $::d_pv && print "ack: wptr = $wptr, rptr = $rptr, seq = $seq\n"; - # fetch the info about this packet - my $pkt = $packetsInFlight{$client}{$seq}; - # calculate RTT my $rtt = $now - $pkt->txtime; --- 383,409 ---- sub gotAck { my ($client, $wptr, $rptr, $seq) = @_; + my $pkt; + my $eachpkt; my $now = time(); ! my $pifIndex = 0; ! foreach $eachpkt (@{$packetsInFlight{$client}}) { ! printf "inflight: ". $eachpkt->seq."\n"; ! if ($seq == $eachpkt->seq) { ! $pkt=$eachpkt; ! last; ! } ! $pifIndex++; ! } ! ! if (!defined($pkt)) { $::d_pv && print 'ignoring ack from previous stream: '; + $::d_pv && print "ack: wptr = $wptr, rptr = $rptr, seq = $seq\n"; return; ! } $::d_pv && print "ack: wptr = $wptr, rptr = $rptr, seq = $seq\n"; # calculate RTT my $rtt = $now - $pkt->txtime; *************** *** 408,415 **** # kill the retransmit timer for this pkt ! SliMP3::Timers::killSpecific($pkt->retx_timer); ! # we're done with this packet now ! delete($packetsInFlight{$client}{$seq}); # print stats --- 439,446 ---- # kill the retransmit timer for this pkt ! SliMP3::Timers::killTimers($client, \&timeout); ! # we're done with this packet now - remove it from the queue ! splice(@{$packetsInFlight{$client}}, $pifIndex, 1); # print stats |