Thread: [jgroups-users] v3->v4: my ReceiverAdapter methods are not being called
From: Questions/problems r. to u. J. <jav...@li...> - 2017-11-03 19:30:08
Thank you for previous help with castMessage/handle and serializable->streamable. That all went fine, and I can send messages that way properly now across the cluster. Now I'm seeing a bigger problem, but it's so easy to describe that am hopeful it's a simple RTFM thing (though I've read it several times now).

None of my ReceiverAdapter methods are being called. Am trying 4.0.8 now but also tried 4.0.7. I can tell from logs/debugger that nodes are finding each other and joining, are sending messages from one to the other [1], and a getState call is made [2]. I override receive, get/setState, viewAccepted, and suspect, and none of them are called. Note that this same code worked with v3.

My base class is declared with "extends ReceiverAdapter implements RequestHandler" and I set up like this (unchanged from v3):

    jChannel = env.createJChannel(); // tried with name(<string>) also
    jChannel.setReceiver(this);
    dispatcher = new MessageDispatcher(jChannel, this);
    jChannel.connect(<name>, null, 10 * 1000);

With channel creation like this:

    Protocol[] stack = {
        new TCP()
            .setValue("bind_addr", InetAddress.getByName(<bind address>))
            .setValue("bind_port", <port>)
            .setValue("thread_pool_min_threads", 1)
            .setValue("thread_pool_keep_alive_time", 5000)
            .setValue("send_buf_size", 640000)
            .setValue("sock_conn_timeout", 300)
            .setValue("recv_buf_size", 5000000),
        new TCPPING()
            .setValue("initial_hosts", <list>)
            .setValue("send_cache_on_join", true)
            .setValue("port_range", 0),
        new MERGE3()
            .setValue("min_interval", 10000)
            .setValue("max_interval", 30000),
        new FD_ALL()
            .setValue("timeout", <timeout>),
        new VERIFY_SUSPECT()
            .setValue("timeout", 1500),
        new BARRIER(),
        new NAKACK2()
            .setValue("use_mcast_xmit", false),
        new UNICAST3(),
        new STABLE()
            .setValue("desired_avg_gossip", 50000)
            .setValue("max_bytes", 4000000),
        // createAuthProtocol(), <-- commented for now
        new GMS()
            .setValue("join_timeout", 3000),
        new MFC()
            .setValue("max_credits", 2000000)
            .setValue("min_credits", 800000),
        new FRAG2(),
        new STATE_TRANSFER()};
    return new JChannel(stack);

Since *none* of my ReceiverAdapter code is being called, am hoping it's something dumb. I stepped into the receive case to try to figure it out and found here is where the message is being swallowed: RequestCorrelator, called by MessageDispatcher. hdr = null:

    public void receiveMessageBatch(MessageBatch batch) {
        for(Message msg : batch) {
            Header hdr=msg.getHeader(this.corr_id);
            if(hdr == null || hdr.corrId != this.corr_id) // msg was sent by a different request corr in the same stack
                continue;

            if(hdr instanceof MultiDestinationHeader) {
                [...]
            }
            dispatch(msg, hdr);
        }
    }

There is no header that matches this corr_id (200). The headers are NakAckHeader2 (41) and TpHeader (57).

Thanks,
Bobby

[1] 11/3/17 12:03:56 PM org.jgroups.protocols.TP passBatchUp
    FINER: localhost-63787: received message batch of 1 messages from localhost-63787
    11/3/17 12:03:56 PM org.jgroups.protocols.pbcast.NAKACK2 handleMessages
    FINER: localhost-63787: received localhost-63787#1-1 (1 messages)

[2] 11/3/17 11:51:00 AM org.jgroups.protocols.pbcast.STATE_TRANSFER handleStateReq
    FINE: localhost-50939: received state request from localhost-28424
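The header check quoted in this message can be reproduced in a few lines of plain Java, which may make it easier to see why a message carrying only NakAckHeader2 (41) and TpHeader (57) never reaches dispatch(). This is a toy model, not JGroups source: the Msg class, the dispatched() helper and the header values are hypothetical stand-ins, and only the ids 200, 41 and 57 are taken from the thread. The sketch also assumes that messages sent through the dispatcher carry a header under the correlator id, while messages sent with a plain channel send do not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the header check quoted above; not JGroups source. */
final class CorrelatorFilterSketch {
    static final short CORR_ID = 200;    // correlator id reported in the thread
    static final short NAKACK2_ID = 41;  // header ids seen on the skipped message
    static final short TP_ID = 57;

    /** Hypothetical stand-in for a message: a label plus headers keyed by protocol id. */
    static final class Msg {
        final String label;
        final Map<Short, String> headers = new HashMap<>();

        Msg(String label) { this.label = label; }

        Msg header(short id, String value) {
            headers.put(id, value);
            return this;
        }
    }

    /** Returns the labels of the messages that would reach dispatch(); the rest are skipped. */
    static List<String> dispatched(List<Msg> batch) {
        List<String> out = new ArrayList<>();
        for (Msg msg : batch) {
            if (msg.headers.get(CORR_ID) == null)
                continue; // no correlator header: skipped, exactly like hdr == null above
            out.add(msg.label);
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumption: a plain channel send only picks up the protocol headers.
        Msg plain = new Msg("sent with channel send")
            .header(NAKACK2_ID, "NakAckHeader2")
            .header(TP_ID, "TpHeader");
        // Assumption: a dispatcher request additionally carries a correlator header.
        Msg rpc = new Msg("sent with castMessage")
            .header(NAKACK2_ID, "NakAckHeader2")
            .header(TP_ID, "TpHeader")
            .header(CORR_ID, "correlator header");
        System.out.println(dispatched(Arrays.asList(plain, rpc)));
    }
}
```

In this model the first message is silently skipped, which matches the debugger observation above: nothing is wrong with the message itself, it simply is not addressed to the correlator.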
From: Questions/problems r. to u. J. <jav...@li...> - 2017-11-06 13:38:36
Your methods are never called because the MessageDispatcher catches all of these messages! So JChannel.setReceiver(this) is setting a receiver that won't ever get called.

If you have a MessageDispatcher, receive(Message) and receive(MessageBatch) are never called; instead, either handle(Message) or handle(Message,Response) is called. Events such as view changes are caught by registering a MembershipListener directly with the MessageDispatcher (setMembershipListener()); state changes are registered for by calling MessageDispatcher.setStateListener().

Take a look at the simple program below that shows how to do this.

Cheers,

    public class bla extends ReceiverAdapter implements RequestHandler {
        protected JChannel ch;
        protected static final String props="/home/bela/fast.xml";
        protected MessageDispatcher dispatcher;
        protected int cnt=1;

        protected void start(String name) throws Exception {
            ch=new JChannel(props).name(name);
            // ch.setReceiver(this); // useless receiver won't get called!
            dispatcher = new MessageDispatcher(ch, this);
            // view changes are delivered through the dispatcher, not the channel
            dispatcher.setMembershipListener(this);
            // dispatcher.setStateListener(this);
            ch.connect("demo", null, 10 * 1000);

            int num=1;
            for(;;) {
                Util.keyPress(">>");
                String tmp="msg-" + num++;
                // synchronous request to all members (dests == null)
                RspList<Object> rsps=dispatcher.castMessage(null, new ByteArrayPayload(tmp.getBytes()), RequestOptions.SYNC());
                System.out.printf("rsps: %s\n", rsps);
            }
        }

        public static void main(String[] args) throws Exception {
            new bla().start(args[0]);
        }

        // never called while a MessageDispatcher is attached to the channel
        public void receive(Message msg) {
            System.out.printf("receive() called\n");
        }

        // never called while a MessageDispatcher is attached to the channel
        public void receive(MessageBatch batch) {
            System.out.printf("receiveBatch() called\n");
        }

        public void viewAccepted(View view) {
            System.out.printf("-- view: %s\n", view);
        }

        // called for every request sent through the dispatcher
        public Object handle(Message msg) throws Exception {
            System.out.printf("handle() called\n");
            return cnt++;
        }

        public void handle(Message request, Response response) throws Exception {
            Object retval=handle(request);
            response.send(retval, false);
        }
    }

--
Bela Ban | http://www.jgroups.org
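The behaviour described in the reply (the dispatcher sees traffic first, so a Receiver set on the channel stays silent) can be pictured with a small stand-alone model. This is only a sketch under an assumption: that the channel hands incoming messages to a single installed up handler in preference to the Receiver. The Channel, Receiver and UpHandler types below are hypothetical stand-ins written for this illustration, not the JGroups classes of the same names.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the delivery order described in the reply; not JGroups source. */
final class UpHandlerSketch {
    /** Hypothetical callback types standing in for a receiver and the dispatcher's hook. */
    interface Receiver { void receive(String msg); }
    interface UpHandler { void up(String msg); }

    /** Hypothetical channel: an installed up handler is consulted instead of the receiver. */
    static final class Channel {
        private Receiver receiver;
        private UpHandler upHandler;

        void setReceiver(Receiver r) { receiver = r; }
        void setUpHandler(UpHandler h) { upHandler = h; }

        void deliver(String msg) {
            if (upHandler != null) {
                upHandler.up(msg); // the dispatcher consumes the message
                return;
            }
            if (receiver != null)
                receiver.receive(msg);
        }
    }

    /** Delivers one message and returns the log of callbacks that fired. */
    static List<String> run(boolean withDispatcher) {
        List<String> log = new ArrayList<>();
        Channel ch = new Channel();
        ch.setReceiver(m -> log.add("receive(" + m + ")"));
        if (withDispatcher) // assumption: creating a dispatcher installs an up handler
            ch.setUpHandler(m -> log.add("dispatcher.up(" + m + ")"));
        ch.deliver("msg-1");
        return log;
    }

    public static void main(String[] args) {
        System.out.println("receiver only:   " + run(false));
        System.out.println("with dispatcher: " + run(true));
    }
}
```

With the dispatcher installed, the receiver callback never fires in this model, which is the situation the original post describes.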
From: Questions/problems r. to u. J. <jav...@li...> - 2017-11-06 20:40:32
On Mon, Nov 6, 2017 at 8:37 AM, Questions/problems related to using JGroups <jav...@li...> wrote:
> Your methods are never called because the MessageDispatcher catches all of
> these messages!

Thank you -- I definitely missed that in the docs and didn't expect it. I have my view changes and state back!

So I've pulled my 'jChannel.setReceiver(this);' and have:

    jChannel = env.createJChannel(this);
    dispatcher = new MessageDispatcher(jChannel, this);
    dispatcher.setMembershipListener(this);
    dispatcher.setStateListener(this);

In v3, I was used to MD#castMessage going to handle() and JChannel#send going to receive(). To be really clear, are you saying that, since we have a MessageDispatcher, I should no longer use channel#send and should convert those calls to use the MessageDispatcher?

And I don't need this I guess, right? (If so, what is this method for?)

    dispatcher.setRequestHandler(this);

Thanks,
Bobby
From: Questions/problems r. to u. J. <jav...@li...> - 2017-11-07 09:38:32
On 06/11/17 21:40, Questions/problems related to using JGroups wrote:
> On Mon, Nov 6, 2017 at 8:37 AM, Questions/problems related to using
> JGroups <jav...@li...> wrote:
>
>     Your methods are never called because the MessageDispatcher catches
>     all of these messages!
>
> Thank you -- I definitely missed that in the docs and didn't expect it.
> I have my view changes and state back!
>
> So I've pulled my 'jChannel.setReceiver(this);' and have:
>
>     jChannel = env.createJChannel(this);
>     dispatcher = new MessageDispatcher(jChannel, this);
>     dispatcher.setMembershipListener(this);
>     dispatcher.setStateListener(this);

Correct: the MessageDispatcher consumes all messages *before* they reach a registered Receiver. If you wanted both on the same channel, you'd have to use FORK and ForkChannels (see the manual for details).

> In v3, I was used to MD#castMessage going to handle() and JChannel#send
> going to receive(). To be really clear, are you saying that, since we
> have a MessageDispatcher, I should no longer use channel#send and should
> convert those calls to use the MessageDispatcher?

Right. The MessageDispatcher trumps the Receiver, but fork channels (see above) would allow both to be on the same channel.

> And I don't need this I guess, right? (If so, what is this method for?)
>
>     dispatcher.setRequestHandler(this);

This method is already called in the constructor of the MessageDispatcher (the 2nd arg).

> Thanks,
> Bobby

--
Bela Ban | http://www.jgroups.org
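The FORK suggestion above amounts to demultiplexing: traffic tagged for a fork channel is routed to that channel's own receiver, and untagged traffic continues up the main stack to the dispatcher. The sketch below models only that routing idea in plain Java. It is an assumption-laden illustration rather than the FORK implementation: the Msg class, the forkChannelId field and the "plain" channel id are hypothetical names chosen for this example, and the JGroups manual remains the reference for the real ForkChannel setup.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy model of FORK-style demultiplexing; not JGroups source. */
final class ForkRoutingSketch {
    /** Hypothetical message: forkChannelId is null for traffic sent on the main channel. */
    static final class Msg {
        final String forkChannelId;
        final String body;

        Msg(String forkChannelId, String body) {
            this.forkChannelId = forkChannelId;
            this.body = body;
        }
    }

    private final Consumer<String> mainChannel;
    private final Map<String, Consumer<String>> forkChannels = new HashMap<>();

    ForkRoutingSketch(Consumer<String> mainChannel) { this.mainChannel = mainChannel; }

    void addForkChannel(String id, Consumer<String> receiver) { forkChannels.put(id, receiver); }

    /** Tagged messages go to the matching fork channel; untagged ones go up the main stack. */
    void up(Msg msg) {
        if (msg.forkChannelId == null) {
            mainChannel.accept(msg.body);
            return;
        }
        Consumer<String> target = forkChannels.get(msg.forkChannelId);
        if (target != null)
            target.accept(msg.body); // unknown ids are simply dropped in this sketch
    }

    /** Routes one dispatcher request and one plain message, returning who received what. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ForkRoutingSketch fork = new ForkRoutingSketch(b -> log.add("dispatcher: " + b));
        fork.addForkChannel("plain", b -> log.add("receiver: " + b));
        fork.up(new Msg(null, "rpc-1"));     // main channel traffic, seen by the dispatcher
        fork.up(new Msg("plain", "hello"));  // fork channel traffic, seen by its receiver
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the design is that the two consumers never compete for the same message: the tag decides the destination before either of them is involved.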