[Beepcore-java-commits] CVS: beepcore-java/src/org/beepcore/beep/core Channel.java,1.20,1.21
Status: Beta
Brought to you by:
huston
From: Huston F. <hu...@us...> - 2002-03-30 16:19:45
|
Update of /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/core In directory usw-pr-cvs1:/tmp/cvs-serv16992/src/org/beepcore/beep/core Modified Files: Channel.java Log Message: start of new listener threading Index: Channel.java =================================================================== RCS file: /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/core/Channel.java,v retrieving revision 1.20 retrieving revision 1.21 diff -C2 -r1.20 -r1.21 *** Channel.java 15 Dec 2001 00:07:19 -0000 1.20 --- Channel.java 30 Mar 2002 16:19:42 -0000 1.21 *************** *** 23,26 **** --- 23,28 ---- import java.util.*; + import edu.oswego.cs.dl.util.concurrent.PooledExecutor; + import org.beepcore.beep.util.BufferSegment; import org.beepcore.beep.util.Log; *************** *** 118,121 **** --- 120,125 ---- private Object applicationData = null; + private static final PooledExecutor callbackQueue = new PooledExecutor(); + // in shutting down the session // something for waiting synchronous messages (semaphores or something) *************** *** 412,415 **** --- 416,420 ---- if (frame.getMessageType() == Message.MESSAGE_TYPE_MSG) { MessageMSG m = null; + boolean notify = false; synchronized (recvMSGQueue) { *************** *** 427,448 **** recvMSGQueue.addLast(m); } - } ! Iterator i = frame.getPayload(); ! while (i.hasNext()) { ! m.getDataStream().add((BufferSegment)i.next()); ! } ! if (frame.isLast()) { ! m.getDataStream().setComplete(); ! } ! // The MessageListener interface only allows one message ! // up to be processed at a time so if this is not the ! // first message on the queue just return. ! // Question, so how do we EVER catch up? If something ! // gets stuck here. I suspect it isn't getting taken off. ! synchronized (recvMSGQueue) { if (m != recvMSGQueue.getFirst()) { return; --- 432,452 ---- recvMSGQueue.addLast(m); + notify = true; } ! Iterator i = frame.getPayload(); ! synchronized (m) { ! while (i.hasNext()) { ! m.getDataStream().add((BufferSegment)i.next()); ! } ! if (frame.isLast()) { ! m.getDataStream().setComplete(); ! } ! } ! // The MessageListener interface only allows one message ! // up to be processed at a time so if this is not the ! // first message on the queue just return. if (m != recvMSGQueue.getFirst()) { return; *************** *** 450,473 **** if (frame.isLast()) { ! recvMSGQueue.remove(m); } } ! // notify message listener if this message has not been ! // notified before and notifyOnFirstFrame is set, the ! // window is full, this is the last frame. ! synchronized (m) { ! if (m.isNotified() ! || ((this.notifyOnFirstFrame == false) ! && (recvSequence - prevAckno) != ! (recvWindowSize - prevWindowUsed) ! && (frame.isLast() == false))) { ! return; } - - m.setNotified(); } - - ((MessageListener) this.listener).receiveMSG(m); return; --- 454,501 ---- if (frame.isLast()) { ! synchronized (m) { ! if (m.isNotified()) { ! recvMSGQueue.remove(m); ! } ! } } } ! if (notify) { ! try { ! callbackQueue.execute(new Runnable() { ! public void run() { ! MessageMSG m; ! synchronized (recvMSGQueue) { ! m = (MessageMSG)recvMSGQueue.getFirst(); ! synchronized (m) { ! if (m.getDataStream().isComplete()) { ! recvMSGQueue.remove(m); ! } ! m.setNotified(); ! } ! } ! ! try { ! listener.receiveMSG(m); ! } catch (BEEPError e) { ! try { ! m.sendERR(e); ! } catch (BEEPException e2) { ! Log.logEntry(Log.SEV_ERROR, e2); ! } ! } catch (AbortChannelException e) { ! try { ! Channel.this.close(); ! } catch (BEEPException e2) { ! Log.logEntry(Log.SEV_ERROR, e2); ! } ! } ! } ! }); ! } catch (InterruptedException e) { ! throw new BEEPException(e); } } return; |