User: vlada
Date: 07/10/02 06:20:23
Modified: src/org/jgroups/mux Multiplexer.java
Log:
[JGRP-598] - Simplify FLUSH
[JGRP-600] - Investigate Multiplexer locking scopes
Revision Changes Path
1.73 +63 -83 JGroups/src/org/jgroups/mux/Multiplexer.java
Index: Multiplexer.java
===================================================================
RCS file: /cvsroot/javagroups/JGroups/src/org/jgroups/mux/Multiplexer.java,v
retrieving revision 1.72
retrieving revision 1.73
diff -u -r1.72 -r1.73
--- Multiplexer.java 1 Oct 2007 11:49:07 -0000 1.72
+++ Multiplexer.java 2 Oct 2007 13:20:22 -0000 1.73
@@ -11,7 +11,6 @@
import java.util.*;
import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* Used for dispatching incoming messages. The Multiplexer implements UpHandler and registers with the associated
@@ -19,7 +18,7 @@
* message is removed and the MuxChannel corresponding to the header's service ID is retrieved from the map,
* and MuxChannel.up() is called with the message.
* @author Bela Ban
- * @version $Id: Multiplexer.java,v 1.72 2007/10/01 11:49:07 vlada Exp $
+ * @version $Id: Multiplexer.java,v 1.73 2007/10/02 13:20:22 vlada Exp $
*/
public class Multiplexer implements UpHandler {
/** Map<String,MuxChannel>. Maintains the mapping between service IDs and their associated MuxChannels */
@@ -29,23 +28,20 @@
static final String SEPARATOR="::";
static final short SEPARATOR_LEN=(short)SEPARATOR.length();
static final String NAME="MUX";
-
- private boolean flush_present=true;
+
private boolean blocked=false;
/** Thread pool to concurrently process messages sent to different services */
- private ExecutorService thread_pool;
+ private final ExecutorService thread_pool;
/** To make sure messages sent to different services are processed concurrently (using the thread pool above), but
* messages to the same service are processed FIFO */
- private FIFOMessageQueue<String,Runnable> fifo_queue=new FIFOMessageQueue<String,Runnable>();
+ private final FIFOMessageQueue<String,Runnable> fifo_queue=new FIFOMessageQueue<String,Runnable>();
/** Cluster view */
private volatile View view=null;
- private volatile Address local_addr=null;
-
/** Map<String,Boolean>. Map of service IDs and booleans that determine whether getState() has already been called */
private final Map<String,Boolean> state_transfer_listeners=new HashMap<String,Boolean>();
@@ -56,28 +52,21 @@
* Used to collect responses to LIST_SERVICES_REQ */
private final Map<Address, Set<String>> service_responses=new HashMap<Address, Set<String>>();
- private long SERVICES_RSP_TIMEOUT=10000;
-
- public Multiplexer() {
- this.channel=null;
- flush_present=isFlushPresent();
-
- // threadpool is enabled by default
- if(Global.getPropertyAsBoolean(Global.MUX_ENABLED, true)){
- thread_pool=createThreadPool();
- }
- }
+ private long SERVICES_RSP_TIMEOUT=10000;
public Multiplexer(JChannel channel) {
this.channel=channel;
this.channel.setUpHandler(this);
- this.channel.setOpt(Channel.BLOCK, Boolean.TRUE); // we want to handle BLOCK events ourselves
- flush_present=isFlushPresent();
+ this.channel.setOpt(Channel.BLOCK, Boolean.TRUE); // we want to handle BLOCK events ourselves
- //threadpool is enabled by default
- if(Global.getPropertyAsBoolean(Global.MUX_ENABLED, true)){
+ //thread pool is enabled by default
+ boolean use_thread_pool = Global.getPropertyAsBoolean(Global.MUX_ENABLED, true);
+ if(use_thread_pool){
thread_pool=createThreadPool();
}
+ else{
+ thread_pool=null;
+ }
}
/**
@@ -225,16 +214,16 @@
}
public void sendServiceUpMessage(String service, Address host,boolean bypassFlush) throws Exception {
- sendServiceMessage(ServiceInfo.SERVICE_UP, service, host,bypassFlush, null);
- if(local_addr != null && host != null && local_addr.equals(host))
- handleServiceUp(service, host, false);
+ //we have to make this service message non OOB since we have
+ //to FIFO order service messages and BLOCK/UNBLOCK messages
+ sendServiceMessage(ServiceInfo.SERVICE_UP, service, host,bypassFlush, null,false);
}
public void sendServiceDownMessage(String service, Address host,boolean bypassFlush) throws Exception {
- sendServiceMessage(ServiceInfo.SERVICE_DOWN, service, host,bypassFlush, null);
- if(local_addr != null && host != null && local_addr.equals(host))
- handleServiceDown(service, host, false);
+ //we have to make this service message non OOB since we have
+ //to FIFO order service messages and BLOCK/UNBLOCK messages
+ sendServiceMessage(ServiceInfo.SERVICE_DOWN, service, host,bypassFlush, null,false);
}
@@ -303,8 +292,14 @@
}
}
- if(!left_members.isEmpty())
- adjustServiceViews(left_members);
+ for(Address member:left_members){
+ try{
+ adjustServiceView(member);
+ }catch(Throwable t){
+ if(log.isErrorEnabled())
+ log.error("failed adjusting service views", t);
+ }
+ }
break;
case Event.PREPARE_VIEW:
@@ -341,8 +336,7 @@
handleStateResponse(evt);
break;
- case Event.SET_LOCAL_ADDRESS:
- local_addr=(Address)evt.getArg();
+ case Event.SET_LOCAL_ADDRESS:
passToAllMuxChannels(evt);
break;
@@ -481,17 +475,11 @@
return all_closed;
}
-
- private boolean isFlushPresent() {
- return channel.getProtocolStack().findProtocol("FLUSH") != null;
- }
-
- private Address getLocalAddress() {
- if(local_addr != null)
- return local_addr;
+ private Address getLocalAddress() {
if(channel != null)
- local_addr=channel.getLocalAddress();
- return local_addr;
+ return channel.getLocalAddress();
+
+ return null;
}
/**
@@ -519,7 +507,7 @@
return result;
}
- private void sendServiceMessage(byte type, String service, Address host,boolean bypassFlush, byte[] payload) throws Exception {
+ private void sendServiceMessage(byte type, String service, Address host,boolean bypassFlush, byte[] payload, boolean oob) throws Exception {
if(host == null)
host=getLocalAddress();
if(host == null) {
@@ -531,10 +519,12 @@
ServiceInfo si=new ServiceInfo(type, service, host, payload);
MuxHeader hdr=new MuxHeader(si);
- Message service_msg=new Message();
- service_msg.setFlag(Message.OOB);
+ Message service_msg=new Message();
+
+ if(oob)
+ service_msg.setFlag(Message.OOB);
service_msg.putHeader(NAME, hdr);
- if(bypassFlush && flush_present)
+ if(bypassFlush && channel.flushSupported())
service_msg.putHeader(FLUSH.NAME, new FLUSH.FlushHeader(FLUSH.FlushHeader.FLUSH_BYPASS));
channel.send(service_msg);
@@ -560,7 +550,9 @@
MuxChannel mux_ch=services.get(id);
if(mux_ch == null)
- throw new IllegalArgumentException("didn't find service with ID=" + id + " to fetch state from");
+ throw new IllegalArgumentException("State provider "
+ + channel.getLocalAddress()
+ + " does not have service with id " + id);
// state_id will be null, get regular state from the service named state_id
StateTransferInfo ret=(StateTransferInfo)passToMuxChannel(mux_ch, evt, fifo_queue, requester, id, true);
@@ -602,8 +594,9 @@
}
mux_ch=services.get(appl_id);
- if(mux_ch == null) {
- log.error("didn't find service with ID=" + appl_id + " to fetch state from");
+ if (mux_ch == null){
+ log.error("State receiver " + channel.getLocalAddress()
+ + " does not have service with id " + appl_id);
}
else {
StateTransferInfo tmp_info=info.copy();
@@ -652,11 +645,6 @@
List<Address> hosts, hosts_copy;
boolean removed=false;
- // discard if we sent this message
- if(received && host != null && local_addr != null && local_addr.equals(host)) {
- return;
- }
-
synchronized(service_state) {
hosts=service_state.get(service);
if(hosts == null)
@@ -668,10 +656,13 @@
if(removed){
View service_view = generateServiceView(hosts_copy);
MuxChannel ch = services.get(service);
- if(ch != null){
- Event view_evt = new Event(Event.VIEW_CHANGE, service_view);
- if(ch.isConnected())
- passToMuxChannel(ch, view_evt, fifo_queue, null, service, false);
+ if(ch != null && ch.isConnected()){
+ Event view_evt = new Event(Event.VIEW_CHANGE, service_view);
+ //we cannot pass service message to thread pool since we have
+ //to FIFO order service messages with BLOCK/UNBLOCK messages
+ //therefore all of them have to bypass thread pool
+
+ passToMuxChannel(ch, view_evt, fifo_queue, null, service, false,true);
}else{
if(log.isTraceEnabled())
log.trace("service " + service
@@ -690,11 +681,6 @@
List<Address> hosts, hosts_copy;
boolean added=false;
- // discard if we sent this message
- if(received && host != null && local_addr != null && local_addr.equals(host)) {
- return;
- }
-
synchronized(service_state) {
hosts=service_state.get(service);
if(hosts == null) {
@@ -712,8 +698,11 @@
View service_view = generateServiceView(hosts_copy);
MuxChannel ch = services.get(service);
if(ch != null){
- Event view_evt = new Event(Event.VIEW_CHANGE, service_view);
- passToMuxChannel(ch, view_evt, fifo_queue, null, service, false);
+ Event view_evt = new Event(Event.VIEW_CHANGE, service_view);
+ //we cannot pass service message to thread pool since we have
+ //to FIFO order service messages with BLOCK/UNBLOCK messages
+ //therefore all of them have to bypass thread pool
+ passToMuxChannel(ch, view_evt, fifo_queue, null, service, false,true);
}else{
if(log.isTraceEnabled())
log.trace("service " + service
@@ -737,7 +726,10 @@
Map<Address, Set<String>> copy=null;
byte[] data=Util.objectToByteBuffer(new HashSet<String>(services.keySet()));
- sendServiceMessage(ServiceInfo.LIST_SERVICES_RSP, null, channel.getLocalAddress(), true, data);
+
+ //we have to make this message OOB since we are running on a thread
+ //propelling a regular synchronous message call to install a new view
+ sendServiceMessage(ServiceInfo.LIST_SERVICES_RSP, null, channel.getLocalAddress(), true, data,true);
synchronized(service_responses) {
start=System.currentTimeMillis();
@@ -811,17 +803,6 @@
}
}
- private void adjustServiceViews(Vector<Address> left_members) {
- for(Address member:left_members){
- try{
- adjustServiceView(member);
- }catch(Throwable t){
- if(log.isErrorEnabled())
- log.error("failed adjusting service views", t);
- }
- }
- }
-
private void adjustServiceView(Address host) {
synchronized(service_state){
@@ -837,10 +818,9 @@
// generateServiceView()
View service_view = generateServiceView(new ArrayList<Address>(hosts));
MuxChannel ch = services.get(service);
- if(ch != null){
+ if(ch != null && ch.isConnected()){
Event view_evt = new Event(Event.VIEW_CHANGE, service_view);
- if(ch.isConnected())
- passToMuxChannel(ch, view_evt, fifo_queue, null, service, false);
+ passToMuxChannel(ch, view_evt, fifo_queue, null, service, false, true);
}else{
if(log.isTraceEnabled())
log.trace("service " + service
@@ -870,8 +850,8 @@
private View generateServiceView(List<Address> hosts) {
if(view == null) {
Vector<Address> tmp=new Vector<Address>();
- tmp.add(local_addr);
- view= new View(new ViewId(local_addr), tmp);
+ tmp.add(getLocalAddress());
+ view= new View(new ViewId(getLocalAddress()), tmp);
}
Vector<Address> members=new Vector<Address>(view.getMembers());
members.retainAll(hosts);
|