[Udt-java-commits] SF.net SVN: udt-java:[54] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
From: <bsc...@us...> - 2011-02-17 21:24:39
|
Revision: 54 http://udt-java.svn.sourceforge.net/udt-java/?rev=54&view=rev Author: bschuller Date: 2011-02-17 21:24:32 +0000 (Thu, 17 Feb 2011) Log Message: ----------- bit of refactoring of sender to avoid memory allocations for data and data packets Modified Paths: -------------- udt-java/trunk/src/main/java/udt/UDPEndPoint.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/UDTSender.java udt-java/trunk/src/main/java/udt/UDTSession.java udt-java/trunk/src/main/java/udt/UDTSocket.java udt-java/trunk/src/main/java/udt/packets/DataPacket.java udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java udt-java/trunk/src/main/java/udt/sender/SenderLossList.java udt-java/trunk/src/main/java/udt/util/ReceiveFile.java udt-java/trunk/src/main/java/udt/util/UDTStatistics.java udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java Added Paths: ----------- udt-java/trunk/src/main/java/udt/sender/FlowWindow.java udt-java/trunk/src/test/java/udt/sender/ udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2011-02-17 21:24:32 UTC (rev 54) @@ -168,6 +168,7 @@ } }; Thread t=UDTThreadFactory.get().newThread(receive); + t.setName("UDPEndpoint-"+t.getName()); t.setDaemon(true); t.start(); logger.info("UDTEndpoint started."); Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2011-02-17 21:24:32 UTC (rev 54) @@ -214,6 +214,8 @@ } }; receiverThread=UDTThreadFactory.get().newThread(r); + String s=(session instanceof ServerSession)? "ServerSession": "ClientSession"; + receiverThread.setName("UDTReceiver-"+s+"-"+receiverThread.getName()); receiverThread.start(); } Modified: udt-java/trunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSender.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/UDTSender.java 2011-02-17 21:24:32 UTC (rev 54) @@ -33,9 +33,8 @@ package udt; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -49,6 +48,7 @@ import udt.packets.DataPacket; import udt.packets.KeepAlive; import udt.packets.NegativeAcknowledgement; +import udt.sender.FlowWindow; import udt.sender.SenderLossList; import udt.util.MeanThroughput; import udt.util.MeanValue; @@ -76,13 +76,12 @@ //senderLossList stores the sequence numbers of lost packets //fed back by the receiver through NAK pakets private final SenderLossList senderLossList; - + //sendBuffer stores the sent data packets and their sequence numbers - private final Map<Long,DataPacket>sendBuffer; - - //sendQueue contains the packets to send - private final BlockingQueue<DataPacket>sendQueue; - + private final Map<Long,byte[]>sendBuffer; + + private final FlowWindow flowWindow; + //thread reading packets from send queue and sending them private Thread senderThread; @@ -117,15 +116,18 @@ private final AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>(); private final boolean storeStatistics; - + + private final int chunksize; + public UDTSender(UDTSession session,UDPEndPoint endpoint){ if(!session.isReady())throw new IllegalStateException("UDTSession is not ready."); this.endpoint= endpoint; this.session=session; statistics=session.getStatistics(); senderLossList=new SenderLossList(); - sendBuffer=new ConcurrentHashMap<Long, DataPacket>(session.getFlowWindowSize(),0.75f,2); - sendQueue = new ArrayBlockingQueue<DataPacket>(session.getFlowWindowSize(), /*fairness*/ true); + sendBuffer=new ConcurrentHashMap<Long, byte[]>(session.getFlowWindowSize(),0.75f,2); + chunksize=session.getDatagramSize()-24;//need space for the header; + flowWindow=new FlowWindow(session.getFlowWindowSize(),chunksize); lastAckSequenceNumber=session.getInitialSequenceNumber(); currentSequenceNumber=session.getInitialSequenceNumber()-1; waitForAckLatch.set(new CountDownLatch(1)); @@ -179,16 +181,14 @@ } }; senderThread=UDTThreadFactory.get().newThread(r); + String s=(session instanceof ServerSession)? "ServerSession": "ClientSession"; + senderThread.setName("UDTSender-"+s+"-"+senderThread.getName()); senderThread.start(); } /** * sends the given data packet, storing the relevant information - * - * @param data - * @throws IOException - * @throws InterruptedException */ private void send(DataPacket p)throws IOException{ synchronized(sendLock){ @@ -203,28 +203,63 @@ throughput.end(); throughput.begin(); } - sendBuffer.put(p.getPacketSequenceNumber(), p); + sendBuffer.put(p.getPacketSequenceNumber(), p.getData()); unacknowledged.incrementAndGet(); } statistics.incNumberOfSentDataPackets(); } + protected void sendUdtPacket(ByteBuffer bb, int timeout, TimeUnit units)throws IOException, InterruptedException{ + if(!started)start(); + DataPacket packet=null; + do{ + packet=flowWindow.getForProducer(); + if(packet==null){ + Thread.sleep(10); + } + }while(packet==null);//TODO check timeout... + try{ + packet.setPacketSequenceNumber(getNextSequenceNumber()); + packet.setSession(session); + packet.setDestinationID(session.getDestination().getSocketID()); + int len=Math.min(bb.remaining(),chunksize); + byte[] data=packet.getData(); + bb.get(data,0,len); + packet.setLength(len); + }finally{ + flowWindow.produce(); + } + + } + /** - * writes a data packet into the sendQueue, waiting at most for the specified time + * writes a data packet, waiting at most for the specified time * if this is not possible due to a full send queue * - * @return <code>true</code>if the packet was added, <code>false</code> if the - * packet could not be added because the queue was full - * @param p * @param timeout * @param units * @return * @throws IOException * @throws InterruptedException */ - protected boolean sendUdtPacket(DataPacket p, int timeout, TimeUnit units)throws IOException,InterruptedException{ + protected void sendUdtPacket(byte[]data, int timeout, TimeUnit units)throws IOException, InterruptedException{ if(!started)start(); - return sendQueue.offer(p,timeout,units); + DataPacket packet=null; + do{ + packet=flowWindow.getForProducer(); + if(packet==null){ + Thread.sleep(10); + // System.out.println("queue full: "+flowWindow); + } + }while(packet==null); + try{ + packet.setPacketSequenceNumber(getNextSequenceNumber()); + packet.setSession(session); + packet.setDestinationID(session.getDestination().getSocketID()); + packet.setData(data); + }finally{ + flowWindow.produce(); + } } //receive a packet from server from the peer @@ -268,6 +303,7 @@ for(long s=lastAckSequenceNumber;s<ackNumber;s++){ synchronized (sendLock) { removed=sendBuffer.remove(s)!=null; + senderLossList.remove(s); } if(removed){ unacknowledged.decrementAndGet(); @@ -291,7 +327,7 @@ session.getCongestionControl().onLoss(nak.getDecodedLossInfo()); session.getSocket().getReceiver().resetEXPTimer(); statistics.incNumberOfNAKReceived(); - + if(logger.isLoggable(Level.FINER)){ logger.finer("NAK for "+nak.getDecodedLossInfo().size()+" packets lost, " +"set send period to "+session.getCongestionControl().getSendInterval()); @@ -322,13 +358,11 @@ public void senderAlgorithm()throws InterruptedException, IOException{ while(!paused){ iterationStart=Util.getCurrentTime(); - //if the sender's loss list is not empty - if (!senderLossList.isEmpty()) { - Long entry=senderLossList.getFirstEntry(); - handleResubmit(entry); + Long entry=senderLossList.getFirstEntry(); + if(entry!=null){ + handleRetransmit(entry); } - else { //if the number of unacknowledged data packets does not exceed the congestion @@ -336,9 +370,9 @@ int unAcknowledged=unacknowledged.get(); if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize() - && unAcknowledged<session.getFlowWindowSize()){ + && unAcknowledged<session.getFlowWindowSize()){ //check for application data - DataPacket dp=sendQueue.poll(); + DataPacket dp=flowWindow.consumeData(); if(dp!=null){ send(dp); largestSentSequenceNumber=dp.getPacketSequenceNumber(); @@ -374,15 +408,21 @@ } /** - * re-submits an entry from the sender loss list + * re-transmit an entry from the sender loss list * @param entry */ - protected void handleResubmit(Long seqNumber){ + protected void handleRetransmit(Long seqNumber){ try { //retransmit the packet and remove it from the list - DataPacket pktToRetransmit = sendBuffer.get(seqNumber); - if(pktToRetransmit!=null){ - endpoint.doSend(pktToRetransmit); + byte[]data=sendBuffer.get(seqNumber); + if(data!=null){ + //System.out.println("re-transmit "+data); + DataPacket packet=new DataPacket(); + packet.setPacketSequenceNumber(seqNumber); + packet.setSession(session); + packet.setDestinationID(session.getDestination().getSocketID()); + packet.setData(data); + endpoint.doSend(packet); statistics.incNumberOfRetransmittedDataPackets(); } }catch (Exception e) { @@ -457,14 +497,14 @@ */ public void waitForAck()throws InterruptedException{ waitForAckLatch.set(new CountDownLatch(1)); - waitForAckLatch.get().await(2, TimeUnit.MILLISECONDS); + waitForAckLatch.get().await(200, TimeUnit.MICROSECONDS); } public void stop(){ stopped=true; } - + public void pause(){ startLatch=new CountDownLatch(1); paused=true; Modified: udt-java/trunk/src/main/java/udt/UDTSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSession.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/UDTSession.java 2011-02-17 21:24:32 UTC (rev 54) @@ -74,7 +74,7 @@ * flow window size, i.e. how many data packets are * in-flight at a single time */ - protected int flowWindowSize=1024; + protected int flowWindowSize=1024*10; /** * remote UDT entity (address and socket ID) Modified: udt-java/trunk/src/main/java/udt/UDTSocket.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTSocket.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/UDTSocket.java 2011-02-17 21:24:32 UTC (rev 54) @@ -36,46 +36,41 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; - -import udt.packets.DataPacket; - /** * UDTSocket is analogous to a normal java.net.Socket, it provides input and * output streams for the application * * TODO is it possible to actually extend java.net.Socket ? * - * */ public class UDTSocket { - + //endpoint private final UDPEndPoint endpoint; - + private volatile boolean active; - - //processing received data + + //processing received data private UDTReceiver receiver; private UDTSender sender; - + private final UDTSession session; private UDTInputStream inputStream; private UDTOutputStream outputStream; - /** - * @param host - * @param port - * @param endpoint - * @throws SocketException,UnknownHostException - */ + * @param host + * @param port + * @param endpoint + * @throws SocketException,UnknownHostException + */ public UDTSocket(UDPEndPoint endpoint, UDTSession session)throws SocketException,UnknownHostException{ this.endpoint=endpoint; this.session=session; this.receiver=new UDTReceiver(session,endpoint); this.sender=new UDTSender(session,endpoint); } - + public UDTReceiver getReceiver() { return receiver; } @@ -114,7 +109,7 @@ } return inputStream; } - + /** * get the output stream for writing to this socket * @return @@ -125,20 +120,20 @@ } return outputStream; } - + public final UDTSession getSession(){ return session; } - + /** * write single block of data without waiting for any acknowledgement * @param data */ protected void doWrite(byte[]data)throws IOException{ doWrite(data, 0, data.length); - + } - + /** * write the given data * @param data - the data array @@ -148,14 +143,14 @@ */ protected void doWrite(byte[]data, int offset, int length)throws IOException{ try{ - doWrite(data, offset, length, Integer.MAX_VALUE, TimeUnit.MILLISECONDS); + doWrite(data, offset, length, 10, TimeUnit.MILLISECONDS); }catch(InterruptedException ie){ IOException io=new IOException(); io.initCause(ie); throw io; } } - + /** * write the given data, waiting at most for the specified time if the queue is full * @param data @@ -167,26 +162,17 @@ * @throws InterruptedException */ protected void doWrite(byte[]data, int offset, int length, int timeout, TimeUnit units)throws IOException,InterruptedException{ - int chunksize=session.getDatagramSize()-24;//need some bytes for the header ByteBuffer bb=ByteBuffer.wrap(data,offset,length); - long seqNo=0; while(bb.remaining()>0){ - int len=Math.min(bb.remaining(),chunksize); - byte[]chunk=new byte[len]; - bb.get(chunk); - DataPacket packet=new DataPacket(); - seqNo=sender.getNextSequenceNumber(); - packet.setPacketSequenceNumber(seqNo); - packet.setSession(session); - packet.setDestinationID(session.getDestination().getSocketID()); - packet.setData(chunk); - //put the packet into the send queue - if(!sender.sendUdtPacket(packet, timeout, units)){ - throw new IOException("Queue full"); + try{ + sender.sendUdtPacket(bb, timeout, units); + }catch(Exception ex){ + ex.printStackTrace(); } } if(length>0)active=true; } + /** * will block until the outstanding packets have really been sent out * and acknowledged @@ -207,13 +193,13 @@ //TODO need to check if we can pause the sender... //sender.pause(); } - + //writes and wait for ack protected void doWriteBlocking(byte[]data)throws IOException, InterruptedException{ doWrite(data); flush(); } - + /** * close the connection * @throws IOException Modified: udt-java/trunk/src/main/java/udt/packets/DataPacket.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2011-02-17 21:24:32 UTC (rev 54) @@ -44,12 +44,15 @@ private long destinationID; private UDTSession session; + + private int dataLength; public DataPacket(){ } /** - * create a DataPacket + * create a DataPacket from the given raw data + * * @param encodedData - network data */ public DataPacket(byte[] encodedData){ @@ -58,6 +61,7 @@ public DataPacket(byte[] encodedData, int length){ decode(encodedData,length); + dataLength=length; } void decode(byte[]encodedData,int length){ @@ -75,16 +79,16 @@ } public double getLength(){ - return data.length; + return dataLength; } - /* - * aplivation data - * @param - */ - + public void setLength(int length){ + dataLength=length; + } + public void setData(byte[] data) { this.data = data; + dataLength=data.length; } public long getPacketSequenceNumber() { @@ -125,12 +129,12 @@ */ public byte[] getEncoded(){ //header.length is 16 - byte[] result=new byte[16+data.length]; + byte[] result=new byte[16+dataLength]; System.arraycopy(PacketUtil.encode(packetSequenceNumber), 0, result, 0, 4); System.arraycopy(PacketUtil.encode(messageNumber), 0, result, 4, 4); System.arraycopy(PacketUtil.encode(timeStamp), 0, result, 8, 4); System.arraycopy(PacketUtil.encode(destinationID), 0, result, 12, 4); - System.arraycopy(data, 0, result, 16, data.length); + System.arraycopy(data, 0, result, 16, dataLength); return result; } Modified: udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java =================================================================== --- udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/receiver/PacketHistoryWindow.java 2011-02-17 21:24:32 UTC (rev 54) @@ -39,12 +39,17 @@ */ public class PacketHistoryWindow extends CircularArray<Long>{ + private final long[]intervals; + private final int num; + /** * create a new PacketHistoryWindow of the given size * @param size */ public PacketHistoryWindow(int size){ super(size); + num=max-1; + intervals=new long[num]; } /** @@ -54,12 +59,11 @@ */ public long getPacketArrivalSpeed(){ if(!haveOverflow)return 0; - int num=max-1; + double AI; double medianPacketArrivalSpeed; double total=0; int count=0; - long[]intervals=new long[num]; int pos=position-1; if(pos<0)pos=num; do{ Added: udt-java/trunk/src/main/java/udt/sender/FlowWindow.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/FlowWindow.java (rev 0) +++ udt-java/trunk/src/main/java/udt/sender/FlowWindow.java 2011-02-17 21:24:32 UTC (rev 54) @@ -0,0 +1,139 @@ +package udt.sender; + +import java.util.concurrent.locks.ReentrantLock; + +import udt.packets.DataPacket; + +/** + * + * holds a fixed number of {@link DataPacket} instances which are sent out. + * + * it is assumed that a single thread stores new data, and another single thread + * reads/removes data + * + * @author schuller + */ +public class FlowWindow { + + private final DataPacket[]packets; + + private final int length; + + private volatile boolean isEmpty=true; + + private volatile boolean isFull=false; + + private volatile int validEntries=0; + + private volatile boolean isCheckout=false; + + private volatile int writePos=0; + + private volatile int readPos=-1; + + private volatile int consumed=0; + + private volatile int produced=0; + + private final ReentrantLock lock; + + /** + * @param size - flow window size + * @param chunksize - data chunk size + */ + public FlowWindow(int size, int chunksize){ + this.length=size; + packets=new DataPacket[length]; + for(int i=0;i<packets.length;i++){ + packets[i]=new DataPacket(); + packets[i].setData(new byte[chunksize]); + } + lock=new ReentrantLock(true); + } + + /** + * get a data packet for updating with new data + * + * @return <code>null</code> if flow window is full + */ + public DataPacket getForProducer(){ + lock.lock(); + try{ + if(isFull){ + return null; + } + if(isCheckout)throw new IllegalStateException(); + isCheckout=true; + DataPacket p=packets[writePos]; + return p; + }finally{ + lock.unlock(); + } + } + + public void produce(){ + lock.lock(); + try{ + isCheckout=false; + writePos++; + if(writePos==length)writePos=0; + validEntries++; + isFull=validEntries==length-1; + isEmpty=false; + produced++; + }finally{ + lock.unlock(); + } + } + + + public DataPacket consumeData(){ + if(isEmpty){ + return null; + } + lock.lock(); + try{ + readPos++; + DataPacket p=packets[readPos]; + if(readPos==length-1)readPos=-1; + validEntries--; + isEmpty=validEntries==0; + isFull=false; + consumed++; + return p; + }finally{ + lock.unlock(); + } + } + + boolean isEmpty(){ + return isEmpty; + } + + /** + * check if another entry can be added + * @return + */ + public boolean isFull(){ + return isFull; + } + + int readPos(){ + return readPos; + } + + int writePos(){ + return writePos; + } + + int consumed(){ + return consumed; + } + public String toString(){ + StringBuilder sb=new StringBuilder(); + sb.append("FlowWindow size=").append(length); + sb.append(" full=").append(isFull).append(" empty=").append(isEmpty); + sb.append(" consumed=").append(consumed).append(" produced=").append(produced); + return sb.toString(); + } +} Property changes on: udt-java/trunk/src/main/java/udt/sender/FlowWindow.java ___________________________________________________________________ Added: svn:mime-type + text/plain Modified: udt-java/trunk/src/main/java/udt/sender/SenderLossList.java =================================================================== --- udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/sender/SenderLossList.java 2011-02-17 21:24:32 UTC (rev 54) @@ -33,8 +33,6 @@ package udt.sender; import java.util.LinkedList; -import udt.util.MeanValue; - /** * stores the sequence number of the lost packets in increasing order */ @@ -57,14 +55,19 @@ backingList.add(i,obj); return; } - else if(obj==entry)return; + else if(obj.equals(entry))return; } backingList.add(obj); } } + public void remove(Long obj){ + synchronized (backingList) { + backingList.remove(obj); + } + } /** - * retrieves the loss list entry with the lowest sequence number + * retrieves the loss list entry with the lowest sequence number, or <code>null</code> if loss list is empty */ public Long getFirstEntry(){ synchronized(backingList){ Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2011-02-17 21:24:32 UTC (rev 54) @@ -109,7 +109,6 @@ while(true)Thread.sleep(10000); } - File file=new File(new String(localFile)); System.out.println("[ReceiveFile] Write to local file <"+file.getAbsolutePath()+">"); FileOutputStream fos=new FileOutputStream(file); Modified: udt-java/trunk/src/main/java/udt/util/UDTStatistics.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/main/java/udt/util/UDTStatistics.java 2011-02-17 21:24:32 UTC (rev 54) @@ -188,7 +188,7 @@ sb.append("Duplicate data packets: ").append(getNumberOfDuplicateDataPackets()).append("\n"); sb.append("ACK received: ").append(getNumberOfACKReceived()).append("\n"); sb.append("NAK received: ").append(getNumberOfNAKReceived()).append("\n"); - sb.append("Retransmitted data: ").append(getNumberOfNAKReceived()).append("\n"); + sb.append("Retransmitted data: ").append(getNumberOfRetransmittedDataPackets()).append("\n"); sb.append("NAK sent: ").append(getNumberOfNAKSent()).append("\n"); sb.append("ACK sent: ").append(getNumberOfACKSent()).append("\n"); if(roundTripTime>0){ Modified: udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java =================================================================== --- udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-01-06 16:13:32 UTC (rev 53) +++ udt-java/trunk/src/test/java/udt/performance/TestUDTLargeData.java 2011-02-17 21:24:32 UTC (rev 54) @@ -21,7 +21,7 @@ boolean running=false; //how many - int num_packets=300; + int num_packets=100; //how large is a single packet int size=1*1024*1024; @@ -55,7 +55,7 @@ new Random().nextBytes(data); MessageDigest digest=MessageDigest.getInstance("MD5"); - while(!serverRunning)Thread.sleep(100); + while(!serverStarted)Thread.sleep(100); long start=System.currentTimeMillis(); System.out.println("Sending <"+num_packets+"> packets of <"+format.format(size/1024.0/1024.0)+"> Mbytes each"); long end=0; @@ -101,6 +101,7 @@ long total=0; volatile boolean serverRunning=true; + volatile boolean serverStarted=false; volatile String md5_received=null; @@ -110,6 +111,7 @@ Runnable serverProcess=new Runnable(){ public void run(){ + try{ Boolean devNull=Boolean.getBoolean("udt.dev.null"); if(devNull){ @@ -118,6 +120,7 @@ MessageDigest md5=MessageDigest.getInstance("MD5"); long start=System.currentTimeMillis(); UDTSocket s=serverSocket.accept(); + serverStarted=true; assertNotNull(s); UDTInputStream is=s.getInputStream(); byte[]buf=new byte[READ_BUFFERSIZE]; Added: udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java =================================================================== --- udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java (rev 0) +++ udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java 2011-02-17 21:24:32 UTC (rev 54) @@ -0,0 +1,151 @@ +package udt.sender; + +import java.util.concurrent.TimeoutException; + +import junit.framework.TestCase; +import udt.packets.DataPacket; + +public class TestFlowWindow extends TestCase { + + public void testFillWindow()throws InterruptedException, TimeoutException{ + FlowWindow fw=new FlowWindow(3, 128); + DataPacket p1=fw.getForProducer(); + assertNotNull(p1); + fw.produce(); + DataPacket p2=fw.getForProducer(); + assertNotNull(p2); + fw.produce(); + assertFalse(p1==p2); + DataPacket p3=fw.getForProducer(); + assertNotNull(p3); + assertFalse(p1==p3); + assertFalse(p2==p3); + fw.produce(); + assertTrue(fw.isFull()); + + DataPacket no=fw.getForProducer(); + assertNull("Window should be full",no); + + DataPacket c1=fw.consumeData(); + //must be p1 + assertTrue(c1==p1); + DataPacket c2=fw.consumeData(); + //must be p2 + assertTrue(c2==p2); + DataPacket c3=fw.consumeData(); + //must be p3 + assertTrue(c3==p3); + assertTrue(fw.isEmpty()); + } + + public void testOverflow()throws InterruptedException, TimeoutException{ + FlowWindow fw=new FlowWindow(3, 64); + DataPacket p1=fw.getForProducer(); + assertNotNull(p1); + fw.produce(); + DataPacket p2=fw.getForProducer(); + assertNotNull(p2); + fw.produce(); + assertFalse(p1==p2); + DataPacket p3=fw.getForProducer(); + assertNotNull(p3); + assertFalse(p1==p3); + assertFalse(p2==p3); + fw.produce(); + assertTrue(fw.isFull()); + + //read one + DataPacket c1=fw.consumeData(); + //must be p1 + assertTrue(c1==p1); + assertFalse(fw.isFull()); + + //now a slot for writing should be free again + DataPacket p4=fw.getForProducer(); + assertNotNull(p4); + fw.produce(); + //which is again p1 + assertTrue(p4==p1); + + } + + private volatile boolean fail=false; + + public void testConcurrentReadWrite()throws InterruptedException{ + final FlowWindow fw=new FlowWindow(20, 64); + Thread reader=new Thread(new Runnable(){ + public void run(){ + doRead(fw); + } + }); + reader.setName("reader"); + Thread writer=new Thread(new Runnable(){ + public void run(){ + doWrite(fw); + } + }); + writer.setName("writer"); + + writer.start(); + reader.start(); + + int c=0; + while(read && write && c<10){ + Thread.sleep(1000); + c++; + } + assertFalse("An error occured in reader or writer",fail); + + } + + volatile boolean read=true; + volatile boolean write=true; + int N=100000; + + private void doRead(final FlowWindow fw){ + System.out.println("Starting reader..."); + try{ + for(int i=0;i<N;i++){ + DataPacket p=null; + while( (p=fw.consumeData())==null){ + Thread.sleep(1); + } + synchronized (p) { + assertEquals(i,p.getMessageNumber()); + } + } + }catch(Throwable ex){ + ex.printStackTrace(); + fail=true; + } + System.out.println("Exiting reader..."); + read=false; + } + + private void doWrite(final FlowWindow fw){ + System.out.println("Starting writer..."); + DataPacket p=null; + try{ + for(int i=0;i<N;i++){ + p=null; + do{ + p=fw.getForProducer(); + if(p!=null){ + synchronized(p){ + p.setData(("test"+i).getBytes()); + p.setMessageNumber(i); + fw.produce(); + } + } + }while(p==null); + } + }catch(Exception ex){ + ex.printStackTrace(); + System.out.println("ERROR****"); + fail=true; + } + System.out.println("Exiting writer..."); + write=false; + } + +} Property changes on: udt-java/trunk/src/test/java/udt/sender/TestFlowWindow.java ___________________________________________________________________ Added: svn:mime-type + text/plain This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |