[Udt-java-commits] SF.net SVN: udt-java:[60] udt-java/skunk
Status: Alpha
Brought to you by:
bschuller
|
From: <pe...@us...> - 2011-08-05 04:24:09
|
Revision: 60
http://udt-java.svn.sourceforge.net/udt-java/?rev=60&view=rev
Author: pete_
Date: 2011-08-05 04:24:00 +0000 (Fri, 05 Aug 2011)
Log Message:
-----------
Experimental changes, partial implementation of java sockets and UDP multiplexing.
Modified Paths:
--------------
udt-java/skunk/src/main/java/udt/ClientSession.java
udt-java/skunk/src/main/java/udt/ServerSession.java
udt-java/skunk/src/main/java/udt/UDPEndPoint.java
udt-java/skunk/src/main/java/udt/UDTClient.java
udt-java/skunk/src/main/java/udt/UDTCongestionControl.java
udt-java/skunk/src/main/java/udt/UDTInputStream.java
udt-java/skunk/src/main/java/udt/UDTReceiver.java
udt-java/skunk/src/main/java/udt/UDTSender.java
udt-java/skunk/src/main/java/udt/UDTServerSocket.java
udt-java/skunk/src/main/java/udt/UDTSession.java
udt-java/skunk/src/main/java/udt/UDTSocket.java
udt-java/skunk/src/main/java/udt/packets/ConnectionHandshake.java
udt-java/skunk/src/main/java/udt/packets/DataPacket.java
udt-java/skunk/src/main/java/udt/packets/PacketFactory.java
udt-java/skunk/src/main/java/udt/packets/PacketUtil.java
udt-java/skunk/src/main/java/udt/unicore/FufexSend.java
udt-java/skunk/src/main/java/udt/util/ReceiveBuffer.java
udt-java/skunk/src/main/java/udt/util/ReceiveFile.java
udt-java/skunk/src/main/java/udt/util/SendFile.java
udt-java/skunk/src/main/java/udt/util/SequenceNumber.java
udt-java/skunk/src/test/java/echo/EchoServer.java
udt-java/skunk/src/test/java/echo/TestEchoServer.java
udt-java/skunk/src/test/java/echo/TestEchoServerMultiClient.java
udt-java/skunk/src/test/java/udt/TestUDTServerSocket.java
udt-java/skunk/src/test/java/udt/TestUdpEndpoint.java
udt-java/skunk/src/test/java/udt/performance/TestUDTLargeData.java
Added Paths:
-----------
udt-java/skunk/src/main/java/udt/packets/UDTSocketAddress.java
udt-java/skunk/src/main/java/udt/util/ObjectPool.java
udt-java/skunk/src/main/java/udt/util/Recycler.java
Removed Paths:
-------------
udt-java/skunk/src/main/java/udt/packets/Destination.java
Property Changed:
----------------
udt-java/skunk/
Property changes on: udt-java/skunk
___________________________________________________________________
Added: svn:ignore
+ target
Modified: udt-java/skunk/src/main/java/udt/ClientSession.java
===================================================================
--- udt-java/skunk/src/main/java/udt/ClientSession.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/ClientSession.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -38,7 +38,7 @@
import java.util.logging.Logger;
import udt.packets.ConnectionHandshake;
-import udt.packets.Destination;
+import udt.packets.UDTSocketAddress;
import udt.packets.Shutdown;
import udt.util.SequenceNumber;
@@ -52,7 +52,7 @@
private UDPEndPoint endPoint;
- public ClientSession(UDPEndPoint endPoint, Destination dest)throws SocketException{
+ public ClientSession(UDPEndPoint endPoint, UDTSocketAddress dest)throws SocketException{
super("ClientSession localPort="+endPoint.getLocalPort(),dest);
this.endPoint=endPoint;
logger.info("Created "+toString());
@@ -78,7 +78,7 @@
}
@Override
- public void received(UDTPacket packet, Destination peer) {
+ public void received(UDTPacket packet, UDTSocketAddress peer) {
lastPacket=packet;
@@ -91,7 +91,7 @@
if(hs.getConnectionType()==1){
try{
//TODO validate parameters sent by peer
- long peerSocketID=hs.getSocketID();
+ int peerSocketID=hs.getSocketID();
destination.setSocketID(peerSocketID);
sendConfirmation(hs);
}catch(Exception ex){
@@ -103,10 +103,12 @@
else{
try{
//TODO validate parameters sent by peer
- long peerSocketID=hs.getSocketID();
+ int peerSocketID=hs.getSocketID();
destination.setSocketID(peerSocketID);
setState(ready);
+ if (socket == null){
socket=new UDTSocket(endPoint,this);
+ }
}catch(Exception ex){
logger.log(Level.WARNING,"Error creating socket",ex);
setState(invalid);
@@ -146,9 +148,7 @@
ConnectionHandshake handshake = new ConnectionHandshake();
handshake.setConnectionType(ConnectionHandshake.CONNECTION_TYPE_REGULAR);
handshake.setSocketType(ConnectionHandshake.SOCKET_TYPE_DGRAM);
- long initialSequenceNo=SequenceNumber.random();
- setInitialSequenceNumber(initialSequenceNo);
- handshake.setInitialSeqNo(initialSequenceNo);
+ handshake.setInitialSeqNo(getCurrentSequenceNumber());
handshake.setPacketSize(getDatagramSize());
handshake.setSocketID(mySocketID);
handshake.setMaxFlowWndSize(flowWindowSize);
Modified: udt-java/skunk/src/main/java/udt/ServerSession.java
===================================================================
--- udt-java/skunk/src/main/java/udt/ServerSession.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/ServerSession.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -40,7 +40,7 @@
import java.util.logging.Logger;
import udt.packets.ConnectionHandshake;
-import udt.packets.Destination;
+import udt.packets.UDTSocketAddress;
import udt.packets.KeepAlive;
import udt.packets.Shutdown;
@@ -57,7 +57,7 @@
private UDTPacket lastPacket;
public ServerSession(DatagramPacket dp, UDPEndPoint endPoint)throws SocketException,UnknownHostException{
- super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+dp.getAddress()+":"+dp.getPort(),new Destination(dp.getAddress(),dp.getPort()));
+ super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+dp.getAddress()+":"+dp.getPort(),new UDTSocketAddress(dp.getAddress(),dp.getPort(),0));
this.endPoint=endPoint;
logger.info("Created "+toString()+" talking to "+dp.getAddress()+":"+dp.getPort());
}
@@ -65,7 +65,7 @@
int n_handshake=0;
@Override
- public void received(UDTPacket packet, Destination peer){
+ public void received(UDTPacket packet, UDTSocketAddress peer){
lastPacket=packet;
if(packet instanceof ConnectionHandshake) {
@@ -83,7 +83,9 @@
n_handshake++;
try{
setState(ready);
+ if (socket == null){
socket=new UDTSocket(endPoint, this);
+ }
cc.init();
}catch(Exception uhe){
//session is invalid
@@ -166,7 +168,7 @@
long clientBufferSize=handshake.getPacketSize();
long myBufferSize=getDatagramSize();
long bufferSize=Math.min(clientBufferSize, myBufferSize);
- long initialSequenceNumber=handshake.getInitialSeqNo();
+ int initialSequenceNumber=(int) handshake.getInitialSeqNo();
setInitialSequenceNumber(initialSequenceNumber);
setDatagramSize((int)bufferSize);
responseHandshake.setPacketSize(bufferSize);
Modified: udt-java/skunk/src/main/java/udt/UDPEndPoint.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDPEndPoint.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/UDPEndPoint.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -36,20 +36,34 @@
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import udt.packets.ConnectionHandshake;
-import udt.packets.Destination;
+import udt.packets.UDTSocketAddress;
import udt.packets.PacketFactory;
+import udt.util.ObjectPool;
import udt.util.UDTThreadFactory;
/**
@@ -58,14 +72,116 @@
*/
public class UDPEndPoint {
+ //class fields
private static final Logger logger=Logger.getLogger(ClientSession.class.getName());
+ public static final int DATAGRAM_SIZE=1400;
+
+ //class methods
+ private static final WeakHashMap<SocketAddress, UDPEndPoint> localEndpoints
+ = new WeakHashMap<SocketAddress, UDPEndPoint>();
+
+ public static UDPEndPoint get(DatagramSocket socket){
+ SocketAddress localInetSocketAddress = null;
+ UDPEndPoint result = null;
+ if ( socket.isBound()){
+ SocketAddress sa = socket.getLocalSocketAddress();
+ if ( sa instanceof InetSocketAddress ){
+ localInetSocketAddress = (InetSocketAddress) sa;
+ } else {
+ // Must be a special DatagramSocket impl or extended.
+ localInetSocketAddress =
+ new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort());
+ }
+ synchronized (localEndpoints){
+ result = localEndpoints.get(localInetSocketAddress);
+ }
+ }
+ if (result != null) return result;
+ try {
+ result = new UDPEndPoint(socket);
+ if (localInetSocketAddress == null){
+ // The DatagramSocket was unbound, it should be bound now.
+ localInetSocketAddress =
+ new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort());
+ }
+ } catch (SocketException ex) {
+ Logger.getLogger(UDPEndPoint.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ if (result != null){
+ synchronized (localEndpoints){
+ UDPEndPoint exists = localEndpoints.get(localInetSocketAddress);
+ if (exists != null && exists.getSocket().equals(socket)) result = exists;
+ // Only cache if a record doesn't already exist.
+ else if (exists == null) localEndpoints.put(localInetSocketAddress, result);
+ }
+ }
+ return result; // may be null.
+ }
+
+
+
+ public static UDPEndPoint get(InetAddress localAddress, int localPort){
+ InetSocketAddress localInetSocketAddress = new InetSocketAddress(localAddress, localPort);
+ return get(localInetSocketAddress);
+ }
+
+ public static UDPEndPoint get(SocketAddress localSocketAddress){
+ InetSocketAddress localInetSocketAddress = null;
+ if (localSocketAddress instanceof InetSocketAddress){
+ localInetSocketAddress = (InetSocketAddress) localSocketAddress;
+ } else if (localSocketAddress instanceof UDTSocketAddress){
+ UDTSocketAddress udtSA = (UDTSocketAddress) localSocketAddress;
+ localInetSocketAddress =
+ new InetSocketAddress(udtSA.getAddress(), udtSA.getPort());
+ }
+ if (localInetSocketAddress == null) return null;
+ UDPEndPoint result = null;
+ synchronized (localEndpoints){
+ result = localEndpoints.get(localInetSocketAddress);
+ }
+ if (result != null) return result;
+ try {
+ result = new UDPEndPoint(localInetSocketAddress);
+ if (localInetSocketAddress.getPort() == 0 ||
+ localInetSocketAddress.getAddress().isAnyLocalAddress()){
+ // ephemeral port or wildcard address, bind operation is complete.
+ localInetSocketAddress =
+ new InetSocketAddress(result.getLocalAddress(), result.getLocalPort());
+ }
+ } catch (SocketException ex) {
+ logger.log(Level.SEVERE, null, ex);
+ } catch (UnknownHostException ex) {
+ logger.log(Level.SEVERE, null, ex);
+ }
+ if (result != null){
+ synchronized (localEndpoints){
+ UDPEndPoint exists = localEndpoints.get(localInetSocketAddress);
+ if (exists != null) result = exists;
+ else localEndpoints.put(localInetSocketAddress, result);
+ }
+ }
+ return result; // may be null.
+ }
+
+ /**
+ * Allows a custom endpoint to be added to the pool.
+ * @param endpoint
+ */
+ public static void put(UDPEndPoint endpoint){
+ SocketAddress local = endpoint.getSocket().getLocalSocketAddress();
+ synchronized (localEndpoints){
+ localEndpoints.put(local, endpoint);
+ }
+ }
+
+ //object fields
private final int port;
private final DatagramSocket dgSocket;
//active sessions keyed by socket ID
- private final Map<Long,UDTSession>sessions=new ConcurrentHashMap<Long, UDTSession>();
+ private final Map<Integer,UDTSession>sessions=new ConcurrentHashMap<Integer, UDTSession>();
//last received packet
private UDTPacket lastPacket;
@@ -74,20 +190,39 @@
//this queue is used to handoff new UDTSessions to the application
private final SynchronousQueue<UDTSession> sessionHandoff=new SynchronousQueue<UDTSession>();
+ private final ObjectPool<BlockingQueue<UDTSession>> queuePool
+ = new ObjectPool<BlockingQueue<UDTSession>>(20);
+
+ private final ConcurrentMap<Integer, BlockingQueue<UDTSession>> handoff
+ = new ConcurrentHashMap<Integer, BlockingQueue<UDTSession>>(120);
+
+ // registered sockets
+ private final Set<Integer> registeredSockets = new HashSet<Integer>(120);
+ // registered sockets lock
+ private final ReadWriteLock rwl = new ReentrantReadWriteLock();
+ private final Lock readSocketLock = rwl.readLock();
+ private final Lock writeSocketLock = rwl.writeLock();
+
+
private boolean serverSocketMode=false;
//has the endpoint been stopped?
private volatile boolean stopped=false;
- public static final int DATAGRAM_SIZE=1400;
+ private final AtomicInteger nextSocketID=new AtomicInteger(20+new Random().nextInt(5000));
+
/**
* create an endpoint on the given socket
*
* @param socket - a UDP datagram socket
+ * @throws SocketException
*/
- public UDPEndPoint(DatagramSocket socket){
+ protected UDPEndPoint(DatagramSocket socket) throws SocketException{
this.dgSocket=socket;
+ if (!socket.isBound()){
+ socket.bind(null);
+ }
port=dgSocket.getLocalPort();
}
@@ -97,7 +232,7 @@
* @throws SocketException
* @throws UnknownHostException
*/
- public UDPEndPoint(InetAddress localAddress)throws SocketException, UnknownHostException{
+ private UDPEndPoint(InetAddress localAddress)throws SocketException, UnknownHostException{
this(localAddress,0);
}
@@ -108,7 +243,7 @@
* @throws SocketException
* @throws UnknownHostException
*/
- public UDPEndPoint(InetAddress localAddress, int localPort)throws SocketException, UnknownHostException{
+ private UDPEndPoint(InetAddress localAddress, int localPort)throws SocketException, UnknownHostException{
if(localAddress==null){
dgSocket=new DatagramSocket(localPort, localAddress);
}else{
@@ -120,6 +255,13 @@
configureSocket();
}
+ private UDPEndPoint (InetSocketAddress localSocketAddress)
+ throws SocketException, UnknownHostException {
+ dgSocket = new DatagramSocket(localSocketAddress);
+ port = dgSocket.getLocalPort();
+ configureSocket();
+ }
+
protected void configureSocket()throws SocketException{
//set a time out to avoid blocking in doReceive()
dgSocket.setSoTimeout(100000);
@@ -184,6 +326,43 @@
}
/**
+ * Provides assistance to a socket to determine a random socket id,
+ * every caller receives a unique value. This value is unique at the
+ * time of calling, however it may not be at registration time.
+ *
+ * This socketID has not been registered, all socket ID's must be
+ * registered or connection will fail.
+ * @return
+ */
+ public int getUniqueSocketID(){
+ Integer socketID = nextSocketID.getAndIncrement();
+ try{
+ readSocketLock.lock();
+ while (registeredSockets.contains(socketID)){
+ socketID = nextSocketID.getAndIncrement();
+ }
+ return socketID; // should we register it?
+ } finally {
+ readSocketLock.unlock();
+ }
+ }
+
+ void registerSocketID(int socketID, UDTSocket socket) throws SocketException {
+ if (!equals(socket.getEndpoint())) throw new SocketException (
+ "Socket doesn't originate for this endpoint: "
+ + socket.toString());
+ try {
+ writeSocketLock.lock();
+ if (registeredSockets.contains(socketID)){
+ throw new SocketException("Already registered, Socket ID: " +socketID);
+ }
+ registeredSockets.add(socketID);
+ }finally{
+ writeSocketLock.unlock();
+ }
+ }
+
+ /**
* @return the port which this client is bound to
*/
public int getLocalPort() {
@@ -204,8 +383,8 @@
return lastPacket;
}
- public void addSession(Long destinationID,UDTSession session){
- logger.info("Storing session <"+destinationID+">");
+ public void addSession(Integer destinationID,UDTSession session){
+ logger.log(Level.INFO, "Storing session <{0}>", destinationID);
sessions.put(destinationID, session);
}
@@ -221,12 +400,33 @@
* wait the given time for a new connection
* @param timeout - the time to wait
* @param unit - the {@link TimeUnit}
+ * @param socketID - the socket id.
* @return a new {@link UDTSession}
* @throws InterruptedException
*/
- protected UDTSession accept(long timeout, TimeUnit unit)throws InterruptedException{
- return sessionHandoff.poll(timeout, unit);
+ protected UDTSession accept(long timeout, TimeUnit unit, Integer socketID)throws InterruptedException{
+ //return sessionHandoff.poll(timeout, unit);
+ BlockingQueue<UDTSession> session = handoff.get(socketID);
+ try {
+ if (session == null){
+ session = queuePool.get();
+ if (session == null) {
+ session = new ArrayBlockingQueue<UDTSession>(1);
}
+ BlockingQueue<UDTSession> existed = handoff.putIfAbsent(socketID,session);
+ if (existed != null){
+ session = existed;
+ }
+ }
+ return session.poll(timeout, unit);
+ } finally {
+ boolean removed = handoff.remove(socketID, session);
+ if (removed){
+ session.clear();
+ queuePool.accept(session);
+ }
+ }
+ }
final DatagramPacket dp= new DatagramPacket(new byte[DATAGRAM_SIZE],DATAGRAM_SIZE);
@@ -250,32 +450,42 @@
protected void doReceive()throws IOException{
while(!stopped){
try{
- try{
-
//will block until a packet is received or timeout has expired
dgSocket.receive(dp);
-
- Destination peer=new Destination(dp.getAddress(), dp.getPort());
+ UDTSocketAddress peer= null;
int l=dp.getLength();
UDTPacket packet=PacketFactory.createPacket(dp.getData(),l);
lastPacket=packet;
-
//handle connection handshake
if(packet.isConnectionHandshake()){
synchronized(lock){
Long id=Long.valueOf(packet.getDestinationID());
UDTSession session=sessions.get(id);
- if(session==null){
+ if(session==null){ // What about DOS?
session=new ServerSession(dp,this);
addSession(session.getSocketID(),session);
//TODO need to check peer to avoid duplicate server session
if(serverSocketMode){
logger.fine("Pooling new request.");
- sessionHandoff.put(session);
+// sessionHandoff.put(session); // blocking method, what about offer?
+ BlockingQueue<UDTSession> queue = handoff.get(session.getSocketID());
+ if (queue != null){
+ boolean success = queue.offer(session);
+ if (success){
logger.fine("Request taken for processing.");
+ } else {
+ logger.fine("Request discarded, queue full.");
}
+ } else {
+ logger.fine("No ServerSocket listening at socketID: "
+ + session.getSocketID()
+ + "to answer request");
}
+ }
+ }
peer.setSocketID(((ConnectionHandshake)packet).getSocketID());
+ peer = new UDTSocketAddress(dp.getAddress(), dp.getPort(),
+ ((ConnectionHandshake)packet).getSocketID());
session.received(packet,peer);
}
}
@@ -294,7 +504,9 @@
if(session==null){
n++;
if(n%100==1){
- logger.warning("Unknown session <"+dest+"> requested from <"+peer+"> packet type "+packet.getClass().getName());
+ logger.warning("Unknown session <"+dest
+ +"> requested from <"+peer+"> packet type "
+ +packet.getClass().getName());
}
}
else{
@@ -305,8 +517,6 @@
logger.log(Level.INFO, "SocketException: "+ex.getMessage());
}catch(SocketTimeoutException ste){
//can safely ignore... we will retry until the endpoint is stopped
- }
-
}catch(Exception ex){
logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex);
}
Modified: udt-java/skunk/src/main/java/udt/UDTClient.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTClient.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/UDTClient.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -33,13 +33,15 @@
package udt;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.logging.Level;
import java.util.logging.Logger;
-import udt.packets.Destination;
+import udt.packets.UDTSocketAddress;
import udt.packets.Shutdown;
import udt.util.UDTStatistics;
@@ -52,13 +54,13 @@
public UDTClient(InetAddress address, int localport)throws SocketException, UnknownHostException{
//create endpoint
- clientEndpoint=new UDPEndPoint(address,localport);
+ clientEndpoint= UDPEndPoint.get(address,localport);
logger.info("Created client endpoint on port "+localport);
}
public UDTClient(InetAddress address)throws SocketException, UnknownHostException{
//create endpoint
- clientEndpoint=new UDPEndPoint(address);
+ clientEndpoint= UDPEndPoint.get(address, 0);
logger.info("Created client endpoint on port "+clientEndpoint.getLocalPort());
}
@@ -75,7 +77,7 @@
*/
public void connect(String host, int port)throws InterruptedException, UnknownHostException, IOException{
InetAddress address=InetAddress.getByName(host);
- Destination destination=new Destination(address,port);
+ UDTSocketAddress destination= new UDTSocketAddress(address,port,0);
//create client session...
clientSession=new ClientSession(clientEndpoint,destination);
clientEndpoint.addSession(clientSession.getSocketID(), clientSession);
@@ -101,7 +103,7 @@
}
/**
- * sends the given data and waits for acknowledgement
+ * sends the given data and waits for acknowledgment
* @param data - the data to send
* @throws IOException
* @throws InterruptedException if interrupted while waiting for ack
@@ -143,11 +145,11 @@
}
}
- public UDTInputStream getInputStream()throws IOException{
+ public InputStream getInputStream()throws IOException{
return clientSession.getSocket().getInputStream();
}
- public UDTOutputStream getOutputStream()throws IOException{
+ public OutputStream getOutputStream()throws IOException{
return clientSession.getSocket().getOutputStream();
}
Modified: udt-java/skunk/src/main/java/udt/UDTCongestionControl.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTCongestionControl.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/UDTCongestionControl.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -65,7 +65,7 @@
public UDTCongestionControl(UDTSession session){
this.session=session;
this.statistics=session.getStatistics();
- lastDecreaseSeqNo=session.getInitialSequenceNumber()-1;
+ lastDecreaseSeqNo=session.getCurrentSequenceNumber()-1;
}
/* (non-Javadoc)
Modified: udt-java/skunk/src/main/java/udt/UDTInputStream.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTInputStream.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/UDTInputStream.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -66,10 +66,10 @@
* @param socket - the {@link UDTSocket}
* @throws IOException
*/
- public UDTInputStream(UDTSocket socket)throws IOException{
+ UDTInputStream(UDTSocket socket)throws IOException{
this.socket=socket;
int capacity=socket!=null? 2 * socket.getSession().getFlowWindowSize() : 128 ;
- long initialSequenceNum=socket!=null?socket.getSession().getInitialSequenceNumber():1;
+ int initialSequenceNum=socket!=null?socket.getSession().getCurrentSequenceNumber():1;
receiveBuffer=new ReceiveBuffer(capacity,initialSequenceNum);
}
Modified: udt-java/skunk/src/main/java/udt/UDTReceiver.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTReceiver.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/UDTReceiver.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -169,7 +169,7 @@
packetHistoryWindow = new PacketHistoryWindow(16);
receiverLossList = new ReceiverLossList();
packetPairWindow = new PacketPairWindow(16);
- largestReceivedSeqNumber=session.getInitialSequenceNumber()-1;
+ largestReceivedSeqNumber=session.getCurrentSequenceNumber()-1;
bufferSize=session.getReceiveBufferSize();
handoffQueue=new ArrayBlockingQueue<UDTPacket>(4*session.getFlowWindowSize());
storeStatistics=Boolean.getBoolean("udt.receiver.storeStatistics");
@@ -396,7 +396,7 @@
// return;
// }
// //}
- boolean OK=session.getSocket().getInputStream().haveNewData(currentSequenceNumber,dp.getData());
+ boolean OK= ((UDTInputStream) session.getSocket().getInputStream()).haveNewData(currentSequenceNumber,dp.getData());
if(!OK){
//need to drop packet...
return;
Modified: udt-java/skunk/src/main/java/udt/UDTSender.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTSender.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/UDTSender.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -68,9 +68,7 @@
private static final Logger logger=Logger.getLogger(UDTClient.class.getName());
private final UDPEndPoint endpoint;
-
private final UDTSession session;
-
private final UDTStatistics statistics;
//senderLossList stores the sequence numbers of lost packets
@@ -92,31 +90,31 @@
private final AtomicInteger unacknowledged=new AtomicInteger(0);
//for generating data packet sequence numbers
- private volatile long currentSequenceNumber=0;
+ //volatile long counters are not atomic.
+ //private volatile int currentSequenceNumber=0;
//the largest data packet sequence number that has actually been sent out
- private volatile long largestSentSequenceNumber=-1;
+ private volatile int largestSentSequenceNumber=-1;
//last acknowledge number, initialised to the initial sequence number
- private volatile long lastAckSequenceNumber;
+ private volatile int lastAckSequenceNumber;
private volatile boolean started=false;
-
private volatile boolean stopped=false;
-
private volatile boolean paused=false;
//used to signal that the sender should start to send
private volatile CountDownLatch startLatch=new CountDownLatch(1);
//used by the sender to wait for an ACK
- private final AtomicReference<CountDownLatch> waitForAckLatch=new AtomicReference<CountDownLatch>();
+ private final AtomicReference<CountDownLatch> waitForAckLatch
+ =new AtomicReference<CountDownLatch>();
//used by the sender to wait for an ACK of a certain sequence number
- private final AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>();
+ private final AtomicReference<CountDownLatch> waitForSeqAckLatch
+ =new AtomicReference<CountDownLatch>();
private final boolean storeStatistics;
-
private final int chunksize;
public UDTSender(UDTSession session,UDPEndPoint endpoint){
@@ -128,8 +126,8 @@
sendBuffer=new ConcurrentHashMap<Long, byte[]>(session.getFlowWindowSize(),0.75f,2);
chunksize=session.getDatagramSize()-24;//need space for the header;
flowWindow=new FlowWindow(session.getFlowWindowSize(),chunksize);
- lastAckSequenceNumber=session.getInitialSequenceNumber();
- currentSequenceNumber=session.getInitialSequenceNumber()-1;
+ lastAckSequenceNumber=session.getCurrentSequenceNumber();
+// currentSequenceNumber=session.getCurrentSequenceNumber()-1;
waitForAckLatch.set(new CountDownLatch(1));
waitForSeqAckLatch.set(new CountDownLatch(1));
storeStatistics=Boolean.getBoolean("udt.sender.storeStatistics");
@@ -212,6 +210,7 @@
protected void sendUdtPacket(ByteBuffer bb, int timeout, TimeUnit units)throws IOException, InterruptedException{
if(!started)start();
DataPacket packet=null;
+
do{
packet=flowWindow.getForProducer();
if(packet==null){
@@ -219,7 +218,7 @@
}
}while(packet==null);//TODO check timeout...
try{
- packet.setPacketSequenceNumber(getNextSequenceNumber());
+ packet.setPacketSequenceNumber(incrementAndGetSequenceNo());
packet.setSession(session);
packet.setDestinationID(session.getDestination().getSocketID());
int len=Math.min(bb.remaining(),chunksize);
@@ -253,7 +252,7 @@
}
}while(packet==null);
try{
- packet.setPacketSequenceNumber(getNextSequenceNumber());
+ packet.setPacketSequenceNumber(incrementAndGetSequenceNo());
packet.setSession(session);
packet.setDestinationID(session.getDestination().getSocketID());
packet.setData(data);
@@ -309,7 +308,7 @@
unacknowledged.decrementAndGet();
}
}
- lastAckSequenceNumber=Math.max(lastAckSequenceNumber, ackNumber);
+ lastAckSequenceNumber=(int) Math.max(lastAckSequenceNumber, ackNumber);
//send ACK2 packet to the receiver
sendAck2(ackNumber);
statistics.incNumberOfACKReceived();
@@ -362,22 +361,20 @@
Long entry=senderLossList.getFirstEntry();
if(entry!=null){
handleRetransmit(entry);
- }
- else
- {
+ }else{
//if the number of unacknowledged data packets does not exceed the congestion
//and the flow window sizes, pack a new packet
int unAcknowledged=unacknowledged.get();
-
- if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize()
- && unAcknowledged<session.getFlowWindowSize()){
+ if(unAcknowledged<session.getCongestionControl()
+ .getCongestionWindowSize()
+ && unAcknowledged<session.getFlowWindowSize())
+ {
//check for application data
DataPacket dp=flowWindow.consumeData();
if(dp!=null){
send(dp);
- largestSentSequenceNumber=dp.getPacketSequenceNumber();
- }
- else{
+ largestSentSequenceNumber=(int) dp.getPacketSequenceNumber();
+ }else{
statistics.incNumberOfMissingDataEvents();
}
}else{
@@ -388,7 +385,6 @@
waitForAck();
}
}
-
//wait
if(largestSentSequenceNumber % 16 !=0){
long snd=(long)session.getCongestionControl().getSendInterval();
@@ -445,13 +441,12 @@
* the next sequence number for data packets.
* The initial sequence number is "0"
*/
- public long getNextSequenceNumber(){
- currentSequenceNumber=SequenceNumber.increment(currentSequenceNumber);
- return currentSequenceNumber;
+ public int incrementAndGetSequenceNo(){
+ return session.incrementAndGetSequenceNo();
}
- public long getCurrentSequenceNumber(){
- return currentSequenceNumber;
+ public int getCurrentSequenceNumber(){
+ return session.getCurrentSequenceNumber();
}
/**
Modified: udt-java/skunk/src/main/java/udt/UDTServerSocket.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTServerSocket.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/UDTServerSocket.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -31,35 +31,48 @@
*********************************************************************************/
package udt;
+import java.io.IOException;
import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
import java.util.logging.Logger;
-public class UDTServerSocket {
+public class UDTServerSocket extends ServerSocket {
private static final Logger logger=Logger.getLogger(UDTClient.class.getName());
- private final UDPEndPoint endpoint;
+ private volatile UDPEndPoint endpoint;
+ private volatile InetAddress localAdd;
+ private volatile int locPort;
+ private volatile SocketAddress localSocketAddress;
- private boolean started=false;
-
+ private volatile boolean started=false;
+ private volatile boolean bound = false;
private volatile boolean shutdown=false;
/**
* create a UDT ServerSocket
- * @param localAddress
+ * @param localSocketAddress
* @param port - the local port. If 0, an ephemeral port will be chosen
*/
- public UDTServerSocket(InetAddress localAddress, int port)throws SocketException,UnknownHostException{
- endpoint=new UDPEndPoint(localAddress,port);
+ public UDTServerSocket(InetAddress localAddress, int port)throws UnknownHostException, IOException{
+ super();
+ endpoint= UDPEndPoint.get(localAddress,port);
+ localAdd = localAddress;
+ locPort = port;
+ bound = true;
logger.info("Created server endpoint on port "+endpoint.getLocalPort());
}
//starts a server on localhost
- public UDTServerSocket(int port)throws SocketException,UnknownHostException{
+ public UDTServerSocket(int port)throws IOException,UnknownHostException{
this(InetAddress.getLocalHost(),port);
}
@@ -68,13 +81,16 @@
* for the new connection
* @return
*/
- public synchronized UDTSocket accept()throws InterruptedException{
+@Override
+ public synchronized Socket accept() throws IOException{
if(!started){
endpoint.start(true);
started=true;
}
+ // TODO: use a blocking queue.
while(!shutdown){
- UDTSession session=endpoint.accept(10000, TimeUnit.MILLISECONDS);
+ try {
+ UDTSession session = endpoint.accept(10000, TimeUnit.MILLISECONDS, null);
if(session!=null){
//wait for handshake to complete
while(!session.isReady() || session.getSocket()==null){
@@ -82,16 +98,107 @@
}
return session.getSocket();
}
+ } catch (InterruptedException ex) {
+ throw new IOException(ex);
}
- throw new InterruptedException();
}
+ throw new IOException("UDTSession was null");
+ }
- public void shutDown(){
+ public UDPEndPoint getEndpoint(){
+ return endpoint;
+ }
+
+ @Override
+ public void bind(SocketAddress endpoint){
+ //TODO: Implement ServerSocket.bind
+ }
+
+ @Override
+ public void bind(SocketAddress endpoint, int timeout){
+ //TODO: Implement ServerSocket.bind
+ }
+
+ @Override
+ public void close(){
shutdown=true;
+ // TODO: The endpoint might have other ServerSocket's listening,
+ // we need to pass the endpoint the socket, or session or something
+ // the endpoint should only stop when it has no remaining sessions.
endpoint.stop();
}
- public UDPEndPoint getEndpoint(){
- return endpoint;
+ @Override
+ public ServerSocketChannel getChannel(){
+ return null;
}
+
+ @Override
+ public InetAddress getInetAddress(){
+ return localAdd;
}
+
+ @Override
+ public int getLocalPort(){
+ return locPort;
+ }
+
+ @Override
+ public SocketAddress getLocalSocketAddress(){
+ return localSocketAddress;
+ }
+
+ @Override
+ public int getReceiveBufferSize(){
+ return 0;
+ }
+
+ @Override
+ public boolean getReuseAddress(){
+ return false;
+ }
+
+ @Override
+ public int getSoTimeout(){
+ return 0;
+ }
+
+ @Override
+ public boolean isBound(){
+ return started;
+ }
+
+ @Override
+ public boolean isClosed(){
+ return shutdown;
+ }
+
+ @Override
+ public void setPerformancePreferences(int connectionTime, int latency, int bandwidth){
+
+ }
+
+ @Override
+ public void setReceiveBufferSize(int size){
+
+ }
+
+ @Override
+ public void setReuseAddress(boolean on){
+
+ }
+
+ @Override
+ public void setSoTimeout(int timeout){
+
+ }
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder(120);
+ sb.append("UDTServerSocket: \n");
+ //TODO: add statistics.
+ return sb.toString();
+ }
+
+}
Modified: udt-java/skunk/src/main/java/udt/UDTSession.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTSession.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/UDTSession.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -33,12 +33,16 @@
package udt;
import java.net.DatagramPacket;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
-import udt.packets.Destination;
+import udt.packets.UDTSocketAddress;
+import udt.util.SequenceNumber;
import udt.util.UDTStatistics;
public abstract class UDTSession {
@@ -79,7 +83,7 @@
/**
* remote UDT entity (address and socket ID)
*/
- protected final Destination destination;
+ protected final UDTSocketAddress destination;
/**
* local port
@@ -101,16 +105,19 @@
*/
protected int datagramSize=DEFAULT_DATAGRAM_SIZE;
- protected Long initialSequenceNumber=null;
+ protected int initialSequenceNumber=11;
- protected final long mySocketID;
+ private final AtomicInteger sequenceNo = new AtomicInteger(SequenceNumber.random());
- private final static AtomicLong nextSocketID=new AtomicLong(20+new Random().nextInt(5000));
+ protected final int mySocketID;
- public UDTSession(String description, Destination destination){
+ private final static AtomicInteger nextSocketID=new AtomicInteger(20+new Random().nextInt(5000));
+
+ public UDTSession(String description, UDTSocketAddress dest){
+ InetSocketAddress inetAdd = null;
statistics=new UDTStatistics(description);
mySocketID=nextSocketID.incrementAndGet();
- this.destination=destination;
+ this.destination= dest;
this.dgPacket=new DatagramPacket(new byte[0],0,destination.getAddress(),destination.getPort());
String clazzP=System.getProperty(CC_CLASS,UDTCongestionControl.class.getName());
Object ccObject=null;
@@ -125,10 +132,8 @@
logger.info("Using "+cc.getClass().getName());
}
+ public abstract void received(UDTPacket packet, UDTSocketAddress peer);
- public abstract void received(UDTPacket packet, Destination peer);
-
-
public UDTSocket getSocket() {
return socket;
}
@@ -137,7 +142,7 @@
return cc;
}
- public int getState() {
+ protected int getState() {
return state;
}
@@ -149,7 +154,7 @@
this.socket = socket;
}
- public void setState(int state) {
+ protected void setState(int state) {
logger.info(toString()+" connection state CHANGED to <"+state+">");
this.state = state;
}
@@ -170,7 +175,7 @@
return state==shutdown || state==invalid;
}
- public Destination getDestination() {
+ public UDTSocketAddress getDestination() {
return destination;
}
@@ -202,21 +207,30 @@
return statistics;
}
- public long getSocketID(){
+ public int getSocketID(){
return mySocketID;
}
- public synchronized long getInitialSequenceNumber(){
- if(initialSequenceNumber==null){
- initialSequenceNumber=1l; //TODO must be random?
+ public int getCurrentSequenceNumber(){
+ return sequenceNo.get();
}
- return initialSequenceNumber;
+
+ public int incrementAndGetSequenceNo(){
+ while (true){
+ int sequence = sequenceNo.get();
+ int increment = SequenceNumber.increment(sequence);
+ boolean success = false;
+ success = sequenceNo.compareAndSet(sequence, increment);
+ if (success) return increment;
}
+ }
- public synchronized void setInitialSequenceNumber(long initialSequenceNumber){
- this.initialSequenceNumber=initialSequenceNumber;
+ protected void setInitialSequenceNumber(int initialSequenceNumber){
+ if (state == handshaking){
+ sequenceNo.set(initialSequenceNumber);
}
+ }
public DatagramPacket getDatagram(){
return dgPacket;
Modified: udt-java/skunk/src/main/java/udt/UDTSocket.java
===================================================================
--- udt-java/skunk/src/main/java/udt/UDTSocket.java 2011-08-05 01:14:28 UTC (rev 59)
+++ udt-java/skunk/src/main/java/udt/UDTSocket.java 2011-08-05 04:24:00 UTC (rev 60)
@@ -32,10 +32,23 @@
package udt;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
import java.net.SocketException;
+import java.net.SocketOptions;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.security.AccessController;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import udt.packets.UDTSocketAddress;
/**
* UDTSocket is analogous to a normal java.net.Socket, it provides input and
* output streams for the application
@@ -43,34 +56,443 @@
* TODO is it possible to actually extend java.net.Socket ?
*
*/
-public class UDTSocket {
+public class UDTSocket extends Socket{
+ private static final ArrayList<UDTSocketAddress> boundSockets
+ = new ArrayList<UDTSocketAddress>(120);
+
//endpoint
- private final UDPEndPoint endpoint;
+ private volatile UDPEndPoint endpoint;
private volatile boolean active;
+ private volatile boolean connected;
+ private volatile boolean bound;
+ private volatile boolean shutIn; // receiver closed.
+ private volatile boolean shutOut; // sender closed.
+ private volatile boolean closed;
//processing received data
- private UDTReceiver receiver;
- private UDTSender sender;
+ private volatile UDTReceiver receiver;
+ private volatile UDTSender sender;
- private final UDTSession session;
+ private volatile UDTSession session;
- private UDTInputStream inputStream;
- private UDTOutputStream outputStream;
+ private volatile UDTInputStream inputStream;
+ private volatile UDTOutputStream outputStream;
+
+ private volatile UDTSocketAddress localSocketAddress;
+ private volatile UDTSocketAddress destination;
/**
+ * The session is usually the caller for this constructor
+ * so the session already knows this is the socket.
* @param host
* @param port
* @param endpoint
* @throws SocketException,UnknownHostException
*/
- public UDTSocket(UDPEndPoint endpoint, UDTSession session)throws SocketException,UnknownHostException{
+ UDTSocket(UDPEndPoint endpoint, UDTSession session)throws SocketException,UnknownHostException{
+ super();
this.endpoint=endpoint;
this.session=session;
this.receiver=new UDTReceiver(session,endpoint);
this.sender=new UDTSender(session,endpoint);
+ localSocketAddress = new UDTSocketAddress(endpoint.getLocalAddress(),
+ endpoint.getLocalPort(), session.getSocketID());
+ destination = session.getDestination();
+ bound = true;
}
+ public UDTSocket(InetAddress host, int port ) throws SocketException,
+ UnknownHostException{
+ super();
+ this.endpoint = UDPEndPoint.get(host, port);
+ this.session = null;
+ this.receiver = null;
+ this.sender = null;
+ active = false;
+ bound = true;
+
+ }
+
+ public UDTSocket(){
+ super();
+ endpoint = null;
+ session = null;
+ receiver = null;
+ sender = null;
+ active = false;
+ bound = false;
+ }
+
+ @Override
+ public void connect(SocketAddress destination) throws IOException {
+ connect(destination, 0);
+ }
+
+ @Override
+ public void connect(SocketAddress destination, int timeout) throws IOException {
+ if (destination == null) throw new IllegalArgumentException("connect: The address can't be null");
+ if (timeout < 0) throw new IllegalArgumentException("connect: timeout can't be negative");
+ if (isClosed()) throw new SocketException("Socket is closed");
+ if (isConnected()) throw new SocketException("already connected");
+ if (!(destination instanceof UDTSocketAddress))
+ throw new IllegalArgumentException("Unsupported address type");
+
+ UDTSocketAddress epoint = (UDTSocketAddress) destination;
+ InetAddress addr = epoint.getAddress();
+ int port = epoint.getPort();
+
+ SecurityManager security = System.getSecurityManager();
+ if (security != null) {
+ security.checkConnect(addr.getHostAddress(),port);
+ }
+ if (session == null){
+ session = new ClientSession(endpoint, (UDTSocketAddress) destination);
+ session.setSocket(this);
+ }
+ endpoint.addSession(session.getSocketID(), session);
+ receiver = new UDTReceiver(session, endpoint);
+ sender = new UDTSender(session, endpoint);
+ localSocketAddress = new UDTSocketAddress(endpoint.getLocalAddress(),
+ endpoint.getLocalPort(), session.getSocketID());
+ endpoint.start();
+ try {
+ ((ClientSession)session).connect();
+ } catch (InterruptedException ex) {
+ throw new IOException(ex);
+ }
+ destination = session.getDestination();
+ connected = true;
+ /*
+ * If the socket was not bound before the connect, it is now because
+ * the kernel will have picked an ephemeral port & a local address
+ */
+ bound = true;
+ }
+
+ /**
+ * Binds the socket to a local address.
+ * <P>
+ * If the address is <code>null</code>, then the system will pick up
+ * an ephemeral port and a valid local address to bind the socket.
+ *
+ * @param bindpoint the <code>SocketAddress</code> to bind to
+ * @throws IOException if the bind operation fails, or if the socket
+ * is already bound.
+ * @throws IllegalArgumentException if bindpoint is a
+ * SocketAddress subclass not supported by this socket
+ *
+ * @since 1.4
+ * @see #isBound
+ */
+ public void bind(SocketAddress bindpoint) throws IOException {
+ if (!(bindpoint instanceof UDTSocketAddress))
+ throw new IllegalArgumentException("Unsupported SocketAddress type");
+ if (isBound()) throw new IOException("Socket already bound");
+ synchronized (boundSockets) {
+ if (boundSockets.contains(bindpoint)) throw
+ new IOException("A socket is already bound to this address");
+ }
+ endpoint = UDPEndPoint.get(bindpoint);
+ if (endpoint == null) throw new SocketException("Failed to bind to UDPEndPoint");
+ bound = true;
+ }
+
+ @Override
+ public InetAddress getInetAddress() {
+ if (!isConnected()) return null;
+ return destination.getAddress();
+ }
+
+ // Inherit javadoc.
+ @Override
+ public InetAddress getLocalAddress() {
+ // This is for backward compatibility only, the super is not bound and
+ // returns InetAddress.anyLocalAddress();
+ if (!isBound()) super.getLocalAddress();
+ return localSocketAddress.getAddress();
+ }
+
+ // Inherit javadoc.
+ @Override
+ public int getPort() {
+ if (!isConnected()) return 0;
+ return destination.getPort();
+ }
+
+ // Inherit javadoc.
+ @Override
+ public int getLocalPort() {
+ if (!isBound()) return -1;
+ return localSocketAddress.getPort();
+ }
+
+ // Inherit javadoc.
+ @Override
+ public SocketAddress getRemoteSocketAddress() {
+ if (!isConnected()) return null;
+ return destination;
+ }
+
+ // Inherit javadoc.
+ @Override
+ public SocketAddress getLocalSocketAddress() {
+ if (!isBound()) return null;
+ return localSocketAddress;
+ }
+
+ @Override
+ public SocketChannel getChannel() {
+ return null;
+ }
+
+ /**
+ * Not supported
+ * @param on
+ * @throws SocketException
+ */
+ @Override
+ public void setTcpNoDelay(boolean on) throws SocketException {
+ if (isClosed()) throw new SocketException("Socket closed");
+ setOption(SocketOptions.TCP_NODELAY, Boolean.valueOf(on));
+ }
+
+ /**
+ * Not supported
+ * @return false
+ * @throws SocketException
+ */
+ @Override
+ public boolean getTcpNoDelay() throws SocketException {
+ if (isClosed()) throw new SocketException("Socket closed");
+ return ((Boolean) getOption(SocketOptions.TCP_NODELAY)).booleanValue();
+ }
+
+ /**
+ * Not supported.
+ * @param on
+ * @param linger
+ * @throws SocketException
+ */
+ @Override
+ public void setSoLinger(boolean on, int linger) throws SocketException {
+ if (isClosed()) throw new SocketException("Socket closed");
+ if (linger < 0) throw new IllegalArgumentException("SO_LINGER cannot be less than zero");
+ // do nothing, not supported.
+ }
+
+ @Override
+ public int getSoLinger() throws SocketException {
+ if (isClosed()) throw new SocketException("Socket closed");
+ return -1; // implies this option is disabled
+ }
+
+ // Sending of urgent data, should we support it?
+ @Override
+ public void sendUrgentData (int data) throws IOException {
+ throw new SocketException ("Urgent data not supported");
+ }
+
+ // Sending of urgent data, should we enable it?
+ @Override
+ public void setOOBInline(boolean on) throws SocketException {
+ if (isClosed()) throw new SocketException("Socket closed");
+ setOption(SocketOptions.SO_OOBINLINE, Boolean.valueOf(on));
+ }
+
+ @Override
+ public boolean getOOBInline() throws SocketException {
+ if (isClosed()) throw new SocketException("Socket closed");
+ return ((Boolean) getOption(SocketOptions.SO_OOBINLINE)).booleanValue();
+ }
+
+ // TODO: implement set socket timeout.
+ @Override
+ public void setSoTimeout(int timeout) throws SocketException {
+ if (isClosed()) throw new SocketException("Socket closed");
+ if (timeout < 0) throw new IllegalArgumentException("negative timeout not allowed");
+ setOption(SocketOptions.SO_TIMEOUT, new Integer(timeout));
+ }
+
+ // TODO: Implement get socket timeout.
+ @Override
+ public synchronized int getSoTimeout() throws SocketException {
+ if (isClosed()) throw new SocketException("Socket closed");
+ Object o = getOption(SocketOptions.SO_TIMEOUT);
+ if (o instanceof Integer) return ((Integer) o).intValue();
+ return 0;
+ }
+
+ // object method signature compatibility only, not currently supported.
+ @Override
+ public void setSendBufferSize(int size)
+ throws SocketException{
+ if (!(size > 0)) throw new IllegalArgumentException("negative send size not allowed");
+ if (isClosed()) throw new SocketException("Socket closed");
+ setOption(SocketOptions.SO_SNDBUF, new Integer(size));
+ }
+
+ // object method signature compatibility only, not currently supported.
+ @Override
+ public int getSendBufferSize() throws SocketException {
+ if (isClosed()) throw new SocketException("Socket is closed");
+ int result = 0;
+ Object o = getOption(SocketOptions.SO_SNDBUF);
+ if (o instanceof Integer) result = ((Integer)o).intValue();
+ return result;
+ }
+
+ // object method signature compatibility only, not currently supported.
+ @Override
+ public void setReceiveBufferSize(int size)
+ throws SocketException{
+ if (size <= 0) throw new IllegalArgumentException("invalid receive size");
+ if (isClosed()) throw new SocketException("Socket closed");
+ setOption(SocketOptions.SO_RCVBUF, new Integer(size));
+ }
+
+ // object method signature compatibility only, not currently supported.
+ @Override
+ public int getReceiveBufferSize()
+ throws SocketException{
+ if (isClosed()) throw new SocketException("Socket closed");
+ int result = 0;
+ Object o = getOption(SocketOptions.SO_RCVBUF);
+ if (o instanceof Integer) {
+ result = ((Integer)o).intValue();
+ }
+ return result;
+ }
+
+ // TODO: Implement keep alive.
+ @Override
+ public void setKeepAlive(boolean on) throws SocketException {
+ if (isClosed())
+ throw new SocketException("Socket is closed");
+ setOption(SocketOptions.SO_KEEPALIVE, Boolean.valueOf(on));
+ }
+
+
+ @Override
+ public boolean getKeepAlive() throws SocketException {
+ // Keep alive is not currently implemented in the udt session.
+ if (isClosed())
+ throw new SocketException("Socket is closed");
+ return ((Boolean) getOption(SocketOptions.SO_KEEPALIVE)).booleanValue();
+ }
+
+ public void setTrafficClass(int tc) throws SocketException {
+ // Safe to ignore, not supported.
+ }
+
+ public int getTrafficClass() throws SocketException {
+ // Call redirected to underlying DatagramSocket.
+ return endpoint.getSocket().getTrafficClass();
+ }
+
+ @Override
+ public void setReuseAddress(boolean on) throws SocketException {
+ throw new SocketException("SO_REUSEADDR not supported");
+ }
+
+ /**
+ * Tests if SO_REUSEADDR is enabled.
+ *
+ * @return a <code>boolean</code> indicating whether or not SO_REUSEADDR is enabled.
+ * @exception SocketException if there is an error
+ * in the underlying protocol, such as a TCP error.
+ * @since 1.4
+ * @see #setReuseAddress(boolean)
+ */
+ @Override
+ public boolean getReuseAddress() throws SocketException {
+ if (isClosed()) throw new SocketException("Socket is closed");
+ return ((Boolean) (getOption(SocketOptions.SO_REUSEADDR))).booleanValue();
+ }
+
+ /**
+ * Places the input stream for this socket at "end of stream".
+ * Any data sent to the input stream side of the socket is acknowledged
+ * and then silently discarded.
+ * <p>
+ * If you read from a socket input stream after invoking
+ * shutdownInput() on the socket, the stream will return EOF.
+ *
+ * @exception IOException if an I/O error occurs when shutting down this
+ * socket.
+ *
+ * @since 1.3
+ * @see java.net.Socket#shutdownOutput()
+ * @see java.net.Socket#close()
+ * @see java.net.Socket#setSoLinger(boolean, int)
+ * @see #isInputShutdown
+ */
+ @Override
+ public void shutdownInput() throws IOException
+ {
+ if (isClosed()) throw new SocketException("Socket closed");
+ if (!isConnected()) throw new SocketException("Socket not connected");
+ if (isInputShutdown()) throw new SocketException("Socket input already shutdown");
+ receiver.stop();
+ inputStream.close();
+ shutIn = true;
+ }
+
+ @Override
+ public void shutdownOutput() throws IOException
+ {
+ if (isClosed()) throw new SocketException("Socket closed");
+ if (!isConnected()) throw new SocketException("Socket is not connected");
+ if (isOutputShutdown()) throw new SocketException("Socket output is already shutdown");
+ sender.stop();
+ outputStream.close();
+ shutOut = true;
+ }
+
+ /**
+ * Returns the connection state of the socket.
+ *
+ * @return true if the socket successfuly connected to a server
+ * @since 1.4
+ */
+ @Override
+ public boolean isConnected() {
+ // Before 1.3 Sockets were always connected during creation
+ return connected;
+ }
+
+ @Override
+ public boolean isBound() {
+ return bound ;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return closed;
+ }
+
+ @Override
+ public boolean isInputShutdown() {
+ return shutIn;
+ }
+
+ /**
+ * Returns whether the write-half of the socket connection is closed.
+ *
+ * @return true if the output of the socket has been shutdown
+ * @since 1.4
+ * @see #shutdownOutput
+ */
+ @Override
+ public boolean isOutputShutdown() {
+ return shutOut;
+ }
+
+ // some preliminary support for socket options.
+ private Object getOption(int optID) {
+ return Boolean.FALSE;
+ }
+
public UDTReceiver getReceiver() {
return receiver;
}
@@ -87,9 +509,9 @@
this.sender = sender;
}
- public void setActive(boolean active) {
- this.active = active;
- }
+// public void setActive(boolean active) {
+// this.active = active;
+// }
public boolean isActive() {
return active;
@@ -103,7 +525,8 @@
* get the input stream for reading from this socket
* @return
*/
- public synchronized UDTInputStream getInputStream()throws IOException{
+ @Override
+ public synchronized InputStream getInputStream()throws IOException{
if(inputStream==null){
inputStream=new UDTInputStream(this);
}
@@ -114,7 +537,8 @@
* get the output stream for writing to this socket
* @return
*/
- public synchronized UDTOutputStream getOutputStream(){
+ @Override
+ public synchronized OutputStream getOutputStream(){
if(outputStream==null){
outputStream=new UDTOutputStream(this);
}
@@ -126,7 +550,7 @@
}
/**
- * write single block of data without waiting for any acknowledgement
+ * write single block of data without waiting for any acknowledgment
* @param data
*/
protected void doWrite(byte[]data)throws IOException{
@@ -176,6 +600,8 @@
/**
* will block until the outstanding packets have really been sent out
* and acknowledged
+ *
+ * @throws InterruptedException
*/
protected void flush() throws InterruptedException{
if(!active)return;
@@ -200,14 +626,38 @@
flush();
}
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder(400);
+ sb .append("UDTSocket: \n")
+ .append("Local address: ")
+ ...
[truncated message content] |