[Beepcore-java-commits] CVS: beepcore-java/src/org/beepcore/beep/core Channel.java,1.20,1.21
Status: Beta
Brought to you by:
huston
|
From: Huston F. <hu...@us...> - 2002-03-30 16:19:45
|
Update of /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/core
In directory usw-pr-cvs1:/tmp/cvs-serv16992/src/org/beepcore/beep/core
Modified Files:
Channel.java
Log Message:
start of new listener threading
Index: Channel.java
===================================================================
RCS file: /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/core/Channel.java,v
retrieving revision 1.20
retrieving revision 1.21
diff -C2 -r1.20 -r1.21
*** Channel.java 15 Dec 2001 00:07:19 -0000 1.20
--- Channel.java 30 Mar 2002 16:19:42 -0000 1.21
***************
*** 23,26 ****
--- 23,28 ----
import java.util.*;
+ import edu.oswego.cs.dl.util.concurrent.PooledExecutor;
+
import org.beepcore.beep.util.BufferSegment;
import org.beepcore.beep.util.Log;
***************
*** 118,121 ****
--- 120,125 ----
private Object applicationData = null;
+ private static final PooledExecutor callbackQueue = new PooledExecutor();
+
// in shutting down the session
// something for waiting synchronous messages (semaphores or something)
***************
*** 412,415 ****
--- 416,420 ----
if (frame.getMessageType() == Message.MESSAGE_TYPE_MSG) {
MessageMSG m = null;
+ boolean notify = false;
synchronized (recvMSGQueue) {
***************
*** 427,448 ****
recvMSGQueue.addLast(m);
}
- }
! Iterator i = frame.getPayload();
! while (i.hasNext()) {
! m.getDataStream().add((BufferSegment)i.next());
! }
! if (frame.isLast()) {
! m.getDataStream().setComplete();
! }
! // The MessageListener interface only allows one message
! // up to be processed at a time so if this is not the
! // first message on the queue just return.
! // Question, so how do we EVER catch up? If something
! // gets stuck here. I suspect it isn't getting taken off.
! synchronized (recvMSGQueue) {
if (m != recvMSGQueue.getFirst()) {
return;
--- 432,452 ----
recvMSGQueue.addLast(m);
+ notify = true;
}
! Iterator i = frame.getPayload();
! synchronized (m) {
! while (i.hasNext()) {
! m.getDataStream().add((BufferSegment)i.next());
! }
! if (frame.isLast()) {
! m.getDataStream().setComplete();
! }
! }
! // The MessageListener interface only allows one message
! // up to be processed at a time so if this is not the
! // first message on the queue just return.
if (m != recvMSGQueue.getFirst()) {
return;
***************
*** 450,473 ****
if (frame.isLast()) {
! recvMSGQueue.remove(m);
}
}
! // notify message listener if this message has not been
! // notified before and notifyOnFirstFrame is set, the
! // window is full, this is the last frame.
! synchronized (m) {
! if (m.isNotified()
! || ((this.notifyOnFirstFrame == false)
! && (recvSequence - prevAckno) !=
! (recvWindowSize - prevWindowUsed)
! && (frame.isLast() == false))) {
! return;
}
-
- m.setNotified();
}
-
- ((MessageListener) this.listener).receiveMSG(m);
return;
--- 454,501 ----
if (frame.isLast()) {
! synchronized (m) {
! if (m.isNotified()) {
! recvMSGQueue.remove(m);
! }
! }
}
}
! if (notify) {
! try {
! callbackQueue.execute(new Runnable() {
! public void run() {
! MessageMSG m;
! synchronized (recvMSGQueue) {
! m = (MessageMSG)recvMSGQueue.getFirst();
! synchronized (m) {
! if (m.getDataStream().isComplete()) {
! recvMSGQueue.remove(m);
! }
! m.setNotified();
! }
! }
!
! try {
! listener.receiveMSG(m);
! } catch (BEEPError e) {
! try {
! m.sendERR(e);
! } catch (BEEPException e2) {
! Log.logEntry(Log.SEV_ERROR, e2);
! }
! } catch (AbortChannelException e) {
! try {
! Channel.this.close();
! } catch (BEEPException e2) {
! Log.logEntry(Log.SEV_ERROR, e2);
! }
! }
! }
! });
! } catch (InterruptedException e) {
! throw new BEEPException(e);
}
}
return;
|