[Ubermq-commits] jms/src/com/ubermq/jms/common/ssl IONormalizer.java,NONE,1.1
Brought to you by:
jimmyp
From: <ji...@us...> - 2004-01-21 02:05:31
|
/*
 * Update of /cvsroot/ubermq/jms/src/com/ubermq/jms/common/ssl
 * In directory sc8-pr-cvs1:/tmp/cvs-serv28341/src/com/ubermq/jms/common/ssl
 *
 * Added Files:
 *     IONormalizer.java
 * Log Message:
 *     bug fixes related to read/write initialization race conditions
 *
 * --- NEW FILE: IONormalizer.java ---
 */
package com.ubermq.jms.common.ssl;

import java.net.*;
import java.nio.channels.*;
import java.nio.*;
import java.io.*;

/**
 * For now, it is necessary to map SSL operations onto NIO via pipes.
 * This is undesirable for a lot of reasons, but there is really not
 * much we can do here until Sun comes up with a way to do this
 * from the JDK.<P>
 *
 * This could also be useful perhaps to allow HTTP or other protocol
 * tunneling that is implemented in the JDK. <P>
 *
 * @since 2.1
 */
public class IONormalizer
{
    /** Size, in bytes, of the copy buffer used by each pump thread. */
    private static final int BUFFER_SIZE = 4096;

    /**
     * Upper bound, in milliseconds, on how long the reader waits for the
     * writer thread to finish before it goes on to close the socket.
     */
    private static final long WRITER_JOIN_TIMEOUT_MS = 5000L;

    /**
     * Normalizes I/O from java.io to/from java.nio paradigms.
     * This method sets up two daemon threads that copy data between
     * a pair of pipes and a socket, and returns once both threads
     * have been started:
     * <ul>
     * <li>the "to-socket" thread reads from <code>tosocket.source()</code>
     *     and writes to the socket's output stream;</li>
     * <li>the "from-socket" thread reads from the socket's input stream
     *     and writes to <code>fromsocket.sink()</code>.</li>
     * </ul>
     * When the from-socket thread ends (end of stream or I/O failure) it
     * stops the to-socket thread, then closes <code>tosocket.sink()</code>,
     * <code>fromsocket.sink()</code> and the socket. When the to-socket
     * thread ends it closes <code>tosocket.source()</code>.
     *
     * @param fromsocket    a Pipe representing traffic
     *                      coming from the socket
     *
     * @param tosocket      a Pipe representing traffic
     *                      going to the socket
     *
     * @param s             a Socket, which is two-way
     *                      by definition.
     *
     * @throws IOException if the socket's streams cannot be obtained
     */
    public static void normalize(final Pipe fromsocket,
                                 final Pipe tosocket,
                                 final Socket s)
        throws IOException
    {
        // get streams
        final InputStream in = s.getInputStream();
        final OutputStream out = s.getOutputStream();

        // from the pipes to the socket. we need to be
        // able to interrupt this thread so that it will shutdown
        // the pipes when necessary.
        final Thread writer = new Thread(new Runnable() {
            public void run()
            {
                try
                {
                    ByteBuffer buf = ByteBuffer.allocate(BUFFER_SIZE);
                    while (!Thread.interrupted())
                    {
                        int n = tosocket.source().read(buf);
                        if (n == -1)
                            break;

                        // the buffer is cleared after every pass, so the
                        // bytes just read always start at index 0.
                        out.write(buf.array(), 0, n);
                        out.flush();
                        buf.clear();
                    }
                }
                catch (ClosedChannelException e)
                {
                    // expected during shutdown: the pipe was closed, or this
                    // thread was interrupted while blocked in read().
                }
                catch (SocketException e)
                {
                    // don't print here. it's fine that the cxn was
                    // reset or closed underneath us.
                }
                catch (IOException e)
                {
                    logError(e);
                }
                finally
                {
                    // nobody will read this pipe any more; close our end so
                    // that a producer fails fast instead of blocking on a
                    // full pipe.
                    try
                    {
                        tosocket.source().close();
                    }
                    catch (IOException e)
                    {
                        logError(e);
                    }
                }
            }
        }, "IONormalizer to-socket");

        // now spawn a thread to handle bringing bytes
        // from the socket
        Thread reader = new Thread(new Runnable() {
            public void run()
            {
                try
                {
                    byte[] buf = new byte[BUFFER_SIZE];
                    while (!Thread.interrupted())
                    {
                        int n = in.read(buf);
                        if (n == -1)
                            break;

                        // a single write() may be partial; keep going until
                        // the whole chunk is in the pipe.
                        ByteBuffer chunk = ByteBuffer.wrap(buf, 0, n);
                        while (chunk.hasRemaining())
                            fromsocket.sink().write(chunk);
                    }
                }
                catch (SocketException e)
                {
                    // don't print here. it's fine that the cxn was
                    // reset.
                }
                catch (IOException e)
                {
                    logError(e);
                }
                finally
                {
                    boolean interrupted = false;

                    // stop the writer. each step gets its own try block so
                    // that one failure cannot skip the remaining cleanup.
                    writer.interrupt();
                    try
                    {
                        tosocket.sink().close();
                    }
                    catch (IOException e)
                    {
                        logError(e);
                    }

                    // bounded wait: a writer stuck in a blocking socket
                    // write is not woken by interrupt(); closing the socket
                    // below will release it.
                    try
                    {
                        writer.join(WRITER_JOIN_TIMEOUT_MS);
                    }
                    catch (InterruptedException e)
                    {
                        interrupted = true;
                    }

                    // close everything
                    try
                    {
                        fromsocket.sink().close();
                    }
                    catch (IOException e)
                    {
                        logError(e);
                    }

                    try
                    {
                        s.close();
                    }
                    catch (IOException e)
                    {
                        logError(e);
                    }

                    // restore the interrupt status only after the closes,
                    // so that it cannot interfere with them.
                    if (interrupted)
                        Thread.currentThread().interrupt();
                }
            }
        }, "IONormalizer from-socket");

        // start em. the writer goes first so that the reader's cleanup
        // always interrupts and joins a thread that has been started.
        reader.setDaemon(true);
        writer.setDaemon(true);
        writer.start();
        reader.start();
    }

    /** Reports a failure through the shared UberMQ logger. */
    private static void logError(Exception e)
    {
        com.ubermq.util.Utility.getLogger().error("", e);
    }
}