From: <jsa...@us...> - 2009-02-05 16:32:44
|
Revision: 126 http://flexotask.svn.sourceforge.net/flexotask/?rev=126&view=rev Author: jsauerbach Date: 2009-02-05 16:32:41 +0000 (Thu, 05 Feb 2009) Log Message: ----------- In NativeIO add the ability to dynamically vary between blocking and non-blocking operations on an individual socket. This commit adds the native code portion. Modified Paths: -------------- trunk/flexotask/native/NBIO.c Modified: trunk/flexotask/native/NBIO.c =================================================================== --- trunk/flexotask/native/NBIO.c 2009-02-05 16:16:39 UTC (rev 125) +++ trunk/flexotask/native/NBIO.c 2009-02-05 16:32:41 UTC (rev 126) @@ -2,7 +2,7 @@ * This file is part of Flexible Task Graphs * (http://sourceforge.net/projects/flexotasks) * - * Copyright (c) 2006 - 2008 IBM Corporation. + * Copyright (c) 2006 - 2009 IBM Corporation. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -19,6 +19,7 @@ #include <netinet/tcp.h> #include <fcntl.h> #include <termios.h> +#include <poll.h> /* * Class: com_ibm_realtime_flexotask_util_NativeIO @@ -31,7 +32,7 @@ { jint sock; struct sockaddr_in addr; - int rc; + int rc; if ((sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { return -errno; } @@ -40,12 +41,12 @@ addr.sin_port = htons(port); addr.sin_addr.s_addr = 0; if (bind(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { - rc = -errno; + rc = -errno; close(sock); return rc; } if (listen(sock, 2) < 0) { - rc = -errno; + rc = -errno; close(sock); return rc; } @@ -58,20 +59,20 @@ static int makeNonBlocking(int sock, jboolean tcp) { int flags, rc; - if (tcp) { - int on = 1; - if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof(int))) { - rc = -errno; - close(sock); - return rc; - } - } + if (tcp) { + int on = 1; + if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof(int))) { + rc = -errno; + close(sock); + return rc; + } + } flags = fcntl(sock, F_GETFL, 0); if (flags == -1) { flags = 0; } if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) { - rc = -errno; + rc = -errno; close(sock); return rc; } @@ -81,70 +82,62 @@ /* * Class: com_ibm_realtime_flexotask_util_NativeIO * Method: doUDPlisten - * Signature: (IZ)I + * Signature: (I)I */ JNIEXPORT jint JNICALL Java_com_ibm_realtime_flexotask_util_NativeIO_doUDPlisten -(JNIEnv *env, jclass ignore, jint port, jboolean isBlocking) +(JNIEnv *env, jclass ignore, jint port) { - int rc; + int rc; struct sockaddr_in addr; - jint sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + jint sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if (sock < 0) { - return -errno; + return -errno; } memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(port); if (bind(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { - rc = -errno; - close(sock); - return rc; - } - if (isBlocking) { - return sock; - } else { - return makeNonBlocking(sock, 0); - } + rc = -errno; + close(sock); + return rc; + } + return makeNonBlocking(sock, 0); } /* * Class: com_ibm_realtime_flexotask_util_NativeIO * Method: doAccept - * Signature: (IZ)I + * Signature: (I)I */ JNIEXPORT jint JNICALL Java_com_ibm_realtime_flexotask_util_NativeIO_doAccept -(JNIEnv * env, jclass ignore, jint socket, jboolean isBlocking) +(JNIEnv * env, jclass ignore, jint socket) { jint sock = accept(socket, NULL, 0); if (sock < 0) { return -errno; } - if (isBlocking) { - return sock; - } else { - return makeNonBlocking(sock, 1); - } + return makeNonBlocking(sock, 1); } /* * Class: com_ibm_realtime_flexotask_util_NativeIO * Method: doConnect - * Signature: (IIZZ)I + * Signature: (IIZ)I */ JNIEXPORT jint JNICALL Java_com_ibm_realtime_flexotask_util_NativeIO_doConnect -(JNIEnv * env, jclass ignore, jint host, jint port, jboolean udp, jboolean isBlocking) +(JNIEnv * env, jclass ignore, jint host, jint port, jboolean udp) { jint sock; struct sockaddr_in addr; - int rc; - if (udp) { - sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - } else { - sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); - } + int rc; + if (udp) { + sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + } else { + sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + } if (sock < 0) { return -errno; } @@ -153,15 +146,11 @@ addr.sin_port = htons(port); addr.sin_addr.s_addr = htonl(host); if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { - rc = -errno; + rc = -errno; close(sock); return rc; } - if (isBlocking) { - return sock; - } else { - return makeNonBlocking(sock, !udp); - } + return makeNonBlocking(sock, !udp); } /* @@ -173,31 +162,31 @@ Java_com_ibm_realtime_flexotask_util_NativeIO_doRs232 (JNIEnv * env, jclass ignore, jstring dev, jint baudrate) { - const char* name; - jint fd; - struct termios term; - int rc; - name = (*env)->GetStringUTFChars(env, dev, 0); - fd = open(name, O_RDWR | O_NOCTTY); - (*env)->ReleaseStringUTFChars(env, dev, name); - if (fd < 0) { - return -errno; - } - memset(&term, 0, sizeof term); - term.c_cflag = baudrate | CS8 | CLOCAL | CREAD; - term.c_iflag = IGNPAR; - term.c_cc[VTIME] = 0; // temp - if (tcflush(fd, TCIFLUSH) < 0) { - rc = -errno; - close(fd); - return rc; - } - if (tcsetattr(fd, TCSANOW, &term) < 0) { - rc = -errno; - close(fd); - return rc; - } - return fd; + const char* name; + jint fd; + struct termios term; + int rc; + name = (*env)->GetStringUTFChars(env, dev, 0); + fd = open(name, O_RDWR | O_NOCTTY); + (*env)->ReleaseStringUTFChars(env, dev, name); + if (fd < 0) { + return -errno; + } + memset(&term, 0, sizeof term); + term.c_cflag = baudrate | CS8 | CLOCAL | CREAD; + term.c_iflag = IGNPAR; + term.c_cc[VTIME] = 0; // temp + if (tcflush(fd, TCIFLUSH) < 0) { + rc = -errno; + close(fd); + return rc; + } + if (tcsetattr(fd, TCSANOW, &term) < 0) { + rc = -errno; + close(fd); + return rc; + } + return fd; } /* @@ -213,40 +202,64 @@ } /* + * Wait for a state of file descriptor readiness + */ +static int +waitForFD(int fd, short state) +{ + struct pollfd polldata; + polldata.fd = fd; + polldata.events = state; + return poll(&polldata, 1, -1); +} + +/* * Class: com_ibm_realtime_flexotask_util_NativeIO * Method: doRead - * Signature: (I[BII)I + * Signature: (I[BIIZ)I */ JNIEXPORT jint JNICALL Java_com_ibm_realtime_flexotask_util_NativeIO_doRead -(JNIEnv * env, jclass ignore, jint fd, jbyteArray buffer, jint offset, jint length) +(JNIEnv * env, jclass ignore, jint fd, jbyteArray buffer, jint offset, jint length, jboolean wait) { - int rc; - jbyte* buf = (*env)->GetByteArrayElements(env, buffer, 0); - rc = read(fd, buf+offset, length); - (*env)->ReleaseByteArrayElements(env, buffer, buf, 0); - if (rc < 0) { - rc = -errno; + int rc; + if (wait) { + rc = waitForFD(fd, POLLIN); + if (rc < 0) { + return -errno; + } } - return rc; + jbyte* buf = (*env)->GetByteArrayElements(env, buffer, 0); + rc = read(fd, buf+offset, length); + (*env)->ReleaseByteArrayElements(env, buffer, buf, 0); + if (rc < 0) { + rc = -errno; + } + return rc; } /* * Class: com_ibm_realtime_flexotask_util_NativeIO * Method: doWrite - * Signature: (I[BII)I + * Signature: (I[BIIZ)I */ JNIEXPORT jint JNICALL Java_com_ibm_realtime_flexotask_util_NativeIO_doWrite -(JNIEnv * env, jclass ignore, jint fd, jbyteArray buffer, jint offset, jint length) +(JNIEnv * env, jclass ignore, jint fd, jbyteArray buffer, jint offset, jint length, jboolean wait) { - int rc; - jbyte* buf = (*env)->GetByteArrayElements(env, buffer, 0); - rc = write(fd, buf+offset, length); - (*env)->ReleaseByteArrayElements(env, buffer, buf, 0); - if (rc < 0) { - rc = -errno; + int rc; + if (wait) { + rc = waitForFD(fd, POLLOUT); + if (rc < 0) { + return -errno; + } } - return rc; + jbyte* buf = (*env)->GetByteArrayElements(env, buffer, 0); + rc = write(fd, buf+offset, length); + (*env)->ReleaseByteArrayElements(env, buffer, buf, 0); + if (rc < 0) { + rc = -errno; + } + return rc; } /* This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |