[Udt-java-commits] SF.net SVN: udt-java:[60] udt-java/skunk
Status: Alpha
Brought to you by:
bschuller
From: <pe...@us...> - 2011-08-05 04:24:09
|
Revision: 60 http://udt-java.svn.sourceforge.net/udt-java/?rev=60&view=rev Author: pete_ Date: 2011-08-05 04:24:00 +0000 (Fri, 05 Aug 2011) Log Message: ----------- Experimental changes, partial implementation of java sockets and UDP multiplexing. Modified Paths: -------------- udt-java/skunk/src/main/java/udt/ClientSession.java udt-java/skunk/src/main/java/udt/ServerSession.java udt-java/skunk/src/main/java/udt/UDPEndPoint.java udt-java/skunk/src/main/java/udt/UDTClient.java udt-java/skunk/src/main/java/udt/UDTCongestionControl.java udt-java/skunk/src/main/java/udt/UDTInputStream.java udt-java/skunk/src/main/java/udt/UDTReceiver.java udt-java/skunk/src/main/java/udt/UDTSender.java udt-java/skunk/src/main/java/udt/UDTServerSocket.java udt-java/skunk/src/main/java/udt/UDTSession.java udt-java/skunk/src/main/java/udt/UDTSocket.java udt-java/skunk/src/main/java/udt/packets/ConnectionHandshake.java udt-java/skunk/src/main/java/udt/packets/DataPacket.java udt-java/skunk/src/main/java/udt/packets/PacketFactory.java udt-java/skunk/src/main/java/udt/packets/PacketUtil.java udt-java/skunk/src/main/java/udt/unicore/FufexSend.java udt-java/skunk/src/main/java/udt/util/ReceiveBuffer.java udt-java/skunk/src/main/java/udt/util/ReceiveFile.java udt-java/skunk/src/main/java/udt/util/SendFile.java udt-java/skunk/src/main/java/udt/util/SequenceNumber.java udt-java/skunk/src/test/java/echo/EchoServer.java udt-java/skunk/src/test/java/echo/TestEchoServer.java udt-java/skunk/src/test/java/echo/TestEchoServerMultiClient.java udt-java/skunk/src/test/java/udt/TestUDTServerSocket.java udt-java/skunk/src/test/java/udt/TestUdpEndpoint.java udt-java/skunk/src/test/java/udt/performance/TestUDTLargeData.java Added Paths: ----------- udt-java/skunk/src/main/java/udt/packets/UDTSocketAddress.java udt-java/skunk/src/main/java/udt/util/ObjectPool.java udt-java/skunk/src/main/java/udt/util/Recycler.java Removed Paths: ------------- udt-java/skunk/src/main/java/udt/packets/Destination.java Property Changed: ---------------- udt-java/skunk/ Property changes on: udt-java/skunk ___________________________________________________________________ Added: svn:ignore + target Modified: udt-java/skunk/src/main/java/udt/ClientSession.java =================================================================== --- udt-java/skunk/src/main/java/udt/ClientSession.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/ClientSession.java 2011-08-05 04:24:00 UTC (rev 60) @@ -38,7 +38,7 @@ import java.util.logging.Logger; import udt.packets.ConnectionHandshake; -import udt.packets.Destination; +import udt.packets.UDTSocketAddress; import udt.packets.Shutdown; import udt.util.SequenceNumber; @@ -52,7 +52,7 @@ private UDPEndPoint endPoint; - public ClientSession(UDPEndPoint endPoint, Destination dest)throws SocketException{ + public ClientSession(UDPEndPoint endPoint, UDTSocketAddress dest)throws SocketException{ super("ClientSession localPort="+endPoint.getLocalPort(),dest); this.endPoint=endPoint; logger.info("Created "+toString()); @@ -78,7 +78,7 @@ } @Override - public void received(UDTPacket packet, Destination peer) { + public void received(UDTPacket packet, UDTSocketAddress peer) { lastPacket=packet; @@ -91,7 +91,7 @@ if(hs.getConnectionType()==1){ try{ //TODO validate parameters sent by peer - long peerSocketID=hs.getSocketID(); + int peerSocketID=hs.getSocketID(); destination.setSocketID(peerSocketID); sendConfirmation(hs); }catch(Exception ex){ @@ -103,10 +103,12 @@ else{ try{ //TODO validate parameters sent by peer - long peerSocketID=hs.getSocketID(); + int peerSocketID=hs.getSocketID(); destination.setSocketID(peerSocketID); setState(ready); + if (socket == null){ socket=new UDTSocket(endPoint,this); + } }catch(Exception ex){ logger.log(Level.WARNING,"Error creating socket",ex); setState(invalid); @@ -146,9 +148,7 @@ ConnectionHandshake handshake = new ConnectionHandshake(); handshake.setConnectionType(ConnectionHandshake.CONNECTION_TYPE_REGULAR); handshake.setSocketType(ConnectionHandshake.SOCKET_TYPE_DGRAM); - long initialSequenceNo=SequenceNumber.random(); - setInitialSequenceNumber(initialSequenceNo); - handshake.setInitialSeqNo(initialSequenceNo); + handshake.setInitialSeqNo(getCurrentSequenceNumber()); handshake.setPacketSize(getDatagramSize()); handshake.setSocketID(mySocketID); handshake.setMaxFlowWndSize(flowWindowSize); Modified: udt-java/skunk/src/main/java/udt/ServerSession.java =================================================================== --- udt-java/skunk/src/main/java/udt/ServerSession.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/ServerSession.java 2011-08-05 04:24:00 UTC (rev 60) @@ -40,7 +40,7 @@ import java.util.logging.Logger; import udt.packets.ConnectionHandshake; -import udt.packets.Destination; +import udt.packets.UDTSocketAddress; import udt.packets.KeepAlive; import udt.packets.Shutdown; @@ -57,7 +57,7 @@ private UDTPacket lastPacket; public ServerSession(DatagramPacket dp, UDPEndPoint endPoint)throws SocketException,UnknownHostException{ - super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+dp.getAddress()+":"+dp.getPort(),new Destination(dp.getAddress(),dp.getPort())); + super("ServerSession localPort="+endPoint.getLocalPort()+" peer="+dp.getAddress()+":"+dp.getPort(),new UDTSocketAddress(dp.getAddress(),dp.getPort(),0)); this.endPoint=endPoint; logger.info("Created "+toString()+" talking to "+dp.getAddress()+":"+dp.getPort()); } @@ -65,7 +65,7 @@ int n_handshake=0; @Override - public void received(UDTPacket packet, Destination peer){ + public void received(UDTPacket packet, UDTSocketAddress peer){ lastPacket=packet; if(packet instanceof ConnectionHandshake) { @@ -83,7 +83,9 @@ n_handshake++; try{ setState(ready); + if (socket == null){ socket=new UDTSocket(endPoint, this); + } cc.init(); }catch(Exception uhe){ //session is invalid @@ -166,7 +168,7 @@ long clientBufferSize=handshake.getPacketSize(); long myBufferSize=getDatagramSize(); long bufferSize=Math.min(clientBufferSize, myBufferSize); - long initialSequenceNumber=handshake.getInitialSeqNo(); + int initialSequenceNumber=(int) handshake.getInitialSeqNo(); setInitialSequenceNumber(initialSequenceNumber); setDatagramSize((int)bufferSize); responseHandshake.setPacketSize(bufferSize); Modified: udt-java/skunk/src/main/java/udt/UDPEndPoint.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDPEndPoint.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/UDPEndPoint.java 2011-08-05 04:24:00 UTC (rev 60) @@ -36,20 +36,34 @@ import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.SocketException; import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.util.Collection; +import java.util.HashSet; import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.logging.Level; import java.util.logging.Logger; import udt.packets.ConnectionHandshake; -import udt.packets.Destination; +import udt.packets.UDTSocketAddress; import udt.packets.PacketFactory; +import udt.util.ObjectPool; import udt.util.UDTThreadFactory; /** @@ -58,14 +72,116 @@ */ public class UDPEndPoint { + //class fields private static final Logger logger=Logger.getLogger(ClientSession.class.getName()); + public static final int DATAGRAM_SIZE=1400; + + //class methods + private static final WeakHashMap<SocketAddress, UDPEndPoint> localEndpoints + = new WeakHashMap<SocketAddress, UDPEndPoint>(); + + public static UDPEndPoint get(DatagramSocket socket){ + SocketAddress localInetSocketAddress = null; + UDPEndPoint result = null; + if ( socket.isBound()){ + SocketAddress sa = socket.getLocalSocketAddress(); + if ( sa instanceof InetSocketAddress ){ + localInetSocketAddress = (InetSocketAddress) sa; + } else { + // Must be a special DatagramSocket impl or extended. + localInetSocketAddress = + new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort()); + } + synchronized (localEndpoints){ + result = localEndpoints.get(localInetSocketAddress); + } + } + if (result != null) return result; + try { + result = new UDPEndPoint(socket); + if (localInetSocketAddress == null){ + // The DatagramSocket was unbound, it should be bound now. + localInetSocketAddress = + new InetSocketAddress(socket.getLocalAddress(), socket.getLocalPort()); + } + } catch (SocketException ex) { + Logger.getLogger(UDPEndPoint.class.getName()).log(Level.SEVERE, null, ex); + } + if (result != null){ + synchronized (localEndpoints){ + UDPEndPoint exists = localEndpoints.get(localInetSocketAddress); + if (exists != null && exists.getSocket().equals(socket)) result = exists; + // Only cache if a record doesn't already exist. + else if (exists == null) localEndpoints.put(localInetSocketAddress, result); + } + } + return result; // may be null. + } + + + + public static UDPEndPoint get(InetAddress localAddress, int localPort){ + InetSocketAddress localInetSocketAddress = new InetSocketAddress(localAddress, localPort); + return get(localInetSocketAddress); + } + + public static UDPEndPoint get(SocketAddress localSocketAddress){ + InetSocketAddress localInetSocketAddress = null; + if (localSocketAddress instanceof InetSocketAddress){ + localInetSocketAddress = (InetSocketAddress) localSocketAddress; + } else if (localSocketAddress instanceof UDTSocketAddress){ + UDTSocketAddress udtSA = (UDTSocketAddress) localSocketAddress; + localInetSocketAddress = + new InetSocketAddress(udtSA.getAddress(), udtSA.getPort()); + } + if (localInetSocketAddress == null) return null; + UDPEndPoint result = null; + synchronized (localEndpoints){ + result = localEndpoints.get(localInetSocketAddress); + } + if (result != null) return result; + try { + result = new UDPEndPoint(localInetSocketAddress); + if (localInetSocketAddress.getPort() == 0 || + localInetSocketAddress.getAddress().isAnyLocalAddress()){ + // ephemeral port or wildcard address, bind operation is complete. + localInetSocketAddress = + new InetSocketAddress(result.getLocalAddress(), result.getLocalPort()); + } + } catch (SocketException ex) { + logger.log(Level.SEVERE, null, ex); + } catch (UnknownHostException ex) { + logger.log(Level.SEVERE, null, ex); + } + if (result != null){ + synchronized (localEndpoints){ + UDPEndPoint exists = localEndpoints.get(localInetSocketAddress); + if (exists != null) result = exists; + else localEndpoints.put(localInetSocketAddress, result); + } + } + return result; // may be null. + } + + /** + * Allows a custom endpoint to be added to the pool. + * @param endpoint + */ + public static void put(UDPEndPoint endpoint){ + SocketAddress local = endpoint.getSocket().getLocalSocketAddress(); + synchronized (localEndpoints){ + localEndpoints.put(local, endpoint); + } + } + + //object fields private final int port; private final DatagramSocket dgSocket; //active sessions keyed by socket ID - private final Map<Long,UDTSession>sessions=new ConcurrentHashMap<Long, UDTSession>(); + private final Map<Integer,UDTSession>sessions=new ConcurrentHashMap<Integer, UDTSession>(); //last received packet private UDTPacket lastPacket; @@ -74,20 +190,39 @@ //this queue is used to handoff new UDTSessions to the application private final SynchronousQueue<UDTSession> sessionHandoff=new SynchronousQueue<UDTSession>(); + private final ObjectPool<BlockingQueue<UDTSession>> queuePool + = new ObjectPool<BlockingQueue<UDTSession>>(20); + + private final ConcurrentMap<Integer, BlockingQueue<UDTSession>> handoff + = new ConcurrentHashMap<Integer, BlockingQueue<UDTSession>>(120); + + // registered sockets + private final Set<Integer> registeredSockets = new HashSet<Integer>(120); + // registered sockets lock + private final ReadWriteLock rwl = new ReentrantReadWriteLock(); + private final Lock readSocketLock = rwl.readLock(); + private final Lock writeSocketLock = rwl.writeLock(); + + private boolean serverSocketMode=false; //has the endpoint been stopped? private volatile boolean stopped=false; - public static final int DATAGRAM_SIZE=1400; + private final AtomicInteger nextSocketID=new AtomicInteger(20+new Random().nextInt(5000)); + /** * create an endpoint on the given socket * * @param socket - a UDP datagram socket + * @throws SocketException */ - public UDPEndPoint(DatagramSocket socket){ + protected UDPEndPoint(DatagramSocket socket) throws SocketException{ this.dgSocket=socket; + if (!socket.isBound()){ + socket.bind(null); + } port=dgSocket.getLocalPort(); } @@ -97,7 +232,7 @@ * @throws SocketException * @throws UnknownHostException */ - public UDPEndPoint(InetAddress localAddress)throws SocketException, UnknownHostException{ + private UDPEndPoint(InetAddress localAddress)throws SocketException, UnknownHostException{ this(localAddress,0); } @@ -108,7 +243,7 @@ * @throws SocketException * @throws UnknownHostException */ - public UDPEndPoint(InetAddress localAddress, int localPort)throws SocketException, UnknownHostException{ + private UDPEndPoint(InetAddress localAddress, int localPort)throws SocketException, UnknownHostException{ if(localAddress==null){ dgSocket=new DatagramSocket(localPort, localAddress); }else{ @@ -120,6 +255,13 @@ configureSocket(); } + private UDPEndPoint (InetSocketAddress localSocketAddress) + throws SocketException, UnknownHostException { + dgSocket = new DatagramSocket(localSocketAddress); + port = dgSocket.getLocalPort(); + configureSocket(); + } + protected void configureSocket()throws SocketException{ //set a time out to avoid blocking in doReceive() dgSocket.setSoTimeout(100000); @@ -184,6 +326,43 @@ } /** + * Provides assistance to a socket to determine a random socket id, + * every caller receives a unique value. This value is unique at the + * time of calling, however it may not be at registration time. + * + * This socketID has not been registered, all socket ID's must be + * registered or connection will fail. + * @return + */ + public int getUniqueSocketID(){ + Integer socketID = nextSocketID.getAndIncrement(); + try{ + readSocketLock.lock(); + while (registeredSockets.contains(socketID)){ + socketID = nextSocketID.getAndIncrement(); + } + return socketID; // should we register it? + } finally { + readSocketLock.unlock(); + } + } + + void registerSocketID(int socketID, UDTSocket socket) throws SocketException { + if (!equals(socket.getEndpoint())) throw new SocketException ( + "Socket doesn't originate for this endpoint: " + + socket.toString()); + try { + writeSocketLock.lock(); + if (registeredSockets.contains(socketID)){ + throw new SocketException("Already registered, Socket ID: " +socketID); + } + registeredSockets.add(socketID); + }finally{ + writeSocketLock.unlock(); + } + } + + /** * @return the port which this client is bound to */ public int getLocalPort() { @@ -204,8 +383,8 @@ return lastPacket; } - public void addSession(Long destinationID,UDTSession session){ - logger.info("Storing session <"+destinationID+">"); + public void addSession(Integer destinationID,UDTSession session){ + logger.log(Level.INFO, "Storing session <{0}>", destinationID); sessions.put(destinationID, session); } @@ -221,12 +400,33 @@ * wait the given time for a new connection * @param timeout - the time to wait * @param unit - the {@link TimeUnit} + * @param socketID - the socket id. * @return a new {@link UDTSession} * @throws InterruptedException */ - protected UDTSession accept(long timeout, TimeUnit unit)throws InterruptedException{ - return sessionHandoff.poll(timeout, unit); + protected UDTSession accept(long timeout, TimeUnit unit, Integer socketID)throws InterruptedException{ + //return sessionHandoff.poll(timeout, unit); + BlockingQueue<UDTSession> session = handoff.get(socketID); + try { + if (session == null){ + session = queuePool.get(); + if (session == null) { + session = new ArrayBlockingQueue<UDTSession>(1); } + BlockingQueue<UDTSession> existed = handoff.putIfAbsent(socketID,session); + if (existed != null){ + session = existed; + } + } + return session.poll(timeout, unit); + } finally { + boolean removed = handoff.remove(socketID, session); + if (removed){ + session.clear(); + queuePool.accept(session); + } + } + } final DatagramPacket dp= new DatagramPacket(new byte[DATAGRAM_SIZE],DATAGRAM_SIZE); @@ -250,32 +450,42 @@ protected void doReceive()throws IOException{ while(!stopped){ try{ - try{ - //will block until a packet is received or timeout has expired dgSocket.receive(dp); - - Destination peer=new Destination(dp.getAddress(), dp.getPort()); + UDTSocketAddress peer= null; int l=dp.getLength(); UDTPacket packet=PacketFactory.createPacket(dp.getData(),l); lastPacket=packet; - //handle connection handshake if(packet.isConnectionHandshake()){ synchronized(lock){ Long id=Long.valueOf(packet.getDestinationID()); UDTSession session=sessions.get(id); - if(session==null){ + if(session==null){ // What about DOS? session=new ServerSession(dp,this); addSession(session.getSocketID(),session); //TODO need to check peer to avoid duplicate server session if(serverSocketMode){ logger.fine("Pooling new request."); - sessionHandoff.put(session); +// sessionHandoff.put(session); // blocking method, what about offer? + BlockingQueue<UDTSession> queue = handoff.get(session.getSocketID()); + if (queue != null){ + boolean success = queue.offer(session); + if (success){ logger.fine("Request taken for processing."); + } else { + logger.fine("Request discarded, queue full."); } + } else { + logger.fine("No ServerSocket listening at socketID: " + + session.getSocketID() + + "to answer request"); } + } + } peer.setSocketID(((ConnectionHandshake)packet).getSocketID()); + peer = new UDTSocketAddress(dp.getAddress(), dp.getPort(), + ((ConnectionHandshake)packet).getSocketID()); session.received(packet,peer); } } @@ -294,7 +504,9 @@ if(session==null){ n++; if(n%100==1){ - logger.warning("Unknown session <"+dest+"> requested from <"+peer+"> packet type "+packet.getClass().getName()); + logger.warning("Unknown session <"+dest + +"> requested from <"+peer+"> packet type " + +packet.getClass().getName()); } } else{ @@ -305,8 +517,6 @@ logger.log(Level.INFO, "SocketException: "+ex.getMessage()); }catch(SocketTimeoutException ste){ //can safely ignore... we will retry until the endpoint is stopped - } - }catch(Exception ex){ logger.log(Level.WARNING, "Got: "+ex.getMessage(),ex); } Modified: udt-java/skunk/src/main/java/udt/UDTClient.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTClient.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/UDTClient.java 2011-08-05 04:24:00 UTC (rev 60) @@ -33,13 +33,15 @@ package udt; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.InetAddress; import java.net.SocketException; import java.net.UnknownHostException; import java.util.logging.Level; import java.util.logging.Logger; -import udt.packets.Destination; +import udt.packets.UDTSocketAddress; import udt.packets.Shutdown; import udt.util.UDTStatistics; @@ -52,13 +54,13 @@ public UDTClient(InetAddress address, int localport)throws SocketException, UnknownHostException{ //create endpoint - clientEndpoint=new UDPEndPoint(address,localport); + clientEndpoint= UDPEndPoint.get(address,localport); logger.info("Created client endpoint on port "+localport); } public UDTClient(InetAddress address)throws SocketException, UnknownHostException{ //create endpoint - clientEndpoint=new UDPEndPoint(address); + clientEndpoint= UDPEndPoint.get(address, 0); logger.info("Created client endpoint on port "+clientEndpoint.getLocalPort()); } @@ -75,7 +77,7 @@ */ public void connect(String host, int port)throws InterruptedException, UnknownHostException, IOException{ InetAddress address=InetAddress.getByName(host); - Destination destination=new Destination(address,port); + UDTSocketAddress destination= new UDTSocketAddress(address,port,0); //create client session... clientSession=new ClientSession(clientEndpoint,destination); clientEndpoint.addSession(clientSession.getSocketID(), clientSession); @@ -101,7 +103,7 @@ } /** - * sends the given data and waits for acknowledgement + * sends the given data and waits for acknowledgment * @param data - the data to send * @throws IOException * @throws InterruptedException if interrupted while waiting for ack @@ -143,11 +145,11 @@ } } - public UDTInputStream getInputStream()throws IOException{ + public InputStream getInputStream()throws IOException{ return clientSession.getSocket().getInputStream(); } - public UDTOutputStream getOutputStream()throws IOException{ + public OutputStream getOutputStream()throws IOException{ return clientSession.getSocket().getOutputStream(); } Modified: udt-java/skunk/src/main/java/udt/UDTCongestionControl.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTCongestionControl.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/UDTCongestionControl.java 2011-08-05 04:24:00 UTC (rev 60) @@ -65,7 +65,7 @@ public UDTCongestionControl(UDTSession session){ this.session=session; this.statistics=session.getStatistics(); - lastDecreaseSeqNo=session.getInitialSequenceNumber()-1; + lastDecreaseSeqNo=session.getCurrentSequenceNumber()-1; } /* (non-Javadoc) Modified: udt-java/skunk/src/main/java/udt/UDTInputStream.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTInputStream.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/UDTInputStream.java 2011-08-05 04:24:00 UTC (rev 60) @@ -66,10 +66,10 @@ * @param socket - the {@link UDTSocket} * @throws IOException */ - public UDTInputStream(UDTSocket socket)throws IOException{ + UDTInputStream(UDTSocket socket)throws IOException{ this.socket=socket; int capacity=socket!=null? 2 * socket.getSession().getFlowWindowSize() : 128 ; - long initialSequenceNum=socket!=null?socket.getSession().getInitialSequenceNumber():1; + int initialSequenceNum=socket!=null?socket.getSession().getCurrentSequenceNumber():1; receiveBuffer=new ReceiveBuffer(capacity,initialSequenceNum); } Modified: udt-java/skunk/src/main/java/udt/UDTReceiver.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTReceiver.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/UDTReceiver.java 2011-08-05 04:24:00 UTC (rev 60) @@ -169,7 +169,7 @@ packetHistoryWindow = new PacketHistoryWindow(16); receiverLossList = new ReceiverLossList(); packetPairWindow = new PacketPairWindow(16); - largestReceivedSeqNumber=session.getInitialSequenceNumber()-1; + largestReceivedSeqNumber=session.getCurrentSequenceNumber()-1; bufferSize=session.getReceiveBufferSize(); handoffQueue=new ArrayBlockingQueue<UDTPacket>(4*session.getFlowWindowSize()); storeStatistics=Boolean.getBoolean("udt.receiver.storeStatistics"); @@ -396,7 +396,7 @@ // return; // } // //} - boolean OK=session.getSocket().getInputStream().haveNewData(currentSequenceNumber,dp.getData()); + boolean OK= ((UDTInputStream) session.getSocket().getInputStream()).haveNewData(currentSequenceNumber,dp.getData()); if(!OK){ //need to drop packet... return; Modified: udt-java/skunk/src/main/java/udt/UDTSender.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTSender.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/UDTSender.java 2011-08-05 04:24:00 UTC (rev 60) @@ -68,9 +68,7 @@ private static final Logger logger=Logger.getLogger(UDTClient.class.getName()); private final UDPEndPoint endpoint; - private final UDTSession session; - private final UDTStatistics statistics; //senderLossList stores the sequence numbers of lost packets @@ -92,31 +90,31 @@ private final AtomicInteger unacknowledged=new AtomicInteger(0); //for generating data packet sequence numbers - private volatile long currentSequenceNumber=0; + //volatile long counters are not atomic. + //private volatile int currentSequenceNumber=0; //the largest data packet sequence number that has actually been sent out - private volatile long largestSentSequenceNumber=-1; + private volatile int largestSentSequenceNumber=-1; //last acknowledge number, initialised to the initial sequence number - private volatile long lastAckSequenceNumber; + private volatile int lastAckSequenceNumber; private volatile boolean started=false; - private volatile boolean stopped=false; - private volatile boolean paused=false; //used to signal that the sender should start to send private volatile CountDownLatch startLatch=new CountDownLatch(1); //used by the sender to wait for an ACK - private final AtomicReference<CountDownLatch> waitForAckLatch=new AtomicReference<CountDownLatch>(); + private final AtomicReference<CountDownLatch> waitForAckLatch + =new AtomicReference<CountDownLatch>(); //used by the sender to wait for an ACK of a certain sequence number - private final AtomicReference<CountDownLatch> waitForSeqAckLatch=new AtomicReference<CountDownLatch>(); + private final AtomicReference<CountDownLatch> waitForSeqAckLatch + =new AtomicReference<CountDownLatch>(); private final boolean storeStatistics; - private final int chunksize; public UDTSender(UDTSession session,UDPEndPoint endpoint){ @@ -128,8 +126,8 @@ sendBuffer=new ConcurrentHashMap<Long, byte[]>(session.getFlowWindowSize(),0.75f,2); chunksize=session.getDatagramSize()-24;//need space for the header; flowWindow=new FlowWindow(session.getFlowWindowSize(),chunksize); - lastAckSequenceNumber=session.getInitialSequenceNumber(); - currentSequenceNumber=session.getInitialSequenceNumber()-1; + lastAckSequenceNumber=session.getCurrentSequenceNumber(); +// currentSequenceNumber=session.getCurrentSequenceNumber()-1; waitForAckLatch.set(new CountDownLatch(1)); waitForSeqAckLatch.set(new CountDownLatch(1)); storeStatistics=Boolean.getBoolean("udt.sender.storeStatistics"); @@ -212,6 +210,7 @@ protected void sendUdtPacket(ByteBuffer bb, int timeout, TimeUnit units)throws IOException, InterruptedException{ if(!started)start(); DataPacket packet=null; + do{ packet=flowWindow.getForProducer(); if(packet==null){ @@ -219,7 +218,7 @@ } }while(packet==null);//TODO check timeout... try{ - packet.setPacketSequenceNumber(getNextSequenceNumber()); + packet.setPacketSequenceNumber(incrementAndGetSequenceNo()); packet.setSession(session); packet.setDestinationID(session.getDestination().getSocketID()); int len=Math.min(bb.remaining(),chunksize); @@ -253,7 +252,7 @@ } }while(packet==null); try{ - packet.setPacketSequenceNumber(getNextSequenceNumber()); + packet.setPacketSequenceNumber(incrementAndGetSequenceNo()); packet.setSession(session); packet.setDestinationID(session.getDestination().getSocketID()); packet.setData(data); @@ -309,7 +308,7 @@ unacknowledged.decrementAndGet(); } } - lastAckSequenceNumber=Math.max(lastAckSequenceNumber, ackNumber); + lastAckSequenceNumber=(int) Math.max(lastAckSequenceNumber, ackNumber); //send ACK2 packet to the receiver sendAck2(ackNumber); statistics.incNumberOfACKReceived(); @@ -362,22 +361,20 @@ Long entry=senderLossList.getFirstEntry(); if(entry!=null){ handleRetransmit(entry); - } - else - { + }else{ //if the number of unacknowledged data packets does not exceed the congestion //and the flow window sizes, pack a new packet int unAcknowledged=unacknowledged.get(); - - if(unAcknowledged<session.getCongestionControl().getCongestionWindowSize() - && unAcknowledged<session.getFlowWindowSize()){ + if(unAcknowledged<session.getCongestionControl() + .getCongestionWindowSize() + && unAcknowledged<session.getFlowWindowSize()) + { //check for application data DataPacket dp=flowWindow.consumeData(); if(dp!=null){ send(dp); - largestSentSequenceNumber=dp.getPacketSequenceNumber(); - } - else{ + largestSentSequenceNumber=(int) dp.getPacketSequenceNumber(); + }else{ statistics.incNumberOfMissingDataEvents(); } }else{ @@ -388,7 +385,6 @@ waitForAck(); } } - //wait if(largestSentSequenceNumber % 16 !=0){ long snd=(long)session.getCongestionControl().getSendInterval(); @@ -445,13 +441,12 @@ * the next sequence number for data packets. * The initial sequence number is "0" */ - public long getNextSequenceNumber(){ - currentSequenceNumber=SequenceNumber.increment(currentSequenceNumber); - return currentSequenceNumber; + public int incrementAndGetSequenceNo(){ + return session.incrementAndGetSequenceNo(); } - public long getCurrentSequenceNumber(){ - return currentSequenceNumber; + public int getCurrentSequenceNumber(){ + return session.getCurrentSequenceNumber(); } /** Modified: udt-java/skunk/src/main/java/udt/UDTServerSocket.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTServerSocket.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/UDTServerSocket.java 2011-08-05 04:24:00 UTC (rev 60) @@ -31,35 +31,48 @@ *********************************************************************************/ package udt; +import java.io.IOException; import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; import java.net.SocketException; import java.net.UnknownHostException; +import java.nio.channels.ServerSocketChannel; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; import java.util.logging.Logger; -public class UDTServerSocket { +public class UDTServerSocket extends ServerSocket { private static final Logger logger=Logger.getLogger(UDTClient.class.getName()); - private final UDPEndPoint endpoint; + private volatile UDPEndPoint endpoint; + private volatile InetAddress localAdd; + private volatile int locPort; + private volatile SocketAddress localSocketAddress; - private boolean started=false; - + private volatile boolean started=false; + private volatile boolean bound = false; private volatile boolean shutdown=false; /** * create a UDT ServerSocket - * @param localAddress + * @param localSocketAddress * @param port - the local port. If 0, an ephemeral port will be chosen */ - public UDTServerSocket(InetAddress localAddress, int port)throws SocketException,UnknownHostException{ - endpoint=new UDPEndPoint(localAddress,port); + public UDTServerSocket(InetAddress localAddress, int port)throws UnknownHostException, IOException{ + super(); + endpoint= UDPEndPoint.get(localAddress,port); + localAdd = localAddress; + locPort = port; + bound = true; logger.info("Created server endpoint on port "+endpoint.getLocalPort()); } //starts a server on localhost - public UDTServerSocket(int port)throws SocketException,UnknownHostException{ + public UDTServerSocket(int port)throws IOException,UnknownHostException{ this(InetAddress.getLocalHost(),port); } @@ -68,13 +81,16 @@ * for the new connection * @return */ - public synchronized UDTSocket accept()throws InterruptedException{ +@Override + public synchronized Socket accept() throws IOException{ if(!started){ endpoint.start(true); started=true; } + // TODO: use a blocking queue. while(!shutdown){ - UDTSession session=endpoint.accept(10000, TimeUnit.MILLISECONDS); + try { + UDTSession session = endpoint.accept(10000, TimeUnit.MILLISECONDS, null); if(session!=null){ //wait for handshake to complete while(!session.isReady() || session.getSocket()==null){ @@ -82,16 +98,107 @@ } return session.getSocket(); } + } catch (InterruptedException ex) { + throw new IOException(ex); } - throw new InterruptedException(); } + throw new IOException("UDTSession was null"); + } - public void shutDown(){ + public UDPEndPoint getEndpoint(){ + return endpoint; + } + + @Override + public void bind(SocketAddress endpoint){ + //TODO: Implement ServerSocket.bind + } + + @Override + public void bind(SocketAddress endpoint, int timeout){ + //TODO: Implement ServerSocket.bind + } + + @Override + public void close(){ shutdown=true; + // TODO: The endpoint might have other ServerSocket's listening, + // we need to pass the endpoint the socket, or session or something + // the endpoint should only stop when it has no remaining sessions. endpoint.stop(); } - public UDPEndPoint getEndpoint(){ - return endpoint; + @Override + public ServerSocketChannel getChannel(){ + return null; } + + @Override + public InetAddress getInetAddress(){ + return localAdd; } + + @Override + public int getLocalPort(){ + return locPort; + } + + @Override + public SocketAddress getLocalSocketAddress(){ + return localSocketAddress; + } + + @Override + public int getReceiveBufferSize(){ + return 0; + } + + @Override + public boolean getReuseAddress(){ + return false; + } + + @Override + public int getSoTimeout(){ + return 0; + } + + @Override + public boolean isBound(){ + return started; + } + + @Override + public boolean isClosed(){ + return shutdown; + } + + @Override + public void setPerformancePreferences(int connectionTime, int latency, int bandwidth){ + + } + + @Override + public void setReceiveBufferSize(int size){ + + } + + @Override + public void setReuseAddress(boolean on){ + + } + + @Override + public void setSoTimeout(int timeout){ + + } + + @Override + public String toString(){ + StringBuilder sb = new StringBuilder(120); + sb.append("UDTServerSocket: \n"); + //TODO: add statistics. + return sb.toString(); + } + +} Modified: udt-java/skunk/src/main/java/udt/UDTSession.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTSession.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/UDTSession.java 2011-08-05 04:24:00 UTC (rev 60) @@ -33,12 +33,16 @@ package udt; import java.net.DatagramPacket; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; -import udt.packets.Destination; +import udt.packets.UDTSocketAddress; +import udt.util.SequenceNumber; import udt.util.UDTStatistics; public abstract class UDTSession { @@ -79,7 +83,7 @@ /** * remote UDT entity (address and socket ID) */ - protected final Destination destination; + protected final UDTSocketAddress destination; /** * local port @@ -101,16 +105,19 @@ */ protected int datagramSize=DEFAULT_DATAGRAM_SIZE; - protected Long initialSequenceNumber=null; + protected int initialSequenceNumber=11; - protected final long mySocketID; + private final AtomicInteger sequenceNo = new AtomicInteger(SequenceNumber.random()); - private final static AtomicLong nextSocketID=new AtomicLong(20+new Random().nextInt(5000)); + protected final int mySocketID; - public UDTSession(String description, Destination destination){ + private final static AtomicInteger nextSocketID=new AtomicInteger(20+new Random().nextInt(5000)); + + public UDTSession(String description, UDTSocketAddress dest){ + InetSocketAddress inetAdd = null; statistics=new UDTStatistics(description); mySocketID=nextSocketID.incrementAndGet(); - this.destination=destination; + this.destination= dest; this.dgPacket=new DatagramPacket(new byte[0],0,destination.getAddress(),destination.getPort()); String clazzP=System.getProperty(CC_CLASS,UDTCongestionControl.class.getName()); Object ccObject=null; @@ -125,10 +132,8 @@ logger.info("Using "+cc.getClass().getName()); } + public abstract void received(UDTPacket packet, UDTSocketAddress peer); - public abstract void received(UDTPacket packet, Destination peer); - - public UDTSocket getSocket() { return socket; } @@ -137,7 +142,7 @@ return cc; } - public int getState() { + protected int getState() { return state; } @@ -149,7 +154,7 @@ this.socket = socket; } - public void setState(int state) { + protected void setState(int state) { logger.info(toString()+" connection state CHANGED to <"+state+">"); this.state = state; } @@ -170,7 +175,7 @@ return state==shutdown || state==invalid; } - public Destination getDestination() { + public UDTSocketAddress getDestination() { return destination; } @@ -202,21 +207,30 @@ return statistics; } - public long getSocketID(){ + public int getSocketID(){ return mySocketID; } - public synchronized long getInitialSequenceNumber(){ - if(initialSequenceNumber==null){ - initialSequenceNumber=1l; //TODO must be random? + public int getCurrentSequenceNumber(){ + return sequenceNo.get(); } - return initialSequenceNumber; + + public int incrementAndGetSequenceNo(){ + while (true){ + int sequence = sequenceNo.get(); + int increment = SequenceNumber.increment(sequence); + boolean success = false; + success = sequenceNo.compareAndSet(sequence, increment); + if (success) return increment; } + } - public synchronized void setInitialSequenceNumber(long initialSequenceNumber){ - this.initialSequenceNumber=initialSequenceNumber; + protected void setInitialSequenceNumber(int initialSequenceNumber){ + if (state == handshaking){ + sequenceNo.set(initialSequenceNumber); } + } public DatagramPacket getDatagram(){ return dgPacket; Modified: udt-java/skunk/src/main/java/udt/UDTSocket.java =================================================================== --- udt-java/skunk/src/main/java/udt/UDTSocket.java 2011-08-05 01:14:28 UTC (rev 59) +++ udt-java/skunk/src/main/java/udt/UDTSocket.java 2011-08-05 04:24:00 UTC (rev 60) @@ -32,10 +32,23 @@ package udt; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.Socket; +import java.net.SocketAddress; import java.net.SocketException; +import java.net.SocketOptions; import java.net.UnknownHostException; import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.security.AccessController; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import udt.packets.UDTSocketAddress; /** * UDTSocket is analogous to a normal java.net.Socket, it provides input and * output streams for the application @@ -43,34 +56,443 @@ * TODO is it possible to actually extend java.net.Socket ? * */ -public class UDTSocket { +public class UDTSocket extends Socket{ + private static final ArrayList<UDTSocketAddress> boundSockets + = new ArrayList<UDTSocketAddress>(120); + //endpoint - private final UDPEndPoint endpoint; + private volatile UDPEndPoint endpoint; private volatile boolean active; + private volatile boolean connected; + private volatile boolean bound; + private volatile boolean shutIn; // receiver closed. + private volatile boolean shutOut; // sender closed. + private volatile boolean closed; //processing received data - private UDTReceiver receiver; - private UDTSender sender; + private volatile UDTReceiver receiver; + private volatile UDTSender sender; - private final UDTSession session; + private volatile UDTSession session; - private UDTInputStream inputStream; - private UDTOutputStream outputStream; + private volatile UDTInputStream inputStream; + private volatile UDTOutputStream outputStream; + + private volatile UDTSocketAddress localSocketAddress; + private volatile UDTSocketAddress destination; /** + * The session is usually the caller for this constructor + * so the session already knows this is the socket. * @param host * @param port * @param endpoint * @throws SocketException,UnknownHostException */ - public UDTSocket(UDPEndPoint endpoint, UDTSession session)throws SocketException,UnknownHostException{ + UDTSocket(UDPEndPoint endpoint, UDTSession session)throws SocketException,UnknownHostException{ + super(); this.endpoint=endpoint; this.session=session; this.receiver=new UDTReceiver(session,endpoint); this.sender=new UDTSender(session,endpoint); + localSocketAddress = new UDTSocketAddress(endpoint.getLocalAddress(), + endpoint.getLocalPort(), session.getSocketID()); + destination = session.getDestination(); + bound = true; } + public UDTSocket(InetAddress host, int port ) throws SocketException, + UnknownHostException{ + super(); + this.endpoint = UDPEndPoint.get(host, port); + this.session = null; + this.receiver = null; + this.sender = null; + active = false; + bound = true; + + } + + public UDTSocket(){ + super(); + endpoint = null; + session = null; + receiver = null; + sender = null; + active = false; + bound = false; + } + + @Override + public void connect(SocketAddress destination) throws IOException { + connect(destination, 0); + } + + @Override + public void connect(SocketAddress destination, int timeout) throws IOException { + if (destination == null) throw new IllegalArgumentException("connect: The address can't be null"); + if (timeout < 0) throw new IllegalArgumentException("connect: timeout can't be negative"); + if (isClosed()) throw new SocketException("Socket is closed"); + if (isConnected()) throw new SocketException("already connected"); + if (!(destination instanceof UDTSocketAddress)) + throw new IllegalArgumentException("Unsupported address type"); + + UDTSocketAddress epoint = (UDTSocketAddress) destination; + InetAddress addr = epoint.getAddress(); + int port = epoint.getPort(); + + SecurityManager security = System.getSecurityManager(); + if (security != null) { + security.checkConnect(addr.getHostAddress(),port); + } + if (session == null){ + session = new ClientSession(endpoint, (UDTSocketAddress) destination); + session.setSocket(this); + } + endpoint.addSession(session.getSocketID(), session); + receiver = new UDTReceiver(session, endpoint); + sender = new UDTSender(session, endpoint); + localSocketAddress = new UDTSocketAddress(endpoint.getLocalAddress(), + endpoint.getLocalPort(), session.getSocketID()); + endpoint.start(); + try { + ((ClientSession)session).connect(); + } catch (InterruptedException ex) { + throw new IOException(ex); + } + destination = session.getDestination(); + connected = true; + /* + * If the socket was not bound before the connect, it is now because + * the kernel will have picked an ephemeral port & a local address + */ + bound = true; + } + + /** + * Binds the socket to a local address. + * <P> + * If the address is <code>null</code>, then the system will pick up + * an ephemeral port and a valid local address to bind the socket. + * + * @param bindpoint the <code>SocketAddress</code> to bind to + * @throws IOException if the bind operation fails, or if the socket + * is already bound. + * @throws IllegalArgumentException if bindpoint is a + * SocketAddress subclass not supported by this socket + * + * @since 1.4 + * @see #isBound + */ + public void bind(SocketAddress bindpoint) throws IOException { + if (!(bindpoint instanceof UDTSocketAddress)) + throw new IllegalArgumentException("Unsupported SocketAddress type"); + if (isBound()) throw new IOException("Socket already bound"); + synchronized (boundSockets) { + if (boundSockets.contains(bindpoint)) throw + new IOException("A socket is already bound to this address"); + } + endpoint = UDPEndPoint.get(bindpoint); + if (endpoint == null) throw new SocketException("Failed to bind to UDPEndPoint"); + bound = true; + } + + @Override + public InetAddress getInetAddress() { + if (!isConnected()) return null; + return destination.getAddress(); + } + + // Inherit javadoc. + @Override + public InetAddress getLocalAddress() { + // This is for backward compatibility only, the super is not bound and + // returns InetAddress.anyLocalAddress(); + if (!isBound()) super.getLocalAddress(); + return localSocketAddress.getAddress(); + } + + // Inherit javadoc. + @Override + public int getPort() { + if (!isConnected()) return 0; + return destination.getPort(); + } + + // Inherit javadoc. + @Override + public int getLocalPort() { + if (!isBound()) return -1; + return localSocketAddress.getPort(); + } + + // Inherit javadoc. + @Override + public SocketAddress getRemoteSocketAddress() { + if (!isConnected()) return null; + return destination; + } + + // Inherit javadoc. + @Override + public SocketAddress getLocalSocketAddress() { + if (!isBound()) return null; + return localSocketAddress; + } + + @Override + public SocketChannel getChannel() { + return null; + } + + /** + * Not supported + * @param on + * @throws SocketException + */ + @Override + public void setTcpNoDelay(boolean on) throws SocketException { + if (isClosed()) throw new SocketException("Socket closed"); + setOption(SocketOptions.TCP_NODELAY, Boolean.valueOf(on)); + } + + /** + * Not supported + * @return false + * @throws SocketException + */ + @Override + public boolean getTcpNoDelay() throws SocketException { + if (isClosed()) throw new SocketException("Socket closed"); + return ((Boolean) getOption(SocketOptions.TCP_NODELAY)).booleanValue(); + } + + /** + * Not supported. + * @param on + * @param linger + * @throws SocketException + */ + @Override + public void setSoLinger(boolean on, int linger) throws SocketException { + if (isClosed()) throw new SocketException("Socket closed"); + if (linger < 0) throw new IllegalArgumentException("SO_LINGER cannot be less than zero"); + // do nothing, not supported. + } + + @Override + public int getSoLinger() throws SocketException { + if (isClosed()) throw new SocketException("Socket closed"); + return -1; // implies this option is disabled + } + + // Sending of urgent data, should we support it? + @Override + public void sendUrgentData (int data) throws IOException { + throw new SocketException ("Urgent data not supported"); + } + + // Sending of urgent data, should we enable it? + @Override + public void setOOBInline(boolean on) throws SocketException { + if (isClosed()) throw new SocketException("Socket closed"); + setOption(SocketOptions.SO_OOBINLINE, Boolean.valueOf(on)); + } + + @Override + public boolean getOOBInline() throws SocketException { + if (isClosed()) throw new SocketException("Socket closed"); + return ((Boolean) getOption(SocketOptions.SO_OOBINLINE)).booleanValue(); + } + + // TODO: implement set socket timeout. + @Override + public void setSoTimeout(int timeout) throws SocketException { + if (isClosed()) throw new SocketException("Socket closed"); + if (timeout < 0) throw new IllegalArgumentException("negative timeout not allowed"); + setOption(SocketOptions.SO_TIMEOUT, new Integer(timeout)); + } + + // TODO: Implement get socket timeout. + @Override + public synchronized int getSoTimeout() throws SocketException { + if (isClosed()) throw new SocketException("Socket closed"); + Object o = getOption(SocketOptions.SO_TIMEOUT); + if (o instanceof Integer) return ((Integer) o).intValue(); + return 0; + } + + // object method signature compatibility only, not currently supported. + @Override + public void setSendBufferSize(int size) + throws SocketException{ + if (!(size > 0)) throw new IllegalArgumentException("negative send size not allowed"); + if (isClosed()) throw new SocketException("Socket closed"); + setOption(SocketOptions.SO_SNDBUF, new Integer(size)); + } + + // object method signature compatibility only, not currently supported. + @Override + public int getSendBufferSize() throws SocketException { + if (isClosed()) throw new SocketException("Socket is closed"); + int result = 0; + Object o = getOption(SocketOptions.SO_SNDBUF); + if (o instanceof Integer) result = ((Integer)o).intValue(); + return result; + } + + // object method signature compatibility only, not currently supported. + @Override + public void setReceiveBufferSize(int size) + throws SocketException{ + if (size <= 0) throw new IllegalArgumentException("invalid receive size"); + if (isClosed()) throw new SocketException("Socket closed"); + setOption(SocketOptions.SO_RCVBUF, new Integer(size)); + } + + // object method signature compatibility only, not currently supported. + @Override + public int getReceiveBufferSize() + throws SocketException{ + if (isClosed()) throw new SocketException("Socket closed"); + int result = 0; + Object o = getOption(SocketOptions.SO_RCVBUF); + if (o instanceof Integer) { + result = ((Integer)o).intValue(); + } + return result; + } + + // TODO: Implement keep alive. + @Override + public void setKeepAlive(boolean on) throws SocketException { + if (isClosed()) + throw new SocketException("Socket is closed"); + setOption(SocketOptions.SO_KEEPALIVE, Boolean.valueOf(on)); + } + + + @Override + public boolean getKeepAlive() throws SocketException { + // Keep alive is not currently implemented in the udt session. + if (isClosed()) + throw new SocketException("Socket is closed"); + return ((Boolean) getOption(SocketOptions.SO_KEEPALIVE)).booleanValue(); + } + + public void setTrafficClass(int tc) throws SocketException { + // Safe to ignore, not supported. + } + + public int getTrafficClass() throws SocketException { + // Call redirected to underlying DatagramSocket. + return endpoint.getSocket().getTrafficClass(); + } + + @Override + public void setReuseAddress(boolean on) throws SocketException { + throw new SocketException("SO_REUSEADDR not supported"); + } + + /** + * Tests if SO_REUSEADDR is enabled. + * + * @return a <code>boolean</code> indicating whether or not SO_REUSEADDR is enabled. + * @exception SocketException if there is an error + * in the underlying protocol, such as a TCP error. + * @since 1.4 + * @see #setReuseAddress(boolean) + */ + @Override + public boolean getReuseAddress() throws SocketException { + if (isClosed()) throw new SocketException("Socket is closed"); + return ((Boolean) (getOption(SocketOptions.SO_REUSEADDR))).booleanValue(); + } + + /** + * Places the input stream for this socket at "end of stream". + * Any data sent to the input stream side of the socket is acknowledged + * and then silently discarded. + * <p> + * If you read from a socket input stream after invoking + * shutdownInput() on the socket, the stream will return EOF. + * + * @exception IOException if an I/O error occurs when shutting down this + * socket. + * + * @since 1.3 + * @see java.net.Socket#shutdownOutput() + * @see java.net.Socket#close() + * @see java.net.Socket#setSoLinger(boolean, int) + * @see #isInputShutdown + */ + @Override + public void shutdownInput() throws IOException + { + if (isClosed()) throw new SocketException("Socket closed"); + if (!isConnected()) throw new SocketException("Socket not connected"); + if (isInputShutdown()) throw new SocketException("Socket input already shutdown"); + receiver.stop(); + inputStream.close(); + shutIn = true; + } + + @Override + public void shutdownOutput() throws IOException + { + if (isClosed()) throw new SocketException("Socket closed"); + if (!isConnected()) throw new SocketException("Socket is not connected"); + if (isOutputShutdown()) throw new SocketException("Socket output is already shutdown"); + sender.stop(); + outputStream.close(); + shutOut = true; + } + + /** + * Returns the connection state of the socket. + * + * @return true if the socket successfuly connected to a server + * @since 1.4 + */ + @Override + public boolean isConnected() { + // Before 1.3 Sockets were always connected during creation + return connected; + } + + @Override + public boolean isBound() { + return bound ; + } + + @Override + public boolean isClosed() { + return closed; + } + + @Override + public boolean isInputShutdown() { + return shutIn; + } + + /** + * Returns whether the write-half of the socket connection is closed. + * + * @return true if the output of the socket has been shutdown + * @since 1.4 + * @see #shutdownOutput + */ + @Override + public boolean isOutputShutdown() { + return shutOut; + } + + // some preliminary support for socket options. + private Object getOption(int optID) { + return Boolean.FALSE; + } + public UDTReceiver getReceiver() { return receiver; } @@ -87,9 +509,9 @@ this.sender = sender; } - public void setActive(boolean active) { - this.active = active; - } +// public void setActive(boolean active) { +// this.active = active; +// } public boolean isActive() { return active; @@ -103,7 +525,8 @@ * get the input stream for reading from this socket * @return */ - public synchronized UDTInputStream getInputStream()throws IOException{ + @Override + public synchronized InputStream getInputStream()throws IOException{ if(inputStream==null){ inputStream=new UDTInputStream(this); } @@ -114,7 +537,8 @@ * get the output stream for writing to this socket * @return */ - public synchronized UDTOutputStream getOutputStream(){ + @Override + public synchronized OutputStream getOutputStream(){ if(outputStream==null){ outputStream=new UDTOutputStream(this); } @@ -126,7 +550,7 @@ } /** - * write single block of data without waiting for any acknowledgement + * write single block of data without waiting for any acknowledgment * @param data */ protected void doWrite(byte[]data)throws IOException{ @@ -176,6 +600,8 @@ /** * will block until the outstanding packets have really been sent out * and acknowledged + * + * @throws InterruptedException */ protected void flush() throws InterruptedException{ if(!active)return; @@ -200,14 +626,38 @@ flush(); } + @Override + public String toString(){ + StringBuilder sb = new StringBuilder(400); + sb .append("UDTSocket: \n") + .append("Local address: ") + ... [truncated message content] |