[Beepcore-java-commits] CVS: beepcore-java/src/org/beepcore/beep/core Channel.java,1.30,1.31
Status: Beta
Brought to you by:
huston
From: Huston F. <hu...@us...> - 2003-03-07 13:02:00
|
Update of /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/core
In directory sc8-pr-cvs1:/tmp/cvs-serv16166/src/org/beepcore/beep/core

Modified Files:
    Channel.java
Log Message:
Fixed channel to only allow one MSG to be notified at a time.

Index: Channel.java
===================================================================
RCS file: /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/core/Channel.java,v
retrieving revision 1.30
retrieving revision 1.31
diff -C2 -r1.30 -r1.31
*** Channel.java 5 Oct 2002 15:26:30 -0000 1.30
--- Channel.java 7 Mar 2003 13:01:54 -0000 1.31
***************
*** 187,190 ****
--- 187,191 ----
          sentMSGQueue.add(new MessageStatus(this, Message.MESSAGE_TYPE_MSG, 0, null, rl));
+         recvMSGQueue.add(new MessageMSG(this, 0, null));
          state = STATE_ACTIVE;
***************
*** 455,504 ****
                  }
!                 if (m == null) {
!                     m = new MessageMSG(this, frame.getMsgno(),
!                                        new InputDataStream(this));
!
!                     recvMSGQueue.addLast(m);
!                     notify = true;
!                 }
!                 Iterator i = frame.getPayload();
!                 synchronized (m) {
!                     while (i.hasNext()) {
!                         m.getDataStream().add((BufferSegment)i.next());
!                     }
!                     if (frame.isLast()) {
!                         m.getDataStream().setComplete();
!                     }
!                 }
!                 // The MessageListener interface only allows one message
!                 // up to be processed at a time so if this is not the
!                 // first message on the queue just return.
!                 if (m != recvMSGQueue.getFirst()) {
!                     return;
                  }
                  if (frame.isLast()) {
!                     synchronized (m) {
!                         if (m.isNotified()) {
!                             recvMSGQueue.remove(m);
!                         }
!                     }
                  }
-             }
-
-             if (notify) {
-                 synchronized (recvMSGQueue) {
-                     final MessageMSG m =
-                         (MessageMSG)recvMSGQueue.getFirst();
-                     synchronized (m) {
-                         if (m.getDataStream().isComplete()) {
-                             recvMSGQueue.remove(m);
-                         }
-                         m.setNotified();
-                     }
                      try {
                          listener.receiveMSG(m);
--- 456,492 ----
                  }
!                 if (m != null) {
!                     /// Move this code to DataStream...
!                     Iterator i = frame.getPayload();
!                     synchronized (m) {
!                         while (i.hasNext()) {
!                             m.getDataStream().add((BufferSegment) i.next());
!                         }
!
!                         if (frame.isLast()) {
!                             m.getDataStream().setComplete();
!                         }
!                     }
!
!                     return;
!                 }
!                 m = new MessageMSG(this, frame.getMsgno(),
!                                    new InputDataStream(this));
!                 m.setNotified();
!                 Iterator i = frame.getPayload();
!                 while (i.hasNext()) {
!                     m.getDataStream().add((BufferSegment)i.next());
                  }
                  if (frame.isLast()) {
!                     m.getDataStream().setComplete();
                  }
+                 recvMSGQueue.addLast(m);
+
+                 if (recvMSGQueue.size() == 1) {
                      try {
                          listener.receiveMSG(m);
***************
*** 951,954 ****
--- 939,980 ----
              status.setMessageStatus(MessageStatus.MESSAGE_STATUS_SENT);
+
+             if (ds.isComplete() && ds.availableSegment() == false &&
+                 (status.getMessageType() == Message.MESSAGE_TYPE_RPY ||
+                  status.getMessageType() == Message.MESSAGE_TYPE_NUL))
+             {
+                 MessageMSG m;
+                 synchronized (recvMSGQueue) {
+                     recvMSGQueue.removeFirst();
+
+                     if (recvMSGQueue.size() != 0) {
+                         m = (MessageMSG) recvMSGQueue.getFirst();
+                         synchronized (m) {
+                             m.setNotified();
+                         }
+                     } else {
+                         m = null;
+                     }
+                 }
+
+                 if (m != null) {
+                     try {
+                         listener.receiveMSG(m);
+                     } catch (BEEPError e) {
+                         try {
+                             m.sendERR(e);
+                         } catch (BEEPException e2) {
+                             log.error("Error sending ERR", e2);
+                         }
+                     } catch (AbortChannelException e) {
+                         try {
+                             /* @todo change this to abort or something else */
+                             Channel.this.close();
+                         } catch (BEEPException e2) {
+                             log.error("Error closing channel", e2);
+                         }
+                     }
+                 }
+             }
          }
|