[Beepcore-java-commits] CVS: beepcore-java/src/org/beepcore/beep/core Channel.java,1.30,1.31
Status: Beta
Brought to you by:
huston
|
From: Huston F. <hu...@us...> - 2003-03-07 13:02:00
|
Update of /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/core
In directory sc8-pr-cvs1:/tmp/cvs-serv16166/src/org/beepcore/beep/core
Modified Files:
Channel.java
Log Message:
Fixed channel to only allow one MSG to be notified at a time.
Index: Channel.java
===================================================================
RCS file: /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/core/Channel.java,v
retrieving revision 1.30
retrieving revision 1.31
diff -C2 -r1.30 -r1.31
*** Channel.java 5 Oct 2002 15:26:30 -0000 1.30
--- Channel.java 7 Mar 2003 13:01:54 -0000 1.31
***************
*** 187,190 ****
--- 187,191 ----
sentMSGQueue.add(new MessageStatus(this, Message.MESSAGE_TYPE_MSG, 0,
null, rl));
+ recvMSGQueue.add(new MessageMSG(this, 0, null));
state = STATE_ACTIVE;
***************
*** 455,504 ****
}
! if (m == null) {
! m = new MessageMSG(this, frame.getMsgno(),
! new InputDataStream(this));
!
! recvMSGQueue.addLast(m);
! notify = true;
! }
! Iterator i = frame.getPayload();
! synchronized (m) {
! while (i.hasNext()) {
! m.getDataStream().add((BufferSegment)i.next());
! }
! if (frame.isLast()) {
! m.getDataStream().setComplete();
! }
! }
! // The MessageListener interface only allows one message
! // up to be processed at a time so if this is not the
! // first message on the queue just return.
! if (m != recvMSGQueue.getFirst()) {
! return;
}
if (frame.isLast()) {
! synchronized (m) {
! if (m.isNotified()) {
! recvMSGQueue.remove(m);
! }
! }
}
- }
-
- if (notify) {
- synchronized (recvMSGQueue) {
- final MessageMSG m =
- (MessageMSG)recvMSGQueue.getFirst();
- synchronized (m) {
- if (m.getDataStream().isComplete()) {
- recvMSGQueue.remove(m);
- }
- m.setNotified();
- }
try {
listener.receiveMSG(m);
--- 456,492 ----
}
! if (m != null) {
! /// Move this code to DataStream...
! Iterator i = frame.getPayload();
! synchronized (m) {
! while (i.hasNext()) {
! m.getDataStream().add((BufferSegment) i.next());
! }
!
! if (frame.isLast()) {
! m.getDataStream().setComplete();
! }
! }
!
! return;
! }
! m = new MessageMSG(this, frame.getMsgno(),
! new InputDataStream(this));
! m.setNotified();
! Iterator i = frame.getPayload();
! while (i.hasNext()) {
! m.getDataStream().add((BufferSegment)i.next());
}
if (frame.isLast()) {
! m.getDataStream().setComplete();
}
+ recvMSGQueue.addLast(m);
+
+ if (recvMSGQueue.size() == 1) {
try {
listener.receiveMSG(m);
***************
*** 951,954 ****
--- 939,980 ----
status.setMessageStatus(MessageStatus.MESSAGE_STATUS_SENT);
+
+ if (ds.isComplete() && ds.availableSegment() == false &&
+ (status.getMessageType() == Message.MESSAGE_TYPE_RPY ||
+ status.getMessageType() == Message.MESSAGE_TYPE_NUL))
+ {
+ MessageMSG m;
+ synchronized (recvMSGQueue) {
+ recvMSGQueue.removeFirst();
+
+ if (recvMSGQueue.size() != 0) {
+ m = (MessageMSG) recvMSGQueue.getFirst();
+ synchronized (m) {
+ m.setNotified();
+ }
+ } else {
+ m = null;
+ }
+ }
+
+ if (m != null) {
+ try {
+ listener.receiveMSG(m);
+ } catch (BEEPError e) {
+ try {
+ m.sendERR(e);
+ } catch (BEEPException e2) {
+ log.error("Error sending ERR", e2);
+ }
+ } catch (AbortChannelException e) {
+ try {
+ /* @todo change this to abort or something else */
+ Channel.this.close();
+ } catch (BEEPException e2) {
+ log.error("Error closing channel", e2);
+ }
+ }
+ }
+ }
}
|