[Beepcore-java-commits] CVS: beepcore-java/src/org/beepcore/beep/core ThreadedMessageListener.java,N
Status: Beta
Brought to you by:
huston
From: Huston F. <hu...@us...> - 2002-08-20 03:09:00
|
Update of /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/core In directory usw-pr-cvs1:/tmp/cvs-serv19034 Modified Files: Channel.java Session.java Added Files: ThreadedMessageListener.java Log Message: State cleanup and tuning reset fix --- NEW FILE: ThreadedMessageListener.java --- /* * ThreadedMessageListener.java $Revision: 1.1 $ $Date: 2002/08/20 03:08:58 $ * * Copyright (c) 2002 Huston Franklin. All rights reserved. * */ package org.beepcore.beep.core; import edu.oswego.cs.dl.util.concurrent.PooledExecutor; import org.beepcore.beep.util.Log; class ThreadedMessageListener implements MessageListener, Runnable { ThreadedMessageListener(Channel channel, MessageListener listener) { this.channel = channel; this.listener = listener; } public void receiveMSG(Message message) throws BEEPError, AbortChannelException { this.message = (MessageMSG)message; try { callbackQueue.execute(this); } catch (InterruptedException e) { throw new BEEPError(BEEPError.CODE_REQUESTED_ACTION_ABORTED); } } public void run() { try { listener.receiveMSG(message); } catch (BEEPError e) { try { message.sendERR(e); } catch (BEEPException e2) { Log.logEntry(Log.SEV_ERROR, e2); } } catch (AbortChannelException e) { try { channel.close(); } catch (BEEPException e2) { Log.logEntry(Log.SEV_ERROR, e2); } } } public MessageListener getMessageListener() { return listener; } private Channel channel; private MessageListener listener; private MessageMSG message; private static final PooledExecutor callbackQueue = new PooledExecutor(); } Index: Channel.java =================================================================== RCS file: /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/core/Channel.java,v retrieving revision 1.24 retrieving revision 1.25 diff -C2 -r1.24 -r1.25 *** Channel.java 12 May 2002 00:34:07 -0000 1.24 --- Channel.java 20 Aug 2002 03:08:58 -0000 1.25 *************** *** 3,7 **** * * Copyright (c) 2001 Invisible Worlds, Inc. All rights reserved. ! * Copyright (c) Huston Franklin. All rights reserved. * * The contents of this file are subject to the Blocks Public License (the --- 3,7 ---- * * Copyright (c) 2001 Invisible Worlds, Inc. All rights reserved. ! * Copyright (c) 2001,2002 Huston Franklin. All rights reserved. * * The contents of this file are subject to the Blocks Public License (the *************** *** 23,28 **** import java.util.*; - import edu.oswego.cs.dl.util.concurrent.PooledExecutor; - import org.beepcore.beep.util.BufferSegment; import org.beepcore.beep.util.Log; --- 23,26 ---- *************** *** 43,50 **** // class variables ! static final int STATE_UNINITIALISED = 1; ! static final int STATE_OK = 2; ! static final int STATE_CLOSING = 3; ! static final int STATE_CLOSED = 4; private static final BufferSegment zeroLengthSegment = new BufferSegment(new byte[0]); --- 41,54 ---- // class variables ! public static final int STATE_INITIALIZED = 0; ! public static final int STATE_STARTING = 1; ! public static final int STATE_ACTIVE = 2; ! public static final int STATE_TUNING_PENDING = 3; ! public static final int STATE_TUNING = 4; ! public static final int STATE_CLOSE_PENDING = 5; ! public static final int STATE_CLOSING = 6; ! public static final int STATE_CLOSED = 7; ! public static final int STATE_ABORTED = 8; ! private static final BufferSegment zeroLengthSegment = new BufferSegment(new byte[0]); *************** *** 96,100 **** private LinkedList recvReplyQueue; ! private int state = STATE_UNINITIALISED; private Frame previousFrame; --- 100,104 ---- private LinkedList recvReplyQueue; ! private int state = STATE_INITIALIZED; private Frame previousFrame; *************** *** 117,121 **** private Object applicationData = null; ! private static final PooledExecutor callbackQueue = new PooledExecutor(); // in shutting down the session --- 121,125 ---- private Object applicationData = null; ! private boolean blockingMessageListener = false; // in shutting down the session *************** *** 136,145 **** */ protected Channel(String profile, String number, MessageListener listener, ! Session session) { this.profile = profile; this.encoding = Constants.ENCODING_DEFAULT; this.number = number; ! this.listener = listener; this.session = session; sentSequence = 0; --- 140,149 ---- */ protected Channel(String profile, String number, MessageListener listener, ! boolean blocking, Session session) { this.profile = profile; this.encoding = Constants.ENCODING_DEFAULT; this.number = number; ! this.setMessageListener(listener, blocking); this.session = session; sentSequence = 0; *************** *** 151,155 **** recvMSGQueue = new LinkedList(); recvReplyQueue = new LinkedList(); ! state = STATE_UNINITIALISED; recvWindowUsed = 0; recvWindowSize = DEFAULT_WINDOW_SIZE; --- 155,159 ---- recvMSGQueue = new LinkedList(); recvReplyQueue = new LinkedList(); ! state = STATE_INITIALIZED; recvWindowUsed = 0; recvWindowSize = DEFAULT_WINDOW_SIZE; *************** *** 159,162 **** --- 163,171 ---- } + protected Channel(String profile, String number, Session session) + { + this(profile, number, null, false, session); + } + /** * This is a special constructor for Channel Zero *************** *** 168,172 **** Channel(Session session, String number, ReplyListener rl) { ! this(null, number, null, session); // Add a MSG to the SentMSGQueue to fake channel into accepting the --- 177,181 ---- Channel(Session session, String number, ReplyListener rl) { ! this(null, number, null, false, session); // Add a MSG to the SentMSGQueue to fake channel into accepting the *************** *** 175,179 **** null, rl)); ! state = STATE_OK; } --- 184,188 ---- null, rl)); ! state = STATE_ACTIVE; } *************** *** 266,270 **** { synchronized (this) { ! if ((state != STATE_OK) && (state != STATE_UNINITIALISED)) { throw new BEEPException("Channel in a bad state."); } --- 275,279 ---- { synchronized (this) { ! if ((state != STATE_ACTIVE) && (state != STATE_INITIALIZED)) { throw new BEEPException("Channel in a bad state."); } *************** *** 306,311 **** public MessageListener setMessageListener(MessageListener ml) { ! MessageListener tmp = this.listener; ! this.listener = ml; return tmp; } --- 315,339 ---- public MessageListener setMessageListener(MessageListener ml) { ! return setMessageListener(ml, true); ! } ! ! MessageListener setMessageListener(MessageListener ml, ! boolean blocking) ! { ! MessageListener tmp = getMessageListener(); ! ! if (ml == null) { ! this.listener = null; ! this.blockingMessageListener = false; ! return tmp; ! } ! ! if (blocking) { ! this.listener = new ThreadedMessageListener(this, ml); ! } else { ! this.listener = ml; ! } ! ! this.blockingMessageListener = blocking; return tmp; } *************** *** 316,320 **** public MessageListener getMessageListener() { ! return this.listener; } --- 344,353 ---- public MessageListener getMessageListener() { ! if (this.blockingMessageListener) { ! return ! ((ThreadedMessageListener)this.listener).getMessageListener(); ! } else { ! return this.listener; ! } } *************** *** 348,354 **** MessageStatus status; ! if (state != STATE_OK) { switch (state) { ! case STATE_UNINITIALISED : throw new BEEPException("Channel is uninitialised."); default : --- 381,387 ---- MessageStatus status; ! if (state != STATE_ACTIVE && state != STATE_TUNING) { switch (state) { ! case STATE_INITIALIZED : throw new BEEPException("Channel is uninitialised."); default : *************** *** 393,399 **** * returns the state of the <code>Channel</code> * The possible states are (all defined as Channel.STATE_*): - * STATE_UNINITIALISED - after a channel is created - * STATE_OK - a channel is acknowledged by the other session - * STATE_CLOSED - the channel has been closed */ int getState() --- 426,429 ---- *************** *** 456,492 **** if (notify) { ! try { ! callbackQueue.execute(new Runnable() { ! public void run() { ! MessageMSG m; ! synchronized (recvMSGQueue) { ! m = (MessageMSG)recvMSGQueue.getFirst(); ! synchronized (m) { ! if (m.getDataStream().isComplete()) { ! recvMSGQueue.remove(m); ! } ! m.setNotified(); ! } ! } ! ! try { ! listener.receiveMSG(m); ! } catch (BEEPError e) { ! try { ! m.sendERR(e); ! } catch (BEEPException e2) { ! Log.logEntry(Log.SEV_ERROR, e2); ! } ! } catch (AbortChannelException e) { ! try { ! Channel.this.close(); ! } catch (BEEPException e2) { ! Log.logEntry(Log.SEV_ERROR, e2); ! } ! } ! } ! }); ! } catch (InterruptedException e) { ! throw new BEEPException(e); } } --- 486,514 ---- if (notify) { ! synchronized (recvMSGQueue) { ! final MessageMSG m = ! (MessageMSG)recvMSGQueue.getFirst(); ! synchronized (m) { ! if (m.getDataStream().isComplete()) { ! recvMSGQueue.remove(m); ! } ! m.setNotified(); ! } ! ! try { ! listener.receiveMSG(m); ! } catch (BEEPError e) { ! try { ! m.sendERR(e); ! } catch (BEEPException e2) { ! Log.logEntry(Log.SEV_ERROR, e2); ! } ! } catch (AbortChannelException e) { ! try { ! Channel.this.close(); ! } catch (BEEPException e2) { ! Log.logEntry(Log.SEV_ERROR, e2); ! } ! } } } *************** *** 532,535 **** --- 554,561 ---- } + if (frame.isLast() && getState() == STATE_TUNING) { + this.session.disableIO(); + } + if (frame.getMessageType() == Message.MESSAGE_TYPE_NUL) { synchronized (recvReplyQueue) { *************** *** 680,684 **** int msgno = frame.getMsgno(); ! if (state != STATE_OK) { throw new BEEPException("State is " + state); } --- 706,710 ---- int msgno = frame.getMsgno(); ! if (state != STATE_ACTIVE && state != STATE_TUNING) { throw new BEEPException("State is " + state); } *************** *** 801,807 **** void sendMessage(MessageStatus m) throws BEEPException { ! if (state != STATE_OK) { switch (state) { ! case STATE_UNINITIALISED : throw new BEEPException("Channel is uninitialised."); default : --- 827,833 ---- void sendMessage(MessageStatus m) throws BEEPException { ! if (state != STATE_ACTIVE && state != STATE_TUNING) { switch (state) { ! case STATE_INITIALIZED : throw new BEEPException("Channel is uninitialised."); default : Index: Session.java =================================================================== RCS file: /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/core/Session.java,v retrieving revision 1.27 retrieving revision 1.28 diff -C2 -r1.27 -r1.28 *** Session.java 28 May 2002 04:50:06 -0000 1.27 --- Session.java 20 Aug 2002 03:08:58 -0000 1.28 *************** *** 177,181 **** zero = new Channel(this, CHANNEL_ZERO, greetingListener); ! zero.setMessageListener(new ChannelZeroListener()); channels.put(CHANNEL_ZERO, zero); --- 177,181 ---- zero = new Channel(this, CHANNEL_ZERO, greetingListener); ! zero.setMessageListener(new ChannelZeroListener(), false); channels.put(CHANNEL_ZERO, zero); *************** *** 228,232 **** zero = new Channel(this, CHANNEL_ZERO, greetingListener); ! zero.setMessageListener(new ChannelZeroListener()); channels.put(CHANNEL_ZERO, zero); --- 228,232 ---- zero = new Channel(this, CHANNEL_ZERO, greetingListener); ! zero.setMessageListener(new ChannelZeroListener(), false); channels.put(CHANNEL_ZERO, zero); *************** *** 525,529 **** */ Channel startChannelRequest(Collection profiles, MessageListener listener, ! boolean disableIO) throws BEEPException, BEEPError { --- 525,529 ---- */ Channel startChannelRequest(Collection profiles, MessageListener listener, ! boolean tuning) throws BEEPException, BEEPError { *************** *** 567,571 **** // @todo handle the data element // Create a channel ! Channel ch = new Channel(null, channelNumber, listener, this); // Make a message --- 567,571 ---- // @todo handle the data element // Create a channel ! Channel ch = new Channel(null, channelNumber, listener, true, this); // Make a message *************** *** 574,579 **** StringUtil.stringBufferToAscii(startBuffer)); // Tell Channel Zero to start us up ! StartReplyListener reply = new StartReplyListener(ch, disableIO); synchronized (reply) { this.zero.sendMSG(ds, reply); --- 574,585 ---- StringUtil.stringBufferToAscii(startBuffer)); + if (tuning) { + this.changeState(SESSION_STATE_TUNING_PENDING); + this.changeState(SESSION_STATE_TUNING); + this.zero.setState(Channel.STATE_TUNING); + } + // Tell Channel Zero to start us up ! StartReplyListener reply = new StartReplyListener(ch); synchronized (reply) { this.zero.sendMSG(ds, reply); *************** *** 592,600 **** } ! if (ch.getState() != Channel.STATE_OK) { throw new BEEPException("Error channel state (" + ch.getState() + ")"); } fireChannelStarted(ch); return ch; --- 598,610 ---- } ! if (ch.getState() != Channel.STATE_ACTIVE) { throw new BEEPException("Error channel state (" + ch.getState() + ")"); } + if (tuning) { + ch.setState(Channel.STATE_TUNING); + } + fireChannelStarted(ch); return ch; *************** *** 736,749 **** /** - * Method prevents Channel's window from being updated. - * - * - */ - protected void prohibitChannelWindowUpdates() - { - allowChannelWindowUpdates = false; - } - - /** * This method is used by a tuning profile to reset the session after the * tuning is complete. --- 746,749 ---- *************** *** 964,968 **** // Store the Channel ! ch.setState(Channel.STATE_OK); channels.put(ch.getNumberAsString(), ch); ((Message)zero.getAppData()).sendRPY(ds); --- 964,968 ---- // Store the Channel ! ch.setState(Channel.STATE_ACTIVE); channels.put(ch.getNumberAsString(), ch); ((Message)zero.getAppData()).sendRPY(ds); *************** *** 1221,1225 **** } ! ch = new Channel(p.uri, channelNumber, null, this); try { --- 1221,1225 ---- } ! ch = new Channel(p.uri, channelNumber, this); try { *************** *** 1532,1542 **** Channel channel; - boolean disableIO; BEEPError error; ! StartReplyListener(Channel channel, boolean disableIO) { this.channel = channel; - this.disableIO = disableIO; this.error = null; } --- 1532,1540 ---- Channel channel; BEEPError error; ! StartReplyListener(Channel channel) { this.channel = channel; this.error = null; } *************** *** 1553,1560 **** { try { - if (disableIO) { - Session.this.disableIO(); - } - Element topElement = processMessage(message); --- 1551,1554 ---- *************** *** 1600,1604 **** // set the state ! channel.setState(Channel.STATE_OK); channels.put(channel.getNumberAsString(), channel); --- 1594,1598 ---- // set the state ! channel.setState(Channel.STATE_ACTIVE); channels.put(channel.getNumberAsString(), channel); *************** *** 1612,1622 **** this.notify(); } - - // I'm not sure why this is being done. - if (TuningProfile.isTuningProfile(uri)) { - Log.logEntry(Log.SEV_DEBUG, CORE, - "Disabling this I/O thread"); - Session.this.disableIO(); - } } catch (Exception x) { throw new BEEPException(x.getMessage()); --- 1606,1609 ---- *************** *** 1742,1746 **** // set the state ! channel.setState(Channel.STATE_OK); channels.remove(channel.getNumberAsString()); --- 1729,1733 ---- // set the state ! channel.setState(Channel.STATE_ACTIVE); channels.remove(channel.getNumberAsString()); |