[Udt-java-commits] SF.net SVN: udt-java:[43] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
From: <bsc...@us...> - 2010-08-30 21:17:22
|
Revision: 43 http://udt-java.svn.sourceforge.net/udt-java/?rev=43&view=rev Author: bschuller Date: 2010-08-30 21:17:15 +0000 (Mon, 30 Aug 2010) Log Message: ----------- change packet encoding to be compatible with UDT v4 (C++ version); change send/recv file a bit. Does not quite work, but much better :) Modified Paths: -------------- udt-java/trunk/src/main/java/udt/ClientSession.java udt-java/trunk/src/main/java/udt/ServerSession.java udt-java/trunk/src/main/java/udt/UDPEndPoint.java udt-java/trunk/src/main/java/udt/UDTReceiver.java udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java udt-java/trunk/src/main/java/udt/packets/ControlPacket.java udt-java/trunk/src/main/java/udt/packets/DataPacket.java udt-java/trunk/src/main/java/udt/packets/PacketFactory.java udt-java/trunk/src/main/java/udt/packets/PacketUtil.java udt-java/trunk/src/main/java/udt/util/Application.java udt-java/trunk/src/main/java/udt/util/ReceiveFile.java udt-java/trunk/src/main/java/udt/util/SendFile.java udt-java/trunk/src/test/java/udt/TestControlPacket.java udt-java/trunk/src/test/java/udt/TestControlPacketType.java udt-java/trunk/src/test/java/udt/TestPacketFactory.java udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java Modified: udt-java/trunk/src/main/java/udt/ClientSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/ClientSession.java 2010-08-30 11:45:32 UTC (rev 42) +++ udt-java/trunk/src/main/java/udt/ClientSession.java 2010-08-30 21:17:15 UTC (rev 43) @@ -135,7 +135,7 @@ handshake.setPacketSize(getDatagramSize()); handshake.setSocketID(mySocketID); handshake.setSession(this); - logger.info("Handshake to "+this.getDestination()); + logger.info("Sending "+handshake); endPoint.doSend(handshake); } Modified: udt-java/trunk/src/main/java/udt/ServerSession.java =================================================================== --- udt-java/trunk/src/main/java/udt/ServerSession.java 2010-08-30 11:45:32 UTC (rev 42) +++ udt-java/trunk/src/main/java/udt/ServerSession.java 2010-08-30 21:17:15 UTC (rev 43) @@ -56,10 +56,10 @@ //last received packet (for testing purposes) private UDTPacket lastPacket; - public ServerSession(DatagramPacket dp,UDPEndPoint endPoint)throws SocketException,UnknownHostException{ + public ServerSession(DatagramPacket dp, UDPEndPoint endPoint)throws SocketException,UnknownHostException{ super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+dp.getAddress()+":"+dp.getPort(),new Destination(dp.getAddress(),dp.getPort())); this.endPoint=endPoint; - logger.info("Created "+toString()+" talking to "+getDestination()); + logger.info("Created "+toString()+" talking to "+dp.getAddress()+":"+dp.getPort()); } int n_handshake=0; @@ -68,14 +68,16 @@ public void received(UDTPacket packet, Destination peer){ lastPacket=packet; if (getState()<=ready && packet instanceof ConnectionHandshake) { - logger.info("Received ConnectionHandshake from "+peer); ConnectionHandshake connectionHandshake=(ConnectionHandshake)packet; destination.setSocketID(connectionHandshake.getSocketID()); + + logger.info("Received "+connectionHandshake); + if(getState()<=handshaking){ setState(handshaking); } try{ - handleHandShake(connectionHandshake,peer); + handleHandShake(connectionHandshake); n_handshake++; try{ setState(ready); @@ -154,7 +156,7 @@ * @param peer * @throws IOException */ - protected void handleHandShake(ConnectionHandshake handshake,Destination peer)throws IOException{ + protected void handleHandShake(ConnectionHandshake handshake)throws IOException{ ConnectionHandshake responseHandshake = new ConnectionHandshake(); //compare the packet size and choose minimun long clientBufferSize=handshake.getPacketSize(); @@ -166,11 +168,13 @@ responseHandshake.setPacketSize(bufferSize); responseHandshake.setUdtVersion(4); responseHandshake.setInitialSeqNo(initialSequenceNumber); - responseHandshake.setConnectionType(1); + responseHandshake.setConnectionType(-1); + responseHandshake.setMaxFlowWndSize(handshake.getMaxFlowWndSize()); //tell peer what the socket ID on this side is responseHandshake.setSocketID(mySocketID); responseHandshake.setDestinationID(this.getDestination().getSocketID()); responseHandshake.setSession(this); + logger.info("Sending reply "+responseHandshake); endPoint.doSend(responseHandshake); } Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-08-30 11:45:32 UTC (rev 42) +++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-08-30 21:17:15 UTC (rev 43) @@ -50,7 +50,6 @@ import udt.packets.ConnectionHandshake; import udt.packets.Destination; import udt.packets.PacketFactory; -import udt.util.MeanValue; import udt.util.UDTThreadFactory; /** @@ -203,6 +202,7 @@ } public void addSession(Long destinationID,UDTSession session){ + logger.info("Storing session <"+destinationID+">"); sessions.put(destinationID, session); } @@ -248,17 +248,21 @@ private long lastDestID=-1; private UDTSession lastSession; - MeanValue v=new MeanValue("receiver processing ",true, 256); + //MeanValue v=new MeanValue("receiver processing ",true, 256); + private final Object lock=new Object(); + protected void doReceive()throws IOException{ while(!stopped){ try{ try{ - v.end(); + //v.end(); + //will block until a packet is received or timeout has expired dgSocket.receive(dp); - v.begin(); + //v.begin(); + Destination peer=new Destination(dp.getAddress(), dp.getPort()); int l=dp.getLength(); UDTPacket packet=PacketFactory.createPacket(dp.getData(),l); @@ -266,19 +270,21 @@ //handle connection handshake if(packet.isConnectionHandshake()){ - UDTSession session=clientSessions.get(peer); - if(session==null){ - session=new ServerSession(dp,this); - addSession(session.getSocketID(),session); - //TODO need to check peer to avoid duplicate server session - if(serverSocketMode){ - logger.fine("Pooling new request."); - sessionHandoff.put(session); - logger.fine("Request taken for processing."); + synchronized(lock){ + UDTSession session=clientSessions.get(peer); + if(session==null){ + session=new ServerSession(dp,this); + addSession(session.getSocketID(),session); + //TODO need to check peer to avoid duplicate server session + if(serverSocketMode){ + logger.fine("Pooling new request."); + sessionHandoff.put(session); + logger.fine("Request taken for processing."); + } } + peer.setSocketID(((ConnectionHandshake)packet).getSocketID()); + session.received(packet,peer); } - peer.setSocketID(((ConnectionHandshake)packet).getSocketID()); - session.received(packet,peer); } else{ //dispatch to existing session Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-08-30 11:45:32 UTC (rev 42) +++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-08-30 21:17:15 UTC (rev 43) @@ -500,7 +500,7 @@ private Acknowledgement buildLightAcknowledgement(long ackNumber){ Acknowledgement acknowledgmentPkt = new Acknowledgement(); //the packet sequence number to which all the packets have been received - acknowledgmentPkt.setNexttoPrevPktSeqNO(ackNumber); + acknowledgmentPkt.setAckNumber(ackNumber); //assign this ack a unique increasing ACK sequence number acknowledgmentPkt.setAckSequenceNumber(++ackSequenceNumber); acknowledgmentPkt.setRoundTripTime(roundTripTime); Modified: udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java 2010-08-30 11:45:32 UTC (rev 42) +++ udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java 2010-08-30 21:17:15 UTC (rev 43) @@ -41,10 +41,13 @@ * receipt of packets */ public class Acknowledgement extends ControlPacket { - + + //the ack sequence number + private long ackSequenceNumber ; + //the packet sequence number to which all the previous packets have been received (excluding) private long ackNumber ; - + //round-trip time in microseconds(RTT) private long roundTripTime; // RTT variance @@ -55,13 +58,14 @@ private long pktArrivalSpeed; //estimated link capacity in number of packets per second private long estimatedLinkCapacity; - + public Acknowledgement(){ this.controlPacketType=ControlPacketType.ACK.ordinal(); } - - public Acknowledgement(byte[] controlInformation){ + + public Acknowledgement(long ackSeqNo, byte[] controlInformation){ this(); + this.ackSequenceNumber=ackSeqNo; decodeControlInformation(controlInformation); } @@ -70,10 +74,25 @@ roundTripTime =PacketUtil.decode(data, 4); roundTripTimeVariance = PacketUtil.decode(data, 8); bufferSize = PacketUtil.decode(data, 12); - pktArrivalSpeed = PacketUtil.decode(data, 16); - estimatedLinkCapacity = PacketUtil.decode(data, 20); + if(data.length>16){ + pktArrivalSpeed = PacketUtil.decode(data, 16); + estimatedLinkCapacity = PacketUtil.decode(data, 20); + } } + @Override + protected long getAdditionalInfo(){ + return ackSequenceNumber; + } + + public long getAckSequenceNumber() { + return ackSequenceNumber; + } + public void setAckSequenceNumber(long ackSequenceNumber) { + this.ackSequenceNumber = ackSequenceNumber; + } + + /** * get the ack number (the number up to which all packets have been received (excluding)) * @return @@ -81,15 +100,15 @@ public long getAckNumber() { return ackNumber; } - + /** * set the ack number (the number up to which all packets have been received (excluding)) - * @param nexttoPrevPktSeqNO + * @param ackNumber */ - public void setNexttoPrevPktSeqNO(long nexttoPrevPktSeqNO) { - ackNumber = nexttoPrevPktSeqNO; + public void setAckNumber(long ackNumber) { + this.ackNumber = ackNumber; } - + /** * get the round trip time (microseconds) * @return @@ -104,7 +123,7 @@ public void setRoundTripTime(long RoundTripTime) { roundTripTime = RoundTripTime; } - + /** * set the variance of the round trip time (in microseconds) * @param RoundTripTime @@ -112,35 +131,35 @@ public void setRoundTripTimeVar(long roundTripTimeVar) { roundTripTimeVariance = roundTripTimeVar; } - + public long getRoundTripTimeVar() { return roundTripTimeVariance; } - + public long getBufferSize() { return bufferSize; } - + public void setBufferSize(long bufferSiZe) { this.bufferSize = bufferSiZe; } - + public long getPacketReceiveRate() { return pktArrivalSpeed; } public void setPacketReceiveRate(long packetReceiveRate) { this.pktArrivalSpeed = packetReceiveRate; } - - + + public long getEstimatedLinkCapacity() { return estimatedLinkCapacity; } - + public void setEstimatedLinkCapacity(long estimatedLinkCapacity) { this.estimatedLinkCapacity = estimatedLinkCapacity; } - + @Override public byte[] encodeControlInformation(){ try { @@ -151,17 +170,17 @@ bos.write(PacketUtil.encode(bufferSize)); bos.write(PacketUtil.encode(pktArrivalSpeed)); bos.write(PacketUtil.encode(estimatedLinkCapacity)); - + return bos.toByteArray(); } catch (Exception e) { // can't happen return null; } - + } - + @Override public boolean equals(Object obj) { if (this == obj) @@ -185,9 +204,9 @@ return false; return true; } - - - - + + + + } Modified: udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java 2010-08-30 11:45:32 UTC (rev 42) +++ udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java 2010-08-30 21:17:15 UTC (rev 43) @@ -39,28 +39,40 @@ */ public class Acknowledgment2 extends ControlPacket{ - public Acknowledgment2(){ - this.controlPacketType=ControlPacketType.ACK2.ordinal(); - } - - public Acknowledgment2(byte[]controlInformation){ - this(); - decode(controlInformation ); - } - - void decode(byte[]data){ - } - public boolean forSender(){ - return false; - } - - private static final byte[]empty=new byte[0]; - @Override - public byte[] encodeControlInformation(){ - return empty; - } + //the ack sequence number + private long ackSequenceNumber ; + + public Acknowledgment2(){ + this.controlPacketType=ControlPacketType.ACK2.ordinal(); } + public Acknowledgment2(long ackSeqNo,byte[]controlInformation){ + this(); + this.ackSequenceNumber=ackSeqNo; + decode(controlInformation ); + } + public long getAckSequenceNumber() { + return ackSequenceNumber; + } + public void setAckSequenceNumber(long ackSequenceNumber) { + this.ackSequenceNumber = ackSequenceNumber; + } + + void decode(byte[]data){ + } + public boolean forSender(){ + return false; + } + + private static final byte[]empty=new byte[0]; + @Override + public byte[] encodeControlInformation(){ + return empty; + } +} + + + Modified: udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2010-08-30 11:45:32 UTC (rev 42) +++ udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2010-08-30 21:17:15 UTC (rev 43) @@ -34,6 +34,8 @@ import java.io.ByteArrayOutputStream; +import udt.UDTSession; + public class ConnectionHandshake extends ControlPacket { private long udtVersion=4; @@ -57,7 +59,6 @@ public ConnectionHandshake(byte[]controlInformation){ this(); - //this.controlInformation=controlInformation; decode(controlInformation); } @@ -172,6 +173,23 @@ } + public String toString(){ + StringBuilder sb=new StringBuilder(); + sb.append("ConnectionHandshake ["); + UDTSession session=getSession(); + if(session!=null){ + sb.append(session.getDestination()); + sb.append(", "); + } + sb.append("mySocketID=").append(socketID); + sb.append(", initialSeqNo=").append(initialSeqNo); + sb.append(", packetSize=").append(packetSize); + sb.append(", maxFlowWndSize=").append(maxFlowWndSize); + sb.append(", destSocketID=").append(destinationID); + sb.append("]"); + return sb.toString(); + } + } Modified: udt-java/trunk/src/main/java/udt/packets/ControlPacket.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/ControlPacket.java 2010-08-30 11:45:32 UTC (rev 42) +++ udt-java/trunk/src/main/java/udt/packets/ControlPacket.java 2010-08-30 21:17:15 UTC (rev 43) @@ -42,9 +42,6 @@ protected int controlPacketType; - //used for ACK and ACK2 - protected long ackSequenceNumber; - protected long messageNumber; protected long timeStamp; @@ -63,14 +60,6 @@ return controlPacketType; } - public long getAckSequenceNumber() { - return ackSequenceNumber; - } - public void setAckSequenceNumber(long ackSequenceNumber) { - this.ackSequenceNumber = ackSequenceNumber; - } - - public long getMessageNumber() { return messageNumber; } @@ -105,8 +94,8 @@ // //sequence number with highest bit set to "0" try{ ByteArrayOutputStream bos=new ByteArrayOutputStream(16); - bos.write(PacketUtil.encodeHighesBitTypeAndSeqNumber(true, controlPacketType, ackSequenceNumber)); - bos.write(PacketUtil.encode(messageNumber)); + bos.write(PacketUtil.encodeControlPacketType(controlPacketType)); + bos.write(PacketUtil.encode(getAdditionalInfo())); bos.write(PacketUtil.encode(timeStamp)); bos.write(PacketUtil.encode(destinationID)); return bos.toByteArray(); @@ -114,6 +103,14 @@ return null; } } + + /** + * this method gets the "additional info" for this type of control packet + */ + protected long getAdditionalInfo(){ + return 0L; + } + /** * this method builds the control information @@ -149,16 +146,10 @@ if (getClass() != obj.getClass()) return false; ControlPacket other = (ControlPacket) obj; - if (ackSequenceNumber != other.ackSequenceNumber) - return false; if (controlPacketType != other.controlPacketType) return false; - //if (!Arrays.equals(controlInformation, other.controlInformation)) - // return false; if (destinationID != other.destinationID) return false; - if (messageNumber != other.messageNumber) - return false; if (timeStamp != other.timeStamp) return false; return true; Modified: udt-java/trunk/src/main/java/udt/packets/DataPacket.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2010-08-30 11:45:32 UTC (rev 42) +++ udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2010-08-30 21:17:15 UTC (rev 43) @@ -136,7 +136,7 @@ //sequence number with highest bit set to "0" try{ ByteArrayOutputStream bos=new ByteArrayOutputStream(16); - bos.write(PacketUtil.encodeSetHighest(false, packetSequenceNumber)); + bos.write(PacketUtil.encode(packetSequenceNumber)); bos.write(PacketUtil.encode(messageNumber)); bos.write(PacketUtil.encode(timeStamp)); bos.write(PacketUtil.encode(destinationID)); Modified: udt-java/trunk/src/main/java/udt/packets/PacketFactory.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2010-08-30 11:45:32 UTC (rev 42) +++ udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2010-08-30 21:17:15 UTC (rev 43) @@ -64,8 +64,7 @@ ControlPacket packet=null; int pktType=PacketUtil.decodeType(encodedData, 0); - long ackSeqNo =PacketUtil.decodeAckSeqNr(encodedData, 0); - long msgNr = PacketUtil.decode(encodedData, 4); + long additionalInfo = PacketUtil.decode(encodedData, 4); long timeStamp = PacketUtil.decode(encodedData,8) ; long destID = PacketUtil.decode(encodedData,12); byte[] controlInformation = new byte[length-16]; @@ -81,7 +80,7 @@ } //TYPE 0010:2 else if(ControlPacketType.ACK.ordinal()==pktType){ - packet=new Acknowledgement(controlInformation); + packet=new Acknowledgement(additionalInfo,controlInformation); } //TYPE 0011:3 else if(ControlPacketType.NAK.ordinal()==pktType){ @@ -93,7 +92,7 @@ } //TYPE 0110:6 else if(ControlPacketType.ACK2.ordinal()==pktType){ - packet=new Acknowledgment2(controlInformation); + packet=new Acknowledgment2(additionalInfo,controlInformation); } //TYPE 0111:7 else if(ControlPacketType.MESSAGE_DROP_REQUEST.ordinal()==pktType){ @@ -105,8 +104,6 @@ } if(packet!=null){ - packet.setAckSequenceNumber(ackSeqNo); - packet.setMessageNumber(msgNr); packet.setTimeStamp(timeStamp); packet.setDestinationID(destID); } Modified: udt-java/trunk/src/main/java/udt/packets/PacketUtil.java =================================================================== --- udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2010-08-30 11:45:32 UTC (rev 42) +++ udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2010-08-30 21:17:15 UTC (rev 43) @@ -57,60 +57,27 @@ return new byte[]{m4,m3,m2,m1}; } - - - public static byte[]encodeSetHighestAndType(boolean highest,int type,long value){ - byte m4; - byte m3; - if(highest){ - m4= (byte) (0x80 | type<<3); - m3= (byte) (0); - } - else{ - m4= (byte) (0x7f & value>>24 ); - m3=(byte)(value>>16); - } + + public static byte[]encodeControlPacketType(int type){ + byte m4=(byte) 0x80; - byte m2=(byte)(value>>8); - byte m1=(byte)(value); - return new byte[]{m4,m3,m2,m1}; + byte m3=(byte)type; + return new byte[]{m4,m3,0,0}; } - public static byte[]encodeHighesBitTypeAndSeqNumber(boolean highestBit,int type, long value){ - byte m4,m3; - if(highestBit){ - m4=(byte) (0x80 | type<<3); - m3=(byte)(0); - } - else{ - m4= (byte) (0); - m3=(byte)(0); - } - byte m2=(byte)(value>>8); - byte m1=(byte)(value); - return new byte[]{m4,m3,m2,m1}; - } - public static long decode(byte[]data, int start){ - long result = (data[start] & 0xFF)<<24 - |(data[start+1] & 0xFF)<<16 - |(data[start+2] & 0xFF)<<8 - |(data[start+3] & 0xFF); + long result = (data[start]&0xFF)<<24 + | (data[start+1]&0xFF)<<16 + | (data[start+2]&0xFF)<<8 + | (data[start+3]&0xFF); return result; } public static int decodeType(byte[]data, int start){ - int result = (data[start]&0x78)>>3; + int result = data[start+1]&0xFF; return result; } - - public static long decodeAckSeqNr(byte[]data, int start){ - long result = (data[start+2] & 0xFF)<<8 - |(data[start+3] & 0xFF); - return result; - } - } Modified: udt-java/trunk/src/main/java/udt/util/Application.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/Application.java 2010-08-30 11:45:32 UTC (rev 42) +++ udt-java/trunk/src/main/java/udt/util/Application.java 2010-08-30 21:17:15 UTC (rev 43) @@ -51,4 +51,23 @@ } } + + + + static long decode(byte[]data, int start){ + long result = (data[start+3] & 0xFF)<<24 + |(data[start+2] & 0xFF)<<16 + |(data[start+1] & 0xFF)<<8 + |(data[start] & 0xFF); + return result; + } + + static byte[]encode(long value){ + byte m4= (byte) (value>>24 ); + byte m3=(byte)(value>>16); + byte m2=(byte)(value>>8); + byte m1=(byte)(value); + return new byte[]{m1,m2,m3,m4}; + } + } Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-08-30 11:45:32 UTC (rev 42) +++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-08-30 21:17:15 UTC (rev 43) @@ -35,7 +35,6 @@ import java.io.File; import java.io.FileOutputStream; import java.net.InetAddress; -import java.nio.ByteBuffer; import java.text.NumberFormat; import udt.UDTClient; @@ -78,29 +77,29 @@ UDTInputStream in=client.getInputStream(); UDTOutputStream out=client.getOutputStream(); - byte[]readBuf=new byte[1024]; - ByteBuffer bb=ByteBuffer.wrap(readBuf); System.out.println("[ReceiveFile] Requesting file "+remoteFile); //send name file info byte[]fName=remoteFile.getBytes(); - bb.putInt(fName.length+1); - bb.put(fName); - bb.put((byte)0); + out.write(encode(fName.length)); + out.write(fName); - out.write(readBuf, 0, bb.position()); out.flush(); //pause the sender to save some CPU time out.pauseOutput(); //read size info (an 4-byte int) - byte[]sizeInfo=new byte[4]; + byte[]sizeInfo=new byte[8]; - while(in.read(sizeInfo)==0); + int total=0; + while(total<sizeInfo.length){ + int r=in.read(sizeInfo); + if(r<0)break; + total+=r; + } + long size=decode(sizeInfo, 0); - long size=ByteBuffer.wrap(sizeInfo).getInt(); - File file=new File(new String(localFile)); System.out.println("[ReceiveFile] Write to local file <"+file.getAbsolutePath()+">"); FileOutputStream fos=new FileOutputStream(file); Modified: udt-java/trunk/src/main/java/udt/util/SendFile.java =================================================================== --- udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-08-30 11:45:32 UTC (rev 42) +++ udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-08-30 21:17:15 UTC (rev 43) @@ -51,8 +51,8 @@ import udt.UDTReceiver; import udt.UDTServerSocket; import udt.UDTSocket; -import udt.packets.PacketUtil; + /** * helper application for sending a single file via UDT * Intended to be compatible with the C++ version in @@ -69,9 +69,9 @@ public SendFile(int serverPort){ this.serverPort=serverPort; - + } - + @Override public void configure(){ super.configure(); @@ -85,22 +85,23 @@ UDTServerSocket server=new UDTServerSocket(myHost,serverPort); while(true){ UDTSocket socket=server.accept(); + Thread.sleep(1000); threadPool.execute(new RequestRunner(socket)); } }catch(Exception ex){ throw new RuntimeException(ex); } } - + /** * main() method for invoking as a commandline application * @param args * @throws Exception */ public static void main(String[] fullArgs) throws Exception{ - + String[] args=parseOptions(fullArgs); - + int serverPort=65321; try{ serverPort=Integer.parseInt(args[0]); @@ -114,24 +115,24 @@ public static void usage(){ System.out.println("Usage: java -cp ... udt.util.SendFile <server_port> " + - "[--verbose] [--localPort=<port>] [--localIP=<ip>]"); + "[--verbose] [--localPort=<port>] [--localIP=<ip>]"); } public static class RequestRunner implements Runnable{ - + private final static Logger logger=Logger.getLogger(RequestRunner.class.getName()); - + private final UDTSocket socket; - + private final NumberFormat format=NumberFormat.getNumberInstance(); - + private final boolean memMapped; public RequestRunner(UDTSocket socket){ this.socket=socket; format.setMaximumFractionDigits(3); memMapped=false;//true; } - + public void run(){ try{ logger.info("Handling request from "+socket.getSession().getDestination()); @@ -144,8 +145,19 @@ while(in.read(readBuf)==0)Thread.sleep(100); //how many bytes to read for the file name - int length=bb.getInt(); - byte[]fileName=new byte[length-1]; + byte[]len=new byte[4]; + bb.get(len); + if(verbose){ + StringBuilder sb=new StringBuilder(); + for(int i=0;i<len.length;i++){ + sb.append(Integer.toString(len[i])); + sb.append(" "); + } + System.out.println("[SendFile] name length data: "+sb.toString()); + } + long length=decode(len, 0); + if(verbose)System.out.println("[SendFile] name length : "+length); + byte[]fileName=new byte[(int)length]; bb.get(fileName); File file=new File(new String(fileName)); @@ -156,7 +168,10 @@ long size=file.length(); System.out.println("[SendFile] File size: "+size); //send size info - out.write(PacketUtil.encode(size)); + out.write(encode(size)); + out.write(encode(0l)); + out.flush(); + long start=System.currentTimeMillis(); //and send the file if(memMapped){ @@ -183,8 +198,8 @@ } } } - - + + private static void copyFile(File file, OutputStream os)throws Exception{ FileChannel c=new RandomAccessFile(file,"r").getChannel(); MappedByteBuffer b=c.map(MapMode.READ_ONLY, 0, file.length()); @@ -199,5 +214,6 @@ } os.flush(); } - + + } Modified: udt-java/trunk/src/test/java/udt/TestControlPacket.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestControlPacket.java 2010-08-30 11:45:32 UTC (rev 42) +++ udt-java/trunk/src/test/java/udt/TestControlPacket.java 2010-08-30 21:17:15 UTC (rev 43) @@ -8,7 +8,6 @@ public void testSequenceNumber1(){ ControlPacket p=new DummyControlPacket(); - p.setAckSequenceNumber(1); byte[]x=p.getHeader(); byte highest=x[0]; //check highest bit is "1" for ControlPacket Modified: udt-java/trunk/src/test/java/udt/TestControlPacketType.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestControlPacketType.java 2010-08-30 11:45:32 UTC (rev 42) +++ udt-java/trunk/src/test/java/udt/TestControlPacketType.java 2010-08-30 21:17:15 UTC (rev 43) @@ -8,11 +8,9 @@ public void testSequenceNumber1(){ ControlPacket p=new DummyControlPacket(); - p.setAckSequenceNumber(1); byte[]x=p.getHeader(); byte highest=x[0]; //check highest bit is "1" for ControlPacket - assertEquals(128, highest & 0x80); byte lowest=x[3]; assertEquals(1, lowest); Modified: udt-java/trunk/src/test/java/udt/TestPacketFactory.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestPacketFactory.java 2010-08-30 11:45:32 UTC (rev 42) +++ udt-java/trunk/src/test/java/udt/TestPacketFactory.java 2010-08-30 21:17:15 UTC (rev 43) @@ -16,26 +16,27 @@ public class TestPacketFactory extends TestCase { public void testData(){ - byte[]data="sdjfsdjfldskjflds".getBytes(); + String test="sdjfsdjfldskjflds"; + + byte[]data=test.getBytes(); data[0]=(byte)(data[0] & 0x7f); UDTPacket p=PacketFactory.createPacket(data); DataPacket recv=(DataPacket)p; String t=new String(recv.getEncoded()); assertTrue(p instanceof DataPacket); - assertEquals("sdjfsdjfldskjflds",t); + assertEquals(test,t); } public void testConnectionHandshake(){ ConnectionHandshake p1 = new ConnectionHandshake(); - p1.setAckSequenceNumber(1234); p1.setMessageNumber(9876); p1.setTimeStamp(3456); p1.setDestinationID(1); p1.setConnectionType(1); p1.setSocketType(1); - p1.setInitialSeqNo(1); + p1.setInitialSeqNo(321); p1.setPacketSize(128); p1.setMaxFlowWndSize(128); p1.setSocketID(1); @@ -58,7 +59,7 @@ p1.setDestinationID(1); p1.setBufferSize(128); p1.setEstimatedLinkCapacity(16); - p1.setNexttoPrevPktSeqNO(9870); + p1.setAckNumber(9870); p1.setPacketReceiveRate(1000); p1.setRoundTripTime(1000); p1.setRoundTripTimeVar(500); @@ -86,7 +87,6 @@ public void testNegativeAcknowledgement(){ NegativeAcknowledgement p1 = new NegativeAcknowledgement(); - p1.setAckSequenceNumber(1231); p1.setMessageNumber(9872); p1.setTimeStamp(3452); p1.setDestinationID(2); @@ -105,7 +105,6 @@ public void testNegativeAcknowledgement2(){ NegativeAcknowledgement p1 = new NegativeAcknowledgement(); - p1.setAckSequenceNumber(1231); p1.setMessageNumber(9872); p1.setTimeStamp(3452); p1.setDestinationID(2); @@ -130,7 +129,6 @@ public void testNegativeAcknowledgement3(){ NegativeAcknowledgement p1 = new NegativeAcknowledgement(); - p1.setAckSequenceNumber(1231); p1.setMessageNumber(9872); p1.setTimeStamp(3452); p1.setDestinationID(2); @@ -148,7 +146,6 @@ public void testShutdown(){ Shutdown p1 = new Shutdown(); - p1.setAckSequenceNumber(1233); p1.setMessageNumber(9874); p1.setTimeStamp(3453); p1.setDestinationID(3); @@ -165,7 +162,6 @@ public void testMessageDropRequest(){ MessageDropRequest p1=new MessageDropRequest(); - p1.setAckSequenceNumber(1234); p1.setMessageNumber(9876); p1.setTimeStamp(3456); p1.setDestinationID(4); Modified: udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java =================================================================== --- udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-08-30 11:45:32 UTC (rev 42) +++ udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-08-30 21:17:15 UTC (rev 43) @@ -17,8 +17,8 @@ Thread.sleep(500); }while(!serverStarted); - //File f=new File("src/test/java/datafile"); - File f=new File("/tmp/100MB"); + File f=new File("src/test/java/datafile"); + //File f=new File("/tmp/100MB"); File tmp=File.createTempFile("udtest-", null); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |