[Beepcore-java-commits] CVS: beepcore-java/src/org/beepcore/beep/core ThreadedMessageListener.java,N
Status: Beta
Brought to you by:
huston
|
From: Huston F. <hu...@us...> - 2002-08-20 03:09:00
|
Update of /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/core
In directory usw-pr-cvs1:/tmp/cvs-serv19034
Modified Files:
Channel.java Session.java
Added Files:
ThreadedMessageListener.java
Log Message:
State cleanup and tuning reset fix
--- NEW FILE: ThreadedMessageListener.java ---
/*
* ThreadedMessageListener.java $Revision: 1.1 $ $Date: 2002/08/20 03:08:58 $
*
* Copyright (c) 2002 Huston Franklin. All rights reserved.
*
*/
package org.beepcore.beep.core;
import edu.oswego.cs.dl.util.concurrent.PooledExecutor;
import org.beepcore.beep.util.Log;
class ThreadedMessageListener implements MessageListener, Runnable {
ThreadedMessageListener(Channel channel, MessageListener listener) {
this.channel = channel;
this.listener = listener;
}
public void receiveMSG(Message message)
throws BEEPError, AbortChannelException
{
this.message = (MessageMSG)message;
try {
callbackQueue.execute(this);
} catch (InterruptedException e) {
throw new BEEPError(BEEPError.CODE_REQUESTED_ACTION_ABORTED);
}
}
public void run() {
try {
listener.receiveMSG(message);
} catch (BEEPError e) {
try {
message.sendERR(e);
} catch (BEEPException e2) {
Log.logEntry(Log.SEV_ERROR, e2);
}
} catch (AbortChannelException e) {
try {
channel.close();
} catch (BEEPException e2) {
Log.logEntry(Log.SEV_ERROR, e2);
}
}
}
public MessageListener getMessageListener()
{
return listener;
}
private Channel channel;
private MessageListener listener;
private MessageMSG message;
private static final PooledExecutor callbackQueue =
new PooledExecutor();
}
Index: Channel.java
===================================================================
RCS file: /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/core/Channel.java,v
retrieving revision 1.24
retrieving revision 1.25
diff -C2 -r1.24 -r1.25
*** Channel.java 12 May 2002 00:34:07 -0000 1.24
--- Channel.java 20 Aug 2002 03:08:58 -0000 1.25
***************
*** 3,7 ****
*
* Copyright (c) 2001 Invisible Worlds, Inc. All rights reserved.
! * Copyright (c) Huston Franklin. All rights reserved.
*
* The contents of this file are subject to the Blocks Public License (the
--- 3,7 ----
*
* Copyright (c) 2001 Invisible Worlds, Inc. All rights reserved.
! * Copyright (c) 2001,2002 Huston Franklin. All rights reserved.
*
* The contents of this file are subject to the Blocks Public License (the
***************
*** 23,28 ****
import java.util.*;
- import edu.oswego.cs.dl.util.concurrent.PooledExecutor;
-
import org.beepcore.beep.util.BufferSegment;
import org.beepcore.beep.util.Log;
--- 23,26 ----
***************
*** 43,50 ****
// class variables
! static final int STATE_UNINITIALISED = 1;
! static final int STATE_OK = 2;
! static final int STATE_CLOSING = 3;
! static final int STATE_CLOSED = 4;
private static final BufferSegment zeroLengthSegment =
new BufferSegment(new byte[0]);
--- 41,54 ----
// class variables
! public static final int STATE_INITIALIZED = 0;
! public static final int STATE_STARTING = 1;
! public static final int STATE_ACTIVE = 2;
! public static final int STATE_TUNING_PENDING = 3;
! public static final int STATE_TUNING = 4;
! public static final int STATE_CLOSE_PENDING = 5;
! public static final int STATE_CLOSING = 6;
! public static final int STATE_CLOSED = 7;
! public static final int STATE_ABORTED = 8;
!
private static final BufferSegment zeroLengthSegment =
new BufferSegment(new byte[0]);
***************
*** 96,100 ****
private LinkedList recvReplyQueue;
! private int state = STATE_UNINITIALISED;
private Frame previousFrame;
--- 100,104 ----
private LinkedList recvReplyQueue;
! private int state = STATE_INITIALIZED;
private Frame previousFrame;
***************
*** 117,121 ****
private Object applicationData = null;
! private static final PooledExecutor callbackQueue = new PooledExecutor();
// in shutting down the session
--- 121,125 ----
private Object applicationData = null;
! private boolean blockingMessageListener = false;
// in shutting down the session
***************
*** 136,145 ****
*/
protected Channel(String profile, String number, MessageListener listener,
! Session session)
{
this.profile = profile;
this.encoding = Constants.ENCODING_DEFAULT;
this.number = number;
! this.listener = listener;
this.session = session;
sentSequence = 0;
--- 140,149 ----
*/
protected Channel(String profile, String number, MessageListener listener,
! boolean blocking, Session session)
{
this.profile = profile;
this.encoding = Constants.ENCODING_DEFAULT;
this.number = number;
! this.setMessageListener(listener, blocking);
this.session = session;
sentSequence = 0;
***************
*** 151,155 ****
recvMSGQueue = new LinkedList();
recvReplyQueue = new LinkedList();
! state = STATE_UNINITIALISED;
recvWindowUsed = 0;
recvWindowSize = DEFAULT_WINDOW_SIZE;
--- 155,159 ----
recvMSGQueue = new LinkedList();
recvReplyQueue = new LinkedList();
! state = STATE_INITIALIZED;
recvWindowUsed = 0;
recvWindowSize = DEFAULT_WINDOW_SIZE;
***************
*** 159,162 ****
--- 163,171 ----
}
+ protected Channel(String profile, String number, Session session)
+ {
+ this(profile, number, null, false, session);
+ }
+
/**
* This is a special constructor for Channel Zero
***************
*** 168,172 ****
Channel(Session session, String number, ReplyListener rl)
{
! this(null, number, null, session);
// Add a MSG to the SentMSGQueue to fake channel into accepting the
--- 177,181 ----
Channel(Session session, String number, ReplyListener rl)
{
! this(null, number, null, false, session);
// Add a MSG to the SentMSGQueue to fake channel into accepting the
***************
*** 175,179 ****
null, rl));
! state = STATE_OK;
}
--- 184,188 ----
null, rl));
! state = STATE_ACTIVE;
}
***************
*** 266,270 ****
{
synchronized (this) {
! if ((state != STATE_OK) && (state != STATE_UNINITIALISED)) {
throw new BEEPException("Channel in a bad state.");
}
--- 275,279 ----
{
synchronized (this) {
! if ((state != STATE_ACTIVE) && (state != STATE_INITIALIZED)) {
throw new BEEPException("Channel in a bad state.");
}
***************
*** 306,311 ****
public MessageListener setMessageListener(MessageListener ml)
{
! MessageListener tmp = this.listener;
! this.listener = ml;
return tmp;
}
--- 315,339 ----
public MessageListener setMessageListener(MessageListener ml)
{
! return setMessageListener(ml, true);
! }
!
! MessageListener setMessageListener(MessageListener ml,
! boolean blocking)
! {
! MessageListener tmp = getMessageListener();
!
! if (ml == null) {
! this.listener = null;
! this.blockingMessageListener = false;
! return tmp;
! }
!
! if (blocking) {
! this.listener = new ThreadedMessageListener(this, ml);
! } else {
! this.listener = ml;
! }
!
! this.blockingMessageListener = blocking;
return tmp;
}
***************
*** 316,320 ****
public MessageListener getMessageListener()
{
! return this.listener;
}
--- 344,353 ----
public MessageListener getMessageListener()
{
! if (this.blockingMessageListener) {
! return
! ((ThreadedMessageListener)this.listener).getMessageListener();
! } else {
! return this.listener;
! }
}
***************
*** 348,354 ****
MessageStatus status;
! if (state != STATE_OK) {
switch (state) {
! case STATE_UNINITIALISED :
throw new BEEPException("Channel is uninitialised.");
default :
--- 381,387 ----
MessageStatus status;
! if (state != STATE_ACTIVE && state != STATE_TUNING) {
switch (state) {
! case STATE_INITIALIZED :
throw new BEEPException("Channel is uninitialised.");
default :
***************
*** 393,399 ****
* returns the state of the <code>Channel</code>
* The possible states are (all defined as Channel.STATE_*):
- * STATE_UNINITIALISED - after a channel is created
- * STATE_OK - a channel is acknowledged by the other session
- * STATE_CLOSED - the channel has been closed
*/
int getState()
--- 426,429 ----
***************
*** 456,492 ****
if (notify) {
! try {
! callbackQueue.execute(new Runnable() {
! public void run() {
! MessageMSG m;
! synchronized (recvMSGQueue) {
! m = (MessageMSG)recvMSGQueue.getFirst();
! synchronized (m) {
! if (m.getDataStream().isComplete()) {
! recvMSGQueue.remove(m);
! }
! m.setNotified();
! }
! }
!
! try {
! listener.receiveMSG(m);
! } catch (BEEPError e) {
! try {
! m.sendERR(e);
! } catch (BEEPException e2) {
! Log.logEntry(Log.SEV_ERROR, e2);
! }
! } catch (AbortChannelException e) {
! try {
! Channel.this.close();
! } catch (BEEPException e2) {
! Log.logEntry(Log.SEV_ERROR, e2);
! }
! }
! }
! });
! } catch (InterruptedException e) {
! throw new BEEPException(e);
}
}
--- 486,514 ----
if (notify) {
! synchronized (recvMSGQueue) {
! final MessageMSG m =
! (MessageMSG)recvMSGQueue.getFirst();
! synchronized (m) {
! if (m.getDataStream().isComplete()) {
! recvMSGQueue.remove(m);
! }
! m.setNotified();
! }
!
! try {
! listener.receiveMSG(m);
! } catch (BEEPError e) {
! try {
! m.sendERR(e);
! } catch (BEEPException e2) {
! Log.logEntry(Log.SEV_ERROR, e2);
! }
! } catch (AbortChannelException e) {
! try {
! Channel.this.close();
! } catch (BEEPException e2) {
! Log.logEntry(Log.SEV_ERROR, e2);
! }
! }
}
}
***************
*** 532,535 ****
--- 554,561 ----
}
+ if (frame.isLast() && getState() == STATE_TUNING) {
+ this.session.disableIO();
+ }
+
if (frame.getMessageType() == Message.MESSAGE_TYPE_NUL) {
synchronized (recvReplyQueue) {
***************
*** 680,684 ****
int msgno = frame.getMsgno();
! if (state != STATE_OK) {
throw new BEEPException("State is " + state);
}
--- 706,710 ----
int msgno = frame.getMsgno();
! if (state != STATE_ACTIVE && state != STATE_TUNING) {
throw new BEEPException("State is " + state);
}
***************
*** 801,807 ****
void sendMessage(MessageStatus m) throws BEEPException
{
! if (state != STATE_OK) {
switch (state) {
! case STATE_UNINITIALISED :
throw new BEEPException("Channel is uninitialised.");
default :
--- 827,833 ----
void sendMessage(MessageStatus m) throws BEEPException
{
! if (state != STATE_ACTIVE && state != STATE_TUNING) {
switch (state) {
! case STATE_INITIALIZED :
throw new BEEPException("Channel is uninitialised.");
default :
Index: Session.java
===================================================================
RCS file: /cvsroot/beepcore-java/beepcore-java/src/org/beepcore/beep/core/Session.java,v
retrieving revision 1.27
retrieving revision 1.28
diff -C2 -r1.27 -r1.28
*** Session.java 28 May 2002 04:50:06 -0000 1.27
--- Session.java 20 Aug 2002 03:08:58 -0000 1.28
***************
*** 177,181 ****
zero = new Channel(this, CHANNEL_ZERO, greetingListener);
! zero.setMessageListener(new ChannelZeroListener());
channels.put(CHANNEL_ZERO, zero);
--- 177,181 ----
zero = new Channel(this, CHANNEL_ZERO, greetingListener);
! zero.setMessageListener(new ChannelZeroListener(), false);
channels.put(CHANNEL_ZERO, zero);
***************
*** 228,232 ****
zero = new Channel(this, CHANNEL_ZERO, greetingListener);
! zero.setMessageListener(new ChannelZeroListener());
channels.put(CHANNEL_ZERO, zero);
--- 228,232 ----
zero = new Channel(this, CHANNEL_ZERO, greetingListener);
! zero.setMessageListener(new ChannelZeroListener(), false);
channels.put(CHANNEL_ZERO, zero);
***************
*** 525,529 ****
*/
Channel startChannelRequest(Collection profiles, MessageListener listener,
! boolean disableIO)
throws BEEPException, BEEPError
{
--- 525,529 ----
*/
Channel startChannelRequest(Collection profiles, MessageListener listener,
! boolean tuning)
throws BEEPException, BEEPError
{
***************
*** 567,571 ****
// @todo handle the data element
// Create a channel
! Channel ch = new Channel(null, channelNumber, listener, this);
// Make a message
--- 567,571 ----
// @todo handle the data element
// Create a channel
! Channel ch = new Channel(null, channelNumber, listener, true, this);
// Make a message
***************
*** 574,579 ****
StringUtil.stringBufferToAscii(startBuffer));
// Tell Channel Zero to start us up
! StartReplyListener reply = new StartReplyListener(ch, disableIO);
synchronized (reply) {
this.zero.sendMSG(ds, reply);
--- 574,585 ----
StringUtil.stringBufferToAscii(startBuffer));
+ if (tuning) {
+ this.changeState(SESSION_STATE_TUNING_PENDING);
+ this.changeState(SESSION_STATE_TUNING);
+ this.zero.setState(Channel.STATE_TUNING);
+ }
+
// Tell Channel Zero to start us up
! StartReplyListener reply = new StartReplyListener(ch);
synchronized (reply) {
this.zero.sendMSG(ds, reply);
***************
*** 592,600 ****
}
! if (ch.getState() != Channel.STATE_OK) {
throw new BEEPException("Error channel state (" +
ch.getState() + ")");
}
fireChannelStarted(ch);
return ch;
--- 598,610 ----
}
! if (ch.getState() != Channel.STATE_ACTIVE) {
throw new BEEPException("Error channel state (" +
ch.getState() + ")");
}
+ if (tuning) {
+ ch.setState(Channel.STATE_TUNING);
+ }
+
fireChannelStarted(ch);
return ch;
***************
*** 736,749 ****
/**
- * Method prevents Channel's window from being updated.
- *
- *
- */
- protected void prohibitChannelWindowUpdates()
- {
- allowChannelWindowUpdates = false;
- }
-
- /**
* This method is used by a tuning profile to reset the session after the
* tuning is complete.
--- 746,749 ----
***************
*** 964,968 ****
// Store the Channel
! ch.setState(Channel.STATE_OK);
channels.put(ch.getNumberAsString(), ch);
((Message)zero.getAppData()).sendRPY(ds);
--- 964,968 ----
// Store the Channel
! ch.setState(Channel.STATE_ACTIVE);
channels.put(ch.getNumberAsString(), ch);
((Message)zero.getAppData()).sendRPY(ds);
***************
*** 1221,1225 ****
}
! ch = new Channel(p.uri, channelNumber, null, this);
try {
--- 1221,1225 ----
}
! ch = new Channel(p.uri, channelNumber, this);
try {
***************
*** 1532,1542 ****
Channel channel;
- boolean disableIO;
BEEPError error;
! StartReplyListener(Channel channel, boolean disableIO)
{
this.channel = channel;
- this.disableIO = disableIO;
this.error = null;
}
--- 1532,1540 ----
Channel channel;
BEEPError error;
! StartReplyListener(Channel channel)
{
this.channel = channel;
this.error = null;
}
***************
*** 1553,1560 ****
{
try {
- if (disableIO) {
- Session.this.disableIO();
- }
-
Element topElement = processMessage(message);
--- 1551,1554 ----
***************
*** 1600,1604 ****
// set the state
! channel.setState(Channel.STATE_OK);
channels.put(channel.getNumberAsString(), channel);
--- 1594,1598 ----
// set the state
! channel.setState(Channel.STATE_ACTIVE);
channels.put(channel.getNumberAsString(), channel);
***************
*** 1612,1622 ****
this.notify();
}
-
- // I'm not sure why this is being done.
- if (TuningProfile.isTuningProfile(uri)) {
- Log.logEntry(Log.SEV_DEBUG, CORE,
- "Disabling this I/O thread");
- Session.this.disableIO();
- }
} catch (Exception x) {
throw new BEEPException(x.getMessage());
--- 1606,1609 ----
***************
*** 1742,1746 ****
// set the state
! channel.setState(Channel.STATE_OK);
channels.remove(channel.getNumberAsString());
--- 1729,1733 ----
// set the state
! channel.setState(Channel.STATE_ACTIVE);
channels.remove(channel.getNumberAsString());
|