Re: [jgroups-users] v3->v4: my ReceiverAdapter methods are not being called
From: Questions/problems r. to u. J. <jav...@li...> - 2017-11-06 13:38:36
Your methods are never called because the MessageDispatcher catches all of these messages! So JChannel.setReceiver(this) is setting a receiver that won't ever get called.

If you have a MessageDispatcher, receive(Message) and receive(MessageBatch) are never called; instead, either handle(Message) or handle(Message,Response) is called.

Events such as view changes are caught by registering a MembershipListener directly with the MessageDispatcher (setMembershipListener()); state changes are registered for by calling MessageDispatcher.setStateListener().

Take a look at the simple program below that shows how to do this.
Cheers,

public class bla extends ReceiverAdapter implements RequestHandler {
    protected JChannel            ch;
    protected static final String props="/home/bela/fast.xml";
    protected MessageDispatcher   dispatcher;
    protected int                 cnt=1;

    protected void start(String name) throws Exception {
        ch=new JChannel(props).name(name);
        // ch.setReceiver(this); // useless: the receiver won't get called!
        dispatcher=new MessageDispatcher(ch, this);
        dispatcher.setMembershipListener(this); // view changes are delivered via the dispatcher
        // dispatcher.setStateListener(this);
        ch.connect("demo", null, 10 * 1000);
        int num=1;
        for(;;) {
            Util.keyPress(">>");
            String tmp="msg-" + num++;
            RspList<Object> rsps=dispatcher.castMessage(null, new ByteArrayPayload(tmp.getBytes()), RequestOptions.SYNC());
            System.out.printf("rsps: %s\n", rsps);
        }
    }

    public static void main(String[] args) throws Exception {
        new bla().start(args[0]);
    }

    // Never called while a MessageDispatcher is attached to the channel
    public void receive(Message msg) {
        System.out.printf("receive() called\n");
    }

    // Never called while a MessageDispatcher is attached to the channel
    public void receive(MessageBatch batch) {
        System.out.printf("receiveBatch() called\n");
    }

    public void viewAccepted(View view) {
        System.out.printf("-- view: %s\n", view);
    }

    // Called for incoming requests; the return value is sent back as the response
    public Object handle(Message msg) throws Exception {
        System.out.printf("handle() called\n");
        return cnt++;
    }

    public void handle(Message request, Response response) throws Exception {
        Object retval=handle(request);
        response.send(retval, false);
    }
}

On 03/11/17 20:29, Questions/problems related to using JGroups wrote:
> Thank you for previous help with castMessage/handle and
> serializable->streamable. That all went fine, and I can send messages
> that way properly now across the cluster. Now I'm seeing a bigger
> problem, but it's so easy to describe that am hopeful it's a simple RTFM
> thing (though I've read it several times now).
>
> None of my ReceiverAdapter methods are being called. Am trying 4.0.8 now
> but also tried 4.0.7. I can tell from logs/debugger that nodes are
> finding each other and joining, are sending messages from one to the
> other [1], and a getState call is made [2]. I override receive,
> get/setState, viewAccepted, and suspect, and none of them are called.
> Note that this same code worked with v3.
>
> My base class is declared with "extends ReceiverAdapter implements
> RequestHandler" and I set up like this (unchanged from v3):
>
>     jChannel = env.createJChannel(); // tried with name(<string>) also
>     jChannel.setReceiver(this);
>     dispatcher = new MessageDispatcher(jChannel, this);
>     jChannel.connect(<name>, null, 10 * 1000);
>
> With channel creation like this:
>
>     Protocol[] stack = {
>         new TCP()
>             .setValue("bind_addr", InetAddress.getByName(<bind address>))
>             .setValue("bind_port", <port>)
>             .setValue("thread_pool_min_threads", 1)
>             .setValue("thread_pool_keep_alive_time", 5000)
>             .setValue("send_buf_size", 640000)
>             .setValue("sock_conn_timeout", 300)
>             .setValue("recv_buf_size", 5000000),
>         new TCPPING()
>             .setValue("initial_hosts", <list>)
>             .setValue("send_cache_on_join", true)
>             .setValue("port_range", 0),
>         new MERGE3()
>             .setValue("min_interval", 10000)
>             .setValue("max_interval", 30000),
>         new FD_ALL()
>             .setValue("timeout", <timeout>),
>         new VERIFY_SUSPECT()
>             .setValue("timeout", 1500),
>         new BARRIER(),
>         new NAKACK2()
>             .setValue("use_mcast_xmit", false),
>         new UNICAST3(),
>         new STABLE()
>             .setValue("desired_avg_gossip", 50000)
>             .setValue("max_bytes", 4000000),
>         // createAuthProtocol(), <-- commented for now
>         new GMS()
>             .setValue("join_timeout", 3000),
>         new MFC()
>             .setValue("max_credits", 2000000)
>             .setValue("min_credits", 800000),
>         new FRAG2(),
>         new STATE_TRANSFER()};
>     return new JChannel(stack);
>
> Since *none* of my ReceiverAdapter code is being called, am hoping it's
> something dumb. I stepped into the receive case to try to figure it out
> and found here is where the message is being swallowed:
>
> RequestCorrelator, called by MessageDispatcher.
> hdr = null:
>
>     public void receiveMessageBatch(MessageBatch batch) {
>         for(Message msg : batch) {
>             Header hdr=msg.getHeader(this.corr_id);
>             if(hdr == null || hdr.corrId != this.corr_id) // msg was sent by a different request corr in the same stack
>                 continue;
>
>             if(hdr instanceof MultiDestinationHeader) {
>                 [...]
>             }
>             dispatch(msg, hdr);
>         }
>     }
>
> There is no header that matches this corr_id (200). The headers are
> NakAckHeader2 (41) and
> TpHeader (57).
>
> Thanks,
> Bobby
>
>
> [1] 11/3/17 12:03:56 PM org.jgroups.protocols.TP passBatchUp FINER:
> localhost-63787: received message batch of 1 messages from localhost-63787
> 11/3/17 12:03:56 PM org.jgroups.protocols.pbcast.NAKACK2 handleMessages
> FINER: localhost-63787: received localhost-63787#1-1 (1 messages)
>
> [2] 11/3/17 11:51:00 AM org.jgroups.protocols.pbcast.STATE_TRANSFER
> handleStateReq FINE: localhost-50939: received state request from
> localhost-28424
>

--
Bela Ban | http://www.jgroups.org
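
The behavior described in the reply (once a dispatcher is attached to the channel, the receiver set with setReceiver() no longer sees messages) can be pictured with a small toy model. This is only a sketch and not JGroups code: ToyChannel, setUpHandler() and deliver() here are hypothetical stand-ins, and the model assumes that the dispatcher registers itself as the single handler for everything the channel delivers, which is why the receiver is bypassed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: these classes are hypothetical and are NOT part of JGroups.
public class UpHandlerDemo {

    /** Minimal stand-in for a channel that delivers incoming messages. */
    static class ToyChannel {
        private Consumer<String> receiver;   // plays the role of the ReceiverAdapter
        private Consumer<String> upHandler;  // plays the role of a dispatcher-like building block

        void setReceiver(Consumer<String> r)  { receiver = r; }
        void setUpHandler(Consumer<String> h) { upHandler = h; }

        /** An installed up-handler gets every message; the receiver is used only if there is none. */
        void deliver(String msg) {
            if (upHandler != null) {
                upHandler.accept(msg);
                return;
            }
            if (receiver != null)
                receiver.accept(msg);
        }
    }

    /** Delivers one message, with or without a dispatcher attached, and records who saw it. */
    static List<String> run(boolean withDispatcher) {
        List<String> log = new ArrayList<>();
        ToyChannel ch = new ToyChannel();
        ch.setReceiver(m -> log.add("receive:" + m));
        if (withDispatcher)
            ch.setUpHandler(m -> log.add("handle:" + m));
        ch.deliver("msg-1");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [receive:msg-1]
        System.out.println(run(true));  // prints [handle:msg-1]
    }
}
```

In this model the receiver stays registered in both runs; it is simply never consulted once a handler is installed, which matches the symptom reported in the original question.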