From: <jsa...@us...> - 2009-02-05 16:16:43
|
Revision: 125 http://flexotask.svn.sourceforge.net/flexotask/?rev=125&view=rev Author: jsauerbach Date: 2009-02-05 16:16:39 +0000 (Thu, 05 Feb 2009) Log Message: ----------- In NativeIO add the ability to dynamically vary between blocking and non-blocking operations on an individual socket. Add NonBlockingBuffer class to aid in managing partial reads and writes in Flexotask execute methods. Modified Paths: -------------- trunk/flexotask/src/com/ibm/realtime/flexotask/util/NativeIO.java trunk/flexotask-functiontest/src/alltests/Testcases.java Added Paths: ----------- trunk/flexotask/src/com/ibm/realtime/flexotask/util/NonBlockingBuffer.java trunk/flexotask-functiontest/src/nativeio/ trunk/flexotask-functiontest/src/nativeio/Main.java Modified: trunk/flexotask/src/com/ibm/realtime/flexotask/util/NativeIO.java =================================================================== --- trunk/flexotask/src/com/ibm/realtime/flexotask/util/NativeIO.java 2009-02-02 20:49:06 UTC (rev 124) +++ trunk/flexotask/src/com/ibm/realtime/flexotask/util/NativeIO.java 2009-02-05 16:16:39 UTC (rev 125) @@ -33,7 +33,8 @@ * this class also does *something* when the natives are absent, but it is very approximate. Socket writes are * blocking, socket reads use a 1ms timeout, and rs232 I/O is unavailable. * - * Socket reads can also be set to blocking in the simulated case, but not in the flexotask VM run time. + * Individual reads and writes can be optionally changed to "blocking" behavior. An OS poll or ppoll call is used to + * accomplish this. */ public class NativeIO { @@ -49,13 +50,6 @@ /** Constant indicating baud rate of 230400 */ public static final int B230400 = 0010003; - /** Flag indicating that 'connect', 'accept', and 'UDPlisten' functions should produce blocking sockets rather than - * non-blocking. TODO: this use of a static flag is a remnant of an older design in which only the simulated versions - * were affected. Now that the native versions can be either blocking or non-blocking this behavior should be - * specified by call arguments and not by a flag. - */ - public static boolean isBlocking; - /** An arbitrary code used to indicating blocking in the simulated case */ private static final int SIMULATED_BLOCKING_CODE = -2; @@ -67,7 +61,7 @@ /** * Accept a connection on a listening socket (the operation is blocking but the resulting socket is - * non-blocking unless simulating and the isBlocking flag has been set) + * non-blocking) * @param socket the listening socket from which to accept * @return a new non-blocking socket representing a new connection or -errno on error */ @@ -77,15 +71,12 @@ try { Socket realSocket = ((ServerSocket) registry.get(socket)).accept(); realSocket.setTcpNoDelay(true); - if (!isBlocking) { - realSocket.setSoTimeout(1); - } return register(realSocket); } catch (IOException e) { return -1; } } else { - return doAccept(socket, isBlocking); + return doAccept(socket); } } @@ -114,8 +105,7 @@ } /** - * Connect a socket (the operation is blocking but the resulting socket is non-blocking unless simulating and isBlocking has - * been set) + * Connect a socket (the operation is blocking but the resulting socket is non-blocking) * @param host the host to connect * @param port the port to connect to (in host byte order) * @param udp true if a UDP socket is desired, otherwise a TCP socket @@ -132,13 +122,10 @@ } else { Socket socket = new Socket(host, port); socket.setTcpNoDelay(true); - if (!isBlocking) { - socket.setSoTimeout(1); - } return register(socket); } } else { - return doConnect(getHostAddr(host), port, udp, isBlocking); + return doConnect(getHostAddr(host), port, udp); } } @@ -155,6 +142,13 @@ } /** + * Query whether the NativeIO is simulated (if false, the RT native support is being used) + */ + public static boolean isSimulated() { + return simulated; + } + + /** * Create a listening TCP socket (for now, the result is blocking) * @param port the port to listen on * @return a new listening socket or -errno on error @@ -173,7 +167,7 @@ } /** - * Perform non-blocking read on an fd + * Perform a non-blocking read on an fd * @param fd the fd to read * @param buffer the buffer into which to read * @param offset the starting offset at which bytes should be deposited @@ -182,16 +176,33 @@ * or 0 if the connection is closed or a negative number on exceptional condition. The negative number is * the negation of the Unix errno value (e.g. -EWOULDBLOCK indicates data temporarily unavailable) */ - public static int read(int fd, byte[] buffer, int offset, int length) + public static int read(int fd, byte[] buffer, int offset, int length) { + return read(fd, buffer, offset, length, false); + } + + /** + * Perform read on an fd + * @param fd the fd to read + * @param buffer the buffer into which to read + * @param offset the starting offset at which bytes should be deposited + * @param length the number of bytes available at offset + * @param wait if true, wait until there is something to read (blocking behavior), else the usual non-blocking behavior + * @return a positive number of bytes read (which may harmlessly be less than the number requested), + * or 0 if the connection is closed or a negative number on exceptional condition. The negative number is + * the negation of the Unix errno value (e.g. -EWOULDBLOCK indicates data temporarily unavailable) + */ + public static int read(int fd, byte[] buffer, int offset, int length, boolean wait) { if (simulated) { /* Not really nonblocking, except that UDP reads use a 1ms timeout */ Object socket = registry.get(fd); try { if (socket instanceof Socket) { + ((Socket) socket).setSoTimeout(wait ? 0 : 1); InputStream in = ((Socket) socket).getInputStream(); return in.read(buffer, offset, length); } else if (socket instanceof DatagramSocket) { + ((DatagramSocket) socket).setSoTimeout(wait ? 0 : 1); DatagramPacket packet = new DatagramPacket(buffer, offset, length); ((DatagramSocket) socket).receive(packet); return packet.getLength(); @@ -204,7 +215,7 @@ return -1; } } else { - return doRead(fd, buffer, offset, length); + return doRead(fd, buffer, offset, length, wait); } } @@ -234,20 +245,17 @@ if (simulated) { try { DatagramSocket socket = new DatagramSocket(port); - if (!isBlocking) { - socket.setSoTimeout(1); - } return register(socket); } catch (SocketException e) { return -1; } } else { - return doUDPlisten(port, isBlocking); + return doUDPlisten(port); } } /** - * Perform non-blocking write to an fd + * Perform a non-blocking write to an fd * @param fd the fd to write * @param buffer the buffer from which to write * @param offset the starting offset from which bytes should be fetched @@ -256,7 +264,23 @@ * or 0 if the connection is closed or a negative number on exceptional condition. The negative number is * the negation of the Unix errno value (e.g. -EWOULDBLOCK indicates data temporarily unavailable) */ - public static int write(int fd, byte[] buffer, int offset, int length) + public static int write(int fd, byte[] buffer, int offset, int length) { + return write(fd, buffer, offset, length, false); + } + + /** + * Perform a write to an fd + * @param fd the fd to write + * @param buffer the buffer from which to write + * @param offset the starting offset from which bytes should be fetched + * @param length the number of bytes available at offset + * @param wait if true, waits for the fd to be receptive before writing (blocking behavior) else the usual non-blocking + * behavior. This has no effect when simulating. + * @return a positive number of bytes written (which may harmlessly be less than the number requested), + * or 0 if the connection is closed or a negative number on exceptional condition. The negative number is + * the negation of the Unix errno value (e.g. -EWOULDBLOCK indicates data temporarily unavailable) + */ + public static int write(int fd, byte[] buffer, int offset, int length, boolean wait) { if (simulated) { /* Not really nonblocking */ @@ -280,18 +304,18 @@ return -1; } } else { - return doWrite(fd, buffer, offset, length); + return doWrite(fd, buffer, offset, length, wait); } } /** Native for accept */ - private static native int doAccept(int socket, boolean isBlocking); + private static native int doAccept(int socket); /** Native for close */ private static native void doClose(int fd); /** Native for connect */ - private static native int doConnect(int host, int port, boolean udp, boolean isBlocking); + private static native int doConnect(int host, int port, boolean udp); /** Native for getBlockingCode */ private static native int doGetBlockingCode(); @@ -300,16 +324,16 @@ private static native int doListen(int port); /** Native for read */ - private static native int doRead(int fd, byte[] buffer, int offset, int length); + private static native int doRead(int fd, byte[] buffer, int offset, int length, boolean wait); /** Native for rs232 */ private static native int doRs232(String dev, int baudrate); /** Native for UDPlisten */ - private static native int doUDPlisten(int port, boolean isBlocking); + private static native int doUDPlisten(int port); /** Native for write */ - private static native int doWrite(int fd, byte[] buffer, int offset, int length); + private static native int doWrite(int fd, byte[] buffer, int offset, int length, boolean wait); /** * Convenience method for converting a host string to an IPv4 host byte order address @@ -328,7 +352,7 @@ return addr; } - /** + /** * Register a socket, getting back an integer index * @param socket a Socket or ServerSocket object * @return an index that can be used to recover it Added: trunk/flexotask/src/com/ibm/realtime/flexotask/util/NonBlockingBuffer.java =================================================================== --- trunk/flexotask/src/com/ibm/realtime/flexotask/util/NonBlockingBuffer.java (rev 0) +++ trunk/flexotask/src/com/ibm/realtime/flexotask/util/NonBlockingBuffer.java 2009-02-05 16:16:39 UTC (rev 125) @@ -0,0 +1,124 @@ +/* + * This file is part of Flexible Task Graphs + * (http://sourceforge.net/projects/flexotasks) + * + * Copyright (c) 2006 - 2008 IBM Corporation. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * IBM Corporation - initial API and implementation + */ +package com.ibm.realtime.flexotask.util; + + +/** + * A class for use in conjunction with NativeIO for convenient management of non-blocking I/O in a context where a minimal + * amount of data is required for action. + */ +public class NonBlockingBuffer { + private static final int blocking = NativeIO.getBlockingCode(); + + /** The offset of the first unused byte in buffer if it is used for reading and the first unsent byte if it is used + * for writing */ + private int offset; + + /** The data buffer */ + private byte[] buffer; + + /** + * Make a new NonBlockingBuffer for reading + * @param size the size of the buffer + */ + public NonBlockingBuffer(int size) { + buffer = new byte[size]; + } + + /** + * Make a new NonBlockingBuffer for writing + * @param message the message to write + */ + public NonBlockingBuffer(byte[] message) { + buffer = message; + } + + /** + * Reset the buffer to the empty state + */ + public void reset() { + offset = 0; + } + + /** + * Reset the buffer with a new message to write + * @param message the new message to write + */ + public void reset(byte[] message) { + buffer = message; + offset = 0; + } + + /** + * Retrieve the capacity of the buffer + */ + public int size() { + return buffer.length; + } + + /** + * Retrieve the current "occupancy" (bytes read or already written) + */ + public int occupancy() { + return offset; + } + + /** + * Perform a read cycle on a file descriptor and return the occupancy + * @param fd the file descriptor + * @return the occupancy or a negative error indicator (only hard errors, never the blocking code) + */ + public int read(int fd) { + int rc = NativeIO.read(fd, buffer, offset, buffer.length-offset, false); + if (rc < 0 && rc != blocking) { + return rc; + } + offset += (rc == blocking ? 0 : rc); + return offset; + } + + /** + * Perform a write cycle on a file descriptor and return the occupancy + * @param fd the file descriptor + * @return the occupancy or a negative error indicator (only hard errors, never the blocking code) + */ + public int write(int fd) { + int rc = NativeIO.write(fd, buffer, offset, buffer.length-offset, false); + if (rc < 0 && rc != blocking) { + return rc; + } + offset += (rc == blocking ? 0 : rc); + return offset; + } + + /** + * Get the data from the buffer (call when you have enough data after one or more read cycles) + */ + public byte[] getData() { + byte[] ans = new byte[offset]; + System.arraycopy(buffer,0, ans,0, offset); + return ans; + } + + /** + * Convenience method allows a NonBlockingBuffer to be used for reading without using NativeIO + * @param val a byte to add + * @return the occupancy after adding + * @throws ArrayIndexOutOfBoundsException if buffer capacity exceeded + */ + public int addByte(byte val) { + buffer[offset++] = val; + return offset; + } +} Property changes on: trunk/flexotask/src/com/ibm/realtime/flexotask/util/NonBlockingBuffer.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:eol-style + native Modified: trunk/flexotask-functiontest/src/alltests/Testcases.java =================================================================== --- trunk/flexotask-functiontest/src/alltests/Testcases.java 2009-02-02 20:49:06 UTC (rev 124) +++ trunk/flexotask-functiontest/src/alltests/Testcases.java 2009-02-05 16:16:39 UTC (rev 125) @@ -69,6 +69,12 @@ types.Main.main(new String[0]); } + @Test + public void testNativeIO() throws IOException + { + nativeio.Main.main(new String[0]); + } + /** * Method to test in a non-eclipse environment (e.g. a realtime VM) */ Added: trunk/flexotask-functiontest/src/nativeio/Main.java =================================================================== --- trunk/flexotask-functiontest/src/nativeio/Main.java (rev 0) +++ trunk/flexotask-functiontest/src/nativeio/Main.java 2009-02-05 16:16:39 UTC (rev 125) @@ -0,0 +1,181 @@ +/* + * This file is part of Flexible Task Graphs + * (http://sourceforge.net/projects/flexotasks) + * + * Copyright (c) 2006 - 2008 IBM Corporation. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * IBM Corporation - initial API and implementation + */ +package nativeio; + +import java.io.IOException; +import java.lang.Thread.UncaughtExceptionHandler; + +import com.ibm.realtime.flexotask.scheduling.FlexotaskFileDescriptorState; +import com.ibm.realtime.flexotask.scheduling.FlexotaskWaitSet; +import com.ibm.realtime.flexotask.util.NativeIO; +import com.ibm.realtime.flexotask.util.NonBlockingBuffer; + +/** + * A unit test of the NativeIO, FlexotaskWaitSet and NonBlockingBuffer utilities + */ +public class Main +{ + /* Record any error in secondary thread */ + static Throwable badSender = null; + + /* Main setup */ + public static void main(String[] args) throws IOException { + /* Check environment ... no point in running when only simulating */ + if (NativeIO.isSimulated()) { + System.err.println("NativeIO test not run since native support not present"); + return; + } + + /* Create exception handler for secondary thread */ + UncaughtExceptionHandler handler = new UncaughtExceptionHandler() { + public void uncaughtException(Thread t, Throwable e) { + badSender = e; + } + }; + /* Make secondary thread to use as sender */ + Thread sender = new Thread(new Runnable() { + public void run() { senderRun(); } + }); + sender.setUncaughtExceptionHandler(handler); + sender.setDaemon(true); /* incase receiver aborts */ + /* Run sender, which will wait for receiver to connect */ + sender.start(); + /* Run receiver */ + receiverRun(); + /* Clean up sender */ + try { sender.join(); } catch (InterruptedException e) {} + /* Handle any sender exception */ + if (badSender != null) { + if (badSender instanceof RuntimeException) { + throw (RuntimeException) badSender; + } + if (badSender instanceof Error) { + throw (Error) badSender; + } + throw new IllegalStateException(badSender); + } + } + + /* The receiver loop */ + static void receiverRun() throws IOException { + /* Wait 1 second for sender to fire up */ + try { Thread.sleep(1000); } catch (InterruptedException e) {} + /* Connect to sender */ + int fd = NativeIO.connect("localhost", 9879, false); + if (fd < 0) { + throw new IllegalStateException("Connect rc=" + fd); + } + /* Do a few blocking reads */ + readAndCheck(fd, true, 0); + readAndCheck(fd, true, 1); + readAndCheck(fd, true, 2); + /* Now do some non-blocking ones */ + readAndCheck(fd, false, 3); + readAndCheck(fd, false, 4); + readAndCheck(fd, false, 5); + /* Now do some non-blocking ones with a WaitSet */ + FlexotaskFileDescriptorState state = new FlexotaskFileDescriptorState(fd, FlexotaskFileDescriptorState.READ); + FlexotaskWaitSet waitSet = new FlexotaskWaitSet(new FlexotaskFileDescriptorState[] {state}); + readWithWaitSet(fd, waitSet, 6); + readWithWaitSet(fd, waitSet, 7); + readWithWaitSet(fd, waitSet, 8); + /* Now read remaining contents non-blockingly using NonBlockingBuffer */ + NonBlockingBuffer toRead = new NonBlockingBuffer(102); + int rc = toRead.read(fd); + while (rc >= 0 && rc < 102) { + try { Thread.sleep(200); } catch (InterruptedException e) {} + System.err.print(rc + " "); + rc = toRead.read(fd); + } + if (rc < 0) { + throw new IllegalStateException("NonBlockingBuffer.read() rc=" + fd); + } + } + + /* The sender loop */ + static void senderRun() { + /* Accept a connection from the receiver */ + int listener = NativeIO.listen(9879); + if (listener < 0) { + throw new IllegalStateException("Listen rc=" + listener); + } + int fd = NativeIO.accept(listener); + if (fd < 0) { + throw new IllegalStateException("Accept rc=" + fd); + } + /* Uses NativeIO to send two bytes at a time at 500ms intervals for 30 seconds, using + * '00' through '59' + */ + byte[] buffer = new byte[2]; + for (int i = 0; i < 60; i++) { + buffer[0] = (byte) (i>>8); + buffer[1] = (byte) i; + int rc = NativeIO.write(fd, buffer, 0, 2, true); + if (rc != 2) { + throw new IllegalStateException("Write rc=" + rc); + } + try { Thread.sleep(500); } catch (InterruptedException e) {} + } + } + + /** + * Read a file descriptor and check the value + * @param fd the fd to read + * @param blocking true if a blocking read should be used from the start, false if a non-blocking read should + * be done first before blocking + * @param check the value to check + */ + private static void readAndCheck(int fd, boolean blocking, int check) { + byte[] buffer = new byte[2]; + int rc = NativeIO.read(fd, buffer, 0, 2, blocking); + if (blocking) { + postCheck(check, buffer, rc); + } else if (rc != NativeIO.getBlockingCode()) { + throw new IllegalStateException("Non-blocking read " + check + " should have returned " + NativeIO.getBlockingCode() + + " but returned " + rc); + } else { + readAndCheck(fd, true, check); + } + } + + /** + * Subroutine to check a buffer against a value and a return code + * @param check the value that should be there + * @param buffer the buffer + * @param rc the return code + */ + private static void postCheck(int check, byte[] buffer, int rc) { + if (rc != 2) { + /* Assumes it is impossible for a two-byte write to break up ... is that true? */ + throw new IllegalStateException("Read " + check + " rc=" + rc); + } + int compare = (buffer[0] << 8) | buffer[1]; + if (compare != check) { + throw new IllegalStateException("Read got " + compare + ", should be " + check); + } + } + + /** + * Read a file descriptor non-blockingly using a WaitSet to wait for it and then check the value + * @param fd the fd to read + * @param waitSet the WaitSet to use + * @param check the value to check + */ + private static void readWithWaitSet(int fd, FlexotaskWaitSet waitSet, int check) { + byte[] buffer = new byte[2]; + waitSet.wait(-1, null); + int rc = NativeIO.read(fd, buffer, 0, 2, false); + postCheck(check, buffer, rc); + } +} Property changes on: trunk/flexotask-functiontest/src/nativeio/Main.java ___________________________________________________________________ Added: svn:mime-type + text/plain Added: svn:eol-style + native This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |