[Udt-java-commits] SF.net SVN: udt-java:[43] udt-java/trunk/src
Status: Alpha
Brought to you by:
bschuller
|
From: <bsc...@us...> - 2010-08-30 21:17:22
|
Revision: 43
http://udt-java.svn.sourceforge.net/udt-java/?rev=43&view=rev
Author: bschuller
Date: 2010-08-30 21:17:15 +0000 (Mon, 30 Aug 2010)
Log Message:
-----------
Change the packet encoding to be compatible with UDT v4 (the C++ version); change the SendFile/ReceiveFile helper applications a bit. Does not quite work yet, but much better :)
Modified Paths:
--------------
udt-java/trunk/src/main/java/udt/ClientSession.java
udt-java/trunk/src/main/java/udt/ServerSession.java
udt-java/trunk/src/main/java/udt/UDPEndPoint.java
udt-java/trunk/src/main/java/udt/UDTReceiver.java
udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java
udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java
udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java
udt-java/trunk/src/main/java/udt/packets/ControlPacket.java
udt-java/trunk/src/main/java/udt/packets/DataPacket.java
udt-java/trunk/src/main/java/udt/packets/PacketFactory.java
udt-java/trunk/src/main/java/udt/packets/PacketUtil.java
udt-java/trunk/src/main/java/udt/util/Application.java
udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
udt-java/trunk/src/main/java/udt/util/SendFile.java
udt-java/trunk/src/test/java/udt/TestControlPacket.java
udt-java/trunk/src/test/java/udt/TestControlPacketType.java
udt-java/trunk/src/test/java/udt/TestPacketFactory.java
udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java
Modified: udt-java/trunk/src/main/java/udt/ClientSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/ClientSession.java 2010-08-30 11:45:32 UTC (rev 42)
+++ udt-java/trunk/src/main/java/udt/ClientSession.java 2010-08-30 21:17:15 UTC (rev 43)
@@ -135,7 +135,7 @@
handshake.setPacketSize(getDatagramSize());
handshake.setSocketID(mySocketID);
handshake.setSession(this);
- logger.info("Handshake to "+this.getDestination());
+ logger.info("Sending "+handshake);
endPoint.doSend(handshake);
}
Modified: udt-java/trunk/src/main/java/udt/ServerSession.java
===================================================================
--- udt-java/trunk/src/main/java/udt/ServerSession.java 2010-08-30 11:45:32 UTC (rev 42)
+++ udt-java/trunk/src/main/java/udt/ServerSession.java 2010-08-30 21:17:15 UTC (rev 43)
@@ -56,10 +56,10 @@
//last received packet (for testing purposes)
private UDTPacket lastPacket;
- public ServerSession(DatagramPacket dp,UDPEndPoint endPoint)throws SocketException,UnknownHostException{
+ public ServerSession(DatagramPacket dp, UDPEndPoint endPoint)throws SocketException,UnknownHostException{
super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+dp.getAddress()+":"+dp.getPort(),new Destination(dp.getAddress(),dp.getPort()));
this.endPoint=endPoint;
- logger.info("Created "+toString()+" talking to "+getDestination());
+ logger.info("Created "+toString()+" talking to "+dp.getAddress()+":"+dp.getPort());
}
int n_handshake=0;
@@ -68,14 +68,16 @@
public void received(UDTPacket packet, Destination peer){
lastPacket=packet;
if (getState()<=ready && packet instanceof ConnectionHandshake) {
- logger.info("Received ConnectionHandshake from "+peer);
ConnectionHandshake connectionHandshake=(ConnectionHandshake)packet;
destination.setSocketID(connectionHandshake.getSocketID());
+
+ logger.info("Received "+connectionHandshake);
+
if(getState()<=handshaking){
setState(handshaking);
}
try{
- handleHandShake(connectionHandshake,peer);
+ handleHandShake(connectionHandshake);
n_handshake++;
try{
setState(ready);
@@ -154,7 +156,7 @@
* @param peer
* @throws IOException
*/
- protected void handleHandShake(ConnectionHandshake handshake,Destination peer)throws IOException{
+ protected void handleHandShake(ConnectionHandshake handshake)throws IOException{
ConnectionHandshake responseHandshake = new ConnectionHandshake();
//compare the packet size and choose minimun
long clientBufferSize=handshake.getPacketSize();
@@ -166,11 +168,13 @@
responseHandshake.setPacketSize(bufferSize);
responseHandshake.setUdtVersion(4);
responseHandshake.setInitialSeqNo(initialSequenceNumber);
- responseHandshake.setConnectionType(1);
+ responseHandshake.setConnectionType(-1);
+ responseHandshake.setMaxFlowWndSize(handshake.getMaxFlowWndSize());
//tell peer what the socket ID on this side is
responseHandshake.setSocketID(mySocketID);
responseHandshake.setDestinationID(this.getDestination().getSocketID());
responseHandshake.setSession(this);
+ logger.info("Sending reply "+responseHandshake);
endPoint.doSend(responseHandshake);
}
Modified: udt-java/trunk/src/main/java/udt/UDPEndPoint.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-08-30 11:45:32 UTC (rev 42)
+++ udt-java/trunk/src/main/java/udt/UDPEndPoint.java 2010-08-30 21:17:15 UTC (rev 43)
@@ -50,7 +50,6 @@
import udt.packets.ConnectionHandshake;
import udt.packets.Destination;
import udt.packets.PacketFactory;
-import udt.util.MeanValue;
import udt.util.UDTThreadFactory;
/**
@@ -203,6 +202,7 @@
}
public void addSession(Long destinationID,UDTSession session){
+ logger.info("Storing session <"+destinationID+">");
sessions.put(destinationID, session);
}
@@ -248,17 +248,21 @@
private long lastDestID=-1;
private UDTSession lastSession;
- MeanValue v=new MeanValue("receiver processing ",true, 256);
+ //MeanValue v=new MeanValue("receiver processing ",true, 256);
+ private final Object lock=new Object();
+
protected void doReceive()throws IOException{
while(!stopped){
try{
try{
- v.end();
+ //v.end();
+
//will block until a packet is received or timeout has expired
dgSocket.receive(dp);
- v.begin();
+ //v.begin();
+
Destination peer=new Destination(dp.getAddress(), dp.getPort());
int l=dp.getLength();
UDTPacket packet=PacketFactory.createPacket(dp.getData(),l);
@@ -266,19 +270,21 @@
//handle connection handshake
if(packet.isConnectionHandshake()){
- UDTSession session=clientSessions.get(peer);
- if(session==null){
- session=new ServerSession(dp,this);
- addSession(session.getSocketID(),session);
- //TODO need to check peer to avoid duplicate server session
- if(serverSocketMode){
- logger.fine("Pooling new request.");
- sessionHandoff.put(session);
- logger.fine("Request taken for processing.");
+ synchronized(lock){
+ UDTSession session=clientSessions.get(peer);
+ if(session==null){
+ session=new ServerSession(dp,this);
+ addSession(session.getSocketID(),session);
+ //TODO need to check peer to avoid duplicate server session
+ if(serverSocketMode){
+ logger.fine("Pooling new request.");
+ sessionHandoff.put(session);
+ logger.fine("Request taken for processing.");
+ }
}
+ peer.setSocketID(((ConnectionHandshake)packet).getSocketID());
+ session.received(packet,peer);
}
- peer.setSocketID(((ConnectionHandshake)packet).getSocketID());
- session.received(packet,peer);
}
else{
//dispatch to existing session
Modified: udt-java/trunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-08-30 11:45:32 UTC (rev 42)
+++ udt-java/trunk/src/main/java/udt/UDTReceiver.java 2010-08-30 21:17:15 UTC (rev 43)
@@ -500,7 +500,7 @@
private Acknowledgement buildLightAcknowledgement(long ackNumber){
Acknowledgement acknowledgmentPkt = new Acknowledgement();
//the packet sequence number to which all the packets have been received
- acknowledgmentPkt.setNexttoPrevPktSeqNO(ackNumber);
+ acknowledgmentPkt.setAckNumber(ackNumber);
//assign this ack a unique increasing ACK sequence number
acknowledgmentPkt.setAckSequenceNumber(++ackSequenceNumber);
acknowledgmentPkt.setRoundTripTime(roundTripTime);
Modified: udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java 2010-08-30 11:45:32 UTC (rev 42)
+++ udt-java/trunk/src/main/java/udt/packets/Acknowledgement.java 2010-08-30 21:17:15 UTC (rev 43)
@@ -41,10 +41,13 @@
* receipt of packets
*/
public class Acknowledgement extends ControlPacket {
-
+
+ //the ack sequence number
+ private long ackSequenceNumber ;
+
//the packet sequence number to which all the previous packets have been received (excluding)
private long ackNumber ;
-
+
//round-trip time in microseconds(RTT)
private long roundTripTime;
// RTT variance
@@ -55,13 +58,14 @@
private long pktArrivalSpeed;
//estimated link capacity in number of packets per second
private long estimatedLinkCapacity;
-
+
public Acknowledgement(){
this.controlPacketType=ControlPacketType.ACK.ordinal();
}
-
- public Acknowledgement(byte[] controlInformation){
+
+ public Acknowledgement(long ackSeqNo, byte[] controlInformation){
this();
+ this.ackSequenceNumber=ackSeqNo;
decodeControlInformation(controlInformation);
}
@@ -70,10 +74,25 @@
roundTripTime =PacketUtil.decode(data, 4);
roundTripTimeVariance = PacketUtil.decode(data, 8);
bufferSize = PacketUtil.decode(data, 12);
- pktArrivalSpeed = PacketUtil.decode(data, 16);
- estimatedLinkCapacity = PacketUtil.decode(data, 20);
+ if(data.length>16){
+ pktArrivalSpeed = PacketUtil.decode(data, 16);
+ estimatedLinkCapacity = PacketUtil.decode(data, 20);
+ }
}
+ @Override
+ protected long getAdditionalInfo(){
+ return ackSequenceNumber;
+ }
+
+ public long getAckSequenceNumber() {
+ return ackSequenceNumber;
+ }
+ public void setAckSequenceNumber(long ackSequenceNumber) {
+ this.ackSequenceNumber = ackSequenceNumber;
+ }
+
+
/**
* get the ack number (the number up to which all packets have been received (excluding))
* @return
@@ -81,15 +100,15 @@
public long getAckNumber() {
return ackNumber;
}
-
+
/**
* set the ack number (the number up to which all packets have been received (excluding))
- * @param nexttoPrevPktSeqNO
+ * @param ackNumber
*/
- public void setNexttoPrevPktSeqNO(long nexttoPrevPktSeqNO) {
- ackNumber = nexttoPrevPktSeqNO;
+ public void setAckNumber(long ackNumber) {
+ this.ackNumber = ackNumber;
}
-
+
/**
* get the round trip time (microseconds)
* @return
@@ -104,7 +123,7 @@
public void setRoundTripTime(long RoundTripTime) {
roundTripTime = RoundTripTime;
}
-
+
/**
* set the variance of the round trip time (in microseconds)
* @param RoundTripTime
@@ -112,35 +131,35 @@
public void setRoundTripTimeVar(long roundTripTimeVar) {
roundTripTimeVariance = roundTripTimeVar;
}
-
+
public long getRoundTripTimeVar() {
return roundTripTimeVariance;
}
-
+
public long getBufferSize() {
return bufferSize;
}
-
+
public void setBufferSize(long bufferSiZe) {
this.bufferSize = bufferSiZe;
}
-
+
public long getPacketReceiveRate() {
return pktArrivalSpeed;
}
public void setPacketReceiveRate(long packetReceiveRate) {
this.pktArrivalSpeed = packetReceiveRate;
}
-
-
+
+
public long getEstimatedLinkCapacity() {
return estimatedLinkCapacity;
}
-
+
public void setEstimatedLinkCapacity(long estimatedLinkCapacity) {
this.estimatedLinkCapacity = estimatedLinkCapacity;
}
-
+
@Override
public byte[] encodeControlInformation(){
try {
@@ -151,17 +170,17 @@
bos.write(PacketUtil.encode(bufferSize));
bos.write(PacketUtil.encode(pktArrivalSpeed));
bos.write(PacketUtil.encode(estimatedLinkCapacity));
-
+
return bos.toByteArray();
} catch (Exception e) {
// can't happen
return null;
}
-
+
}
-
+
@Override
public boolean equals(Object obj) {
if (this == obj)
@@ -185,9 +204,9 @@
return false;
return true;
}
-
-
-
-
+
+
+
+
}
Modified: udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java 2010-08-30 11:45:32 UTC (rev 42)
+++ udt-java/trunk/src/main/java/udt/packets/Acknowledgment2.java 2010-08-30 21:17:15 UTC (rev 43)
@@ -39,28 +39,40 @@
*/
public class Acknowledgment2 extends ControlPacket{
- public Acknowledgment2(){
- this.controlPacketType=ControlPacketType.ACK2.ordinal();
- }
-
- public Acknowledgment2(byte[]controlInformation){
- this();
- decode(controlInformation );
- }
-
- void decode(byte[]data){
- }
- public boolean forSender(){
- return false;
- }
-
- private static final byte[]empty=new byte[0];
- @Override
- public byte[] encodeControlInformation(){
- return empty;
- }
+ //the ack sequence number
+ private long ackSequenceNumber ;
+
+ public Acknowledgment2(){
+ this.controlPacketType=ControlPacketType.ACK2.ordinal();
}
+ public Acknowledgment2(long ackSeqNo,byte[]controlInformation){
+ this();
+ this.ackSequenceNumber=ackSeqNo;
+ decode(controlInformation );
+ }
+ public long getAckSequenceNumber() {
+ return ackSequenceNumber;
+ }
+ public void setAckSequenceNumber(long ackSequenceNumber) {
+ this.ackSequenceNumber = ackSequenceNumber;
+ }
+
+ void decode(byte[]data){
+ }
+ public boolean forSender(){
+ return false;
+ }
+
+ private static final byte[]empty=new byte[0];
+ @Override
+ public byte[] encodeControlInformation(){
+ return empty;
+ }
+}
+
+
+
Modified: udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2010-08-30 11:45:32 UTC (rev 42)
+++ udt-java/trunk/src/main/java/udt/packets/ConnectionHandshake.java 2010-08-30 21:17:15 UTC (rev 43)
@@ -34,6 +34,8 @@
import java.io.ByteArrayOutputStream;
+import udt.UDTSession;
+
public class ConnectionHandshake extends ControlPacket {
private long udtVersion=4;
@@ -57,7 +59,6 @@
public ConnectionHandshake(byte[]controlInformation){
this();
- //this.controlInformation=controlInformation;
decode(controlInformation);
}
@@ -172,6 +173,23 @@
}
+ public String toString(){
+ StringBuilder sb=new StringBuilder();
+ sb.append("ConnectionHandshake [");
+ UDTSession session=getSession();
+ if(session!=null){
+ sb.append(session.getDestination());
+ sb.append(", ");
+ }
+ sb.append("mySocketID=").append(socketID);
+ sb.append(", initialSeqNo=").append(initialSeqNo);
+ sb.append(", packetSize=").append(packetSize);
+ sb.append(", maxFlowWndSize=").append(maxFlowWndSize);
+ sb.append(", destSocketID=").append(destinationID);
+ sb.append("]");
+ return sb.toString();
+ }
+
}
Modified: udt-java/trunk/src/main/java/udt/packets/ControlPacket.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/ControlPacket.java 2010-08-30 11:45:32 UTC (rev 42)
+++ udt-java/trunk/src/main/java/udt/packets/ControlPacket.java 2010-08-30 21:17:15 UTC (rev 43)
@@ -42,9 +42,6 @@
protected int controlPacketType;
- //used for ACK and ACK2
- protected long ackSequenceNumber;
-
protected long messageNumber;
protected long timeStamp;
@@ -63,14 +60,6 @@
return controlPacketType;
}
- public long getAckSequenceNumber() {
- return ackSequenceNumber;
- }
- public void setAckSequenceNumber(long ackSequenceNumber) {
- this.ackSequenceNumber = ackSequenceNumber;
- }
-
-
public long getMessageNumber() {
return messageNumber;
}
@@ -105,8 +94,8 @@
// //sequence number with highest bit set to "0"
try{
ByteArrayOutputStream bos=new ByteArrayOutputStream(16);
- bos.write(PacketUtil.encodeHighesBitTypeAndSeqNumber(true, controlPacketType, ackSequenceNumber));
- bos.write(PacketUtil.encode(messageNumber));
+ bos.write(PacketUtil.encodeControlPacketType(controlPacketType));
+ bos.write(PacketUtil.encode(getAdditionalInfo()));
bos.write(PacketUtil.encode(timeStamp));
bos.write(PacketUtil.encode(destinationID));
return bos.toByteArray();
@@ -114,6 +103,14 @@
return null;
}
}
+
+ /**
+ * this method gets the "additional info" for this type of control packet
+ */
+ protected long getAdditionalInfo(){
+ return 0L;
+ }
+
/**
* this method builds the control information
@@ -149,16 +146,10 @@
if (getClass() != obj.getClass())
return false;
ControlPacket other = (ControlPacket) obj;
- if (ackSequenceNumber != other.ackSequenceNumber)
- return false;
if (controlPacketType != other.controlPacketType)
return false;
- //if (!Arrays.equals(controlInformation, other.controlInformation))
- // return false;
if (destinationID != other.destinationID)
return false;
- if (messageNumber != other.messageNumber)
- return false;
if (timeStamp != other.timeStamp)
return false;
return true;
Modified: udt-java/trunk/src/main/java/udt/packets/DataPacket.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2010-08-30 11:45:32 UTC (rev 42)
+++ udt-java/trunk/src/main/java/udt/packets/DataPacket.java 2010-08-30 21:17:15 UTC (rev 43)
@@ -136,7 +136,7 @@
//sequence number with highest bit set to "0"
try{
ByteArrayOutputStream bos=new ByteArrayOutputStream(16);
- bos.write(PacketUtil.encodeSetHighest(false, packetSequenceNumber));
+ bos.write(PacketUtil.encode(packetSequenceNumber));
bos.write(PacketUtil.encode(messageNumber));
bos.write(PacketUtil.encode(timeStamp));
bos.write(PacketUtil.encode(destinationID));
Modified: udt-java/trunk/src/main/java/udt/packets/PacketFactory.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2010-08-30 11:45:32 UTC (rev 42)
+++ udt-java/trunk/src/main/java/udt/packets/PacketFactory.java 2010-08-30 21:17:15 UTC (rev 43)
@@ -64,8 +64,7 @@
ControlPacket packet=null;
int pktType=PacketUtil.decodeType(encodedData, 0);
- long ackSeqNo =PacketUtil.decodeAckSeqNr(encodedData, 0);
- long msgNr = PacketUtil.decode(encodedData, 4);
+ long additionalInfo = PacketUtil.decode(encodedData, 4);
long timeStamp = PacketUtil.decode(encodedData,8) ;
long destID = PacketUtil.decode(encodedData,12);
byte[] controlInformation = new byte[length-16];
@@ -81,7 +80,7 @@
}
//TYPE 0010:2
else if(ControlPacketType.ACK.ordinal()==pktType){
- packet=new Acknowledgement(controlInformation);
+ packet=new Acknowledgement(additionalInfo,controlInformation);
}
//TYPE 0011:3
else if(ControlPacketType.NAK.ordinal()==pktType){
@@ -93,7 +92,7 @@
}
//TYPE 0110:6
else if(ControlPacketType.ACK2.ordinal()==pktType){
- packet=new Acknowledgment2(controlInformation);
+ packet=new Acknowledgment2(additionalInfo,controlInformation);
}
//TYPE 0111:7
else if(ControlPacketType.MESSAGE_DROP_REQUEST.ordinal()==pktType){
@@ -105,8 +104,6 @@
}
if(packet!=null){
- packet.setAckSequenceNumber(ackSeqNo);
- packet.setMessageNumber(msgNr);
packet.setTimeStamp(timeStamp);
packet.setDestinationID(destID);
}
Modified: udt-java/trunk/src/main/java/udt/packets/PacketUtil.java
===================================================================
--- udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2010-08-30 11:45:32 UTC (rev 42)
+++ udt-java/trunk/src/main/java/udt/packets/PacketUtil.java 2010-08-30 21:17:15 UTC (rev 43)
@@ -57,60 +57,27 @@
return new byte[]{m4,m3,m2,m1};
}
-
-
- public static byte[]encodeSetHighestAndType(boolean highest,int type,long value){
- byte m4;
- byte m3;
- if(highest){
- m4= (byte) (0x80 | type<<3);
- m3= (byte) (0);
- }
- else{
- m4= (byte) (0x7f & value>>24 );
- m3=(byte)(value>>16);
- }
+
+ public static byte[]encodeControlPacketType(int type){
+ byte m4=(byte) 0x80;
- byte m2=(byte)(value>>8);
- byte m1=(byte)(value);
- return new byte[]{m4,m3,m2,m1};
+ byte m3=(byte)type;
+ return new byte[]{m4,m3,0,0};
}
- public static byte[]encodeHighesBitTypeAndSeqNumber(boolean highestBit,int type, long value){
- byte m4,m3;
- if(highestBit){
- m4=(byte) (0x80 | type<<3);
- m3=(byte)(0);
- }
- else{
- m4= (byte) (0);
- m3=(byte)(0);
- }
- byte m2=(byte)(value>>8);
- byte m1=(byte)(value);
- return new byte[]{m4,m3,m2,m1};
- }
-
public static long decode(byte[]data, int start){
- long result = (data[start] & 0xFF)<<24
- |(data[start+1] & 0xFF)<<16
- |(data[start+2] & 0xFF)<<8
- |(data[start+3] & 0xFF);
+ long result = (data[start]&0xFF)<<24
+ | (data[start+1]&0xFF)<<16
+ | (data[start+2]&0xFF)<<8
+ | (data[start+3]&0xFF);
return result;
}
public static int decodeType(byte[]data, int start){
- int result = (data[start]&0x78)>>3;
+ int result = data[start+1]&0xFF;
return result;
}
-
- public static long decodeAckSeqNr(byte[]data, int start){
- long result = (data[start+2] & 0xFF)<<8
- |(data[start+3] & 0xFF);
- return result;
- }
-
}
Modified: udt-java/trunk/src/main/java/udt/util/Application.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/Application.java 2010-08-30 11:45:32 UTC (rev 42)
+++ udt-java/trunk/src/main/java/udt/util/Application.java 2010-08-30 21:17:15 UTC (rev 43)
@@ -51,4 +51,23 @@
}
}
+
+
+
+ static long decode(byte[]data, int start){
+ long result = (data[start+3] & 0xFF)<<24
+ |(data[start+2] & 0xFF)<<16
+ |(data[start+1] & 0xFF)<<8
+ |(data[start] & 0xFF);
+ return result;
+ }
+
+ static byte[]encode(long value){
+ byte m4= (byte) (value>>24 );
+ byte m3=(byte)(value>>16);
+ byte m2=(byte)(value>>8);
+ byte m1=(byte)(value);
+ return new byte[]{m1,m2,m3,m4};
+ }
+
}
Modified: udt-java/trunk/src/main/java/udt/util/ReceiveFile.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-08-30 11:45:32 UTC (rev 42)
+++ udt-java/trunk/src/main/java/udt/util/ReceiveFile.java 2010-08-30 21:17:15 UTC (rev 43)
@@ -35,7 +35,6 @@
import java.io.File;
import java.io.FileOutputStream;
import java.net.InetAddress;
-import java.nio.ByteBuffer;
import java.text.NumberFormat;
import udt.UDTClient;
@@ -78,29 +77,29 @@
UDTInputStream in=client.getInputStream();
UDTOutputStream out=client.getOutputStream();
- byte[]readBuf=new byte[1024];
- ByteBuffer bb=ByteBuffer.wrap(readBuf);
System.out.println("[ReceiveFile] Requesting file "+remoteFile);
//send name file info
byte[]fName=remoteFile.getBytes();
- bb.putInt(fName.length+1);
- bb.put(fName);
- bb.put((byte)0);
+ out.write(encode(fName.length));
+ out.write(fName);
- out.write(readBuf, 0, bb.position());
out.flush();
//pause the sender to save some CPU time
out.pauseOutput();
//read size info (an 4-byte int)
- byte[]sizeInfo=new byte[4];
+ byte[]sizeInfo=new byte[8];
- while(in.read(sizeInfo)==0);
+ int total=0;
+ while(total<sizeInfo.length){
+ int r=in.read(sizeInfo);
+ if(r<0)break;
+ total+=r;
+ }
+ long size=decode(sizeInfo, 0);
- long size=ByteBuffer.wrap(sizeInfo).getInt();
-
File file=new File(new String(localFile));
System.out.println("[ReceiveFile] Write to local file <"+file.getAbsolutePath()+">");
FileOutputStream fos=new FileOutputStream(file);
Modified: udt-java/trunk/src/main/java/udt/util/SendFile.java
===================================================================
--- udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-08-30 11:45:32 UTC (rev 42)
+++ udt-java/trunk/src/main/java/udt/util/SendFile.java 2010-08-30 21:17:15 UTC (rev 43)
@@ -51,8 +51,8 @@
import udt.UDTReceiver;
import udt.UDTServerSocket;
import udt.UDTSocket;
-import udt.packets.PacketUtil;
+
/**
* helper application for sending a single file via UDT
* Intended to be compatible with the C++ version in
@@ -69,9 +69,9 @@
public SendFile(int serverPort){
this.serverPort=serverPort;
-
+
}
-
+
@Override
public void configure(){
super.configure();
@@ -85,22 +85,23 @@
UDTServerSocket server=new UDTServerSocket(myHost,serverPort);
while(true){
UDTSocket socket=server.accept();
+ Thread.sleep(1000);
threadPool.execute(new RequestRunner(socket));
}
}catch(Exception ex){
throw new RuntimeException(ex);
}
}
-
+
/**
* main() method for invoking as a commandline application
* @param args
* @throws Exception
*/
public static void main(String[] fullArgs) throws Exception{
-
+
String[] args=parseOptions(fullArgs);
-
+
int serverPort=65321;
try{
serverPort=Integer.parseInt(args[0]);
@@ -114,24 +115,24 @@
public static void usage(){
System.out.println("Usage: java -cp ... udt.util.SendFile <server_port> " +
- "[--verbose] [--localPort=<port>] [--localIP=<ip>]");
+ "[--verbose] [--localPort=<port>] [--localIP=<ip>]");
}
public static class RequestRunner implements Runnable{
-
+
private final static Logger logger=Logger.getLogger(RequestRunner.class.getName());
-
+
private final UDTSocket socket;
-
+
private final NumberFormat format=NumberFormat.getNumberInstance();
-
+
private final boolean memMapped;
public RequestRunner(UDTSocket socket){
this.socket=socket;
format.setMaximumFractionDigits(3);
memMapped=false;//true;
}
-
+
public void run(){
try{
logger.info("Handling request from "+socket.getSession().getDestination());
@@ -144,8 +145,19 @@
while(in.read(readBuf)==0)Thread.sleep(100);
//how many bytes to read for the file name
- int length=bb.getInt();
- byte[]fileName=new byte[length-1];
+ byte[]len=new byte[4];
+ bb.get(len);
+ if(verbose){
+ StringBuilder sb=new StringBuilder();
+ for(int i=0;i<len.length;i++){
+ sb.append(Integer.toString(len[i]));
+ sb.append(" ");
+ }
+ System.out.println("[SendFile] name length data: "+sb.toString());
+ }
+ long length=decode(len, 0);
+ if(verbose)System.out.println("[SendFile] name length : "+length);
+ byte[]fileName=new byte[(int)length];
bb.get(fileName);
File file=new File(new String(fileName));
@@ -156,7 +168,10 @@
long size=file.length();
System.out.println("[SendFile] File size: "+size);
//send size info
- out.write(PacketUtil.encode(size));
+ out.write(encode(size));
+ out.write(encode(0l));
+ out.flush();
+
long start=System.currentTimeMillis();
//and send the file
if(memMapped){
@@ -183,8 +198,8 @@
}
}
}
-
-
+
+
private static void copyFile(File file, OutputStream os)throws Exception{
FileChannel c=new RandomAccessFile(file,"r").getChannel();
MappedByteBuffer b=c.map(MapMode.READ_ONLY, 0, file.length());
@@ -199,5 +214,6 @@
}
os.flush();
}
-
+
+
}
Modified: udt-java/trunk/src/test/java/udt/TestControlPacket.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestControlPacket.java 2010-08-30 11:45:32 UTC (rev 42)
+++ udt-java/trunk/src/test/java/udt/TestControlPacket.java 2010-08-30 21:17:15 UTC (rev 43)
@@ -8,7 +8,6 @@
public void testSequenceNumber1(){
ControlPacket p=new DummyControlPacket();
- p.setAckSequenceNumber(1);
byte[]x=p.getHeader();
byte highest=x[0];
//check highest bit is "1" for ControlPacket
Modified: udt-java/trunk/src/test/java/udt/TestControlPacketType.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestControlPacketType.java 2010-08-30 11:45:32 UTC (rev 42)
+++ udt-java/trunk/src/test/java/udt/TestControlPacketType.java 2010-08-30 21:17:15 UTC (rev 43)
@@ -8,11 +8,9 @@
public void testSequenceNumber1(){
ControlPacket p=new DummyControlPacket();
- p.setAckSequenceNumber(1);
byte[]x=p.getHeader();
byte highest=x[0];
//check highest bit is "1" for ControlPacket
-
assertEquals(128, highest & 0x80);
byte lowest=x[3];
assertEquals(1, lowest);
Modified: udt-java/trunk/src/test/java/udt/TestPacketFactory.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestPacketFactory.java 2010-08-30 11:45:32 UTC (rev 42)
+++ udt-java/trunk/src/test/java/udt/TestPacketFactory.java 2010-08-30 21:17:15 UTC (rev 43)
@@ -16,26 +16,27 @@
public class TestPacketFactory extends TestCase {
public void testData(){
- byte[]data="sdjfsdjfldskjflds".getBytes();
+ String test="sdjfsdjfldskjflds";
+
+ byte[]data=test.getBytes();
data[0]=(byte)(data[0] & 0x7f);
UDTPacket p=PacketFactory.createPacket(data);
DataPacket recv=(DataPacket)p;
String t=new String(recv.getEncoded());
assertTrue(p instanceof DataPacket);
- assertEquals("sdjfsdjfldskjflds",t);
+ assertEquals(test,t);
}
public void testConnectionHandshake(){
ConnectionHandshake p1 = new ConnectionHandshake();
- p1.setAckSequenceNumber(1234);
p1.setMessageNumber(9876);
p1.setTimeStamp(3456);
p1.setDestinationID(1);
p1.setConnectionType(1);
p1.setSocketType(1);
- p1.setInitialSeqNo(1);
+ p1.setInitialSeqNo(321);
p1.setPacketSize(128);
p1.setMaxFlowWndSize(128);
p1.setSocketID(1);
@@ -58,7 +59,7 @@
p1.setDestinationID(1);
p1.setBufferSize(128);
p1.setEstimatedLinkCapacity(16);
- p1.setNexttoPrevPktSeqNO(9870);
+ p1.setAckNumber(9870);
p1.setPacketReceiveRate(1000);
p1.setRoundTripTime(1000);
p1.setRoundTripTimeVar(500);
@@ -86,7 +87,6 @@
public void testNegativeAcknowledgement(){
NegativeAcknowledgement p1 = new NegativeAcknowledgement();
- p1.setAckSequenceNumber(1231);
p1.setMessageNumber(9872);
p1.setTimeStamp(3452);
p1.setDestinationID(2);
@@ -105,7 +105,6 @@
public void testNegativeAcknowledgement2(){
NegativeAcknowledgement p1 = new NegativeAcknowledgement();
- p1.setAckSequenceNumber(1231);
p1.setMessageNumber(9872);
p1.setTimeStamp(3452);
p1.setDestinationID(2);
@@ -130,7 +129,6 @@
public void testNegativeAcknowledgement3(){
NegativeAcknowledgement p1 = new NegativeAcknowledgement();
- p1.setAckSequenceNumber(1231);
p1.setMessageNumber(9872);
p1.setTimeStamp(3452);
p1.setDestinationID(2);
@@ -148,7 +146,6 @@
public void testShutdown(){
Shutdown p1 = new Shutdown();
- p1.setAckSequenceNumber(1233);
p1.setMessageNumber(9874);
p1.setTimeStamp(3453);
p1.setDestinationID(3);
@@ -165,7 +162,6 @@
public void testMessageDropRequest(){
MessageDropRequest p1=new MessageDropRequest();
- p1.setAckSequenceNumber(1234);
p1.setMessageNumber(9876);
p1.setTimeStamp(3456);
p1.setDestinationID(4);
Modified: udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java
===================================================================
--- udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-08-30 11:45:32 UTC (rev 42)
+++ udt-java/trunk/src/test/java/udt/TestSendFileReceiveFile.java 2010-08-30 21:17:15 UTC (rev 43)
@@ -17,8 +17,8 @@
Thread.sleep(500);
}while(!serverStarted);
- //File f=new File("src/test/java/datafile");
- File f=new File("/tmp/100MB");
+ File f=new File("src/test/java/datafile");
+ //File f=new File("/tmp/100MB");
File tmp=File.createTempFile("udtest-", null);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|