Thread: [jgroups-users] v3->v4: my ReceiverAdapter methods are not being called
|
From: Questions/problems r. to u. J. <jav...@li...> - 2017-11-03 19:30:08
|
Thank you for the previous help with castMessage/handle and
serializable->streamable. That all went fine, and I can now send messages
properly across the cluster that way. Now I'm seeing a bigger problem, but
it's so easy to describe that I'm hopeful it's a simple RTFM thing (though
I've read the manual several times now).
None of my ReceiverAdapter methods are being called. I'm trying 4.0.8 now
but also tried 4.0.7. I can tell from the logs and the debugger that nodes
are finding each other and joining, are sending messages from one to the
other [1], and that a getState call is made [2]. I override receive,
get/setState, viewAccepted, and suspect, and none of them are called. Note
that this same code worked with v3.
My base class is declared with "extends ReceiverAdapter implements
RequestHandler" and I set it up like this (unchanged from v3):
jChannel = env.createJChannel(); // tried with name(<string>) also
jChannel.setReceiver(this);
dispatcher = new MessageDispatcher(jChannel, this);
jChannel.connect(<name>, null, 10 * 1000);
With channel creation like this:
Protocol[] stack = {
    new TCP()
        .setValue("bind_addr", InetAddress.getByName(<bind address>))
        .setValue("bind_port", <port>)
        .setValue("thread_pool_min_threads", 1)
        .setValue("thread_pool_keep_alive_time", 5000)
        .setValue("send_buf_size", 640000)
        .setValue("sock_conn_timeout", 300)
        .setValue("recv_buf_size", 5000000),
    new TCPPING()
        .setValue("initial_hosts", <list>)
        .setValue("send_cache_on_join", true)
        .setValue("port_range", 0),
    new MERGE3()
        .setValue("min_interval", 10000)
        .setValue("max_interval", 30000),
    new FD_ALL()
        .setValue("timeout", <timeout>),
    new VERIFY_SUSPECT()
        .setValue("timeout", 1500),
    new BARRIER(),
    new NAKACK2()
        .setValue("use_mcast_xmit", false),
    new UNICAST3(),
    new STABLE()
        .setValue("desired_avg_gossip", 50000)
        .setValue("max_bytes", 4000000),
    // createAuthProtocol(), <-- commented for now
    new GMS()
        .setValue("join_timeout", 3000),
    new MFC()
        .setValue("max_credits", 2000000)
        .setValue("min_credits", 800000),
    new FRAG2(),
    new STATE_TRANSFER()};
return new JChannel(stack);
Since *none* of my ReceiverAdapter code is being called, I'm hoping it's
something dumb. I stepped into the receive case to try to figure it out and
found that this is where the message is being swallowed:
RequestCorrelator, called by MessageDispatcher, with hdr = null:
public void receiveMessageBatch(MessageBatch batch) {
    for(Message msg : batch) {
        Header hdr=msg.getHeader(this.corr_id);
        // msg was sent by a different request corr in the same stack
        if(hdr == null || hdr.corrId != this.corr_id)
            continue;
        if(hdr instanceof MultiDestinationHeader) {
            [...]
        }
        dispatch(msg, hdr);
    }
}
There is no header that matches this corr_id (200). The headers are
NakAckHeader2 (41) and
TpHeader (57).
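To make the observation above concrete, here is a minimal, self-contained sketch of the header check in the quoted loop. It is a toy model only: `CorrelatorSketch`, `ToyMessage` and `dispatchable` are hypothetical names, not JGroups classes, and a header is reduced to a string label. The ids 200, 41 and 57 are the ones reported in the post. A message that carries only the NAKACK2 and TP headers has nothing stored under the correlator id, so the loop skips it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these classes imitate the header lookup quoted above.
// They are NOT the JGroups Message/Header/RequestCorrelator classes.
class CorrelatorSketch {
    static final short CORR_ID = 200;    // id the correlator looks for (from the post)
    static final short NAKACK2_ID = 41;  // header ids reported in the post
    static final short TP_ID = 57;

    // A message is reduced to a map from protocol/correlator id to a header label.
    static final class ToyMessage {
        final Map<Short, String> headers = new HashMap<>();

        ToyMessage putHeader(short id, String hdr) {
            headers.put(id, hdr);
            return this;
        }

        String getHeader(short id) {
            return headers.get(id);
        }
    }

    // Mirrors the quoted loop: a message without a header under CORR_ID is skipped.
    static List<ToyMessage> dispatchable(List<ToyMessage> batch) {
        List<ToyMessage> out = new ArrayList<>();
        for (ToyMessage msg : batch) {
            if (msg.getHeader(CORR_ID) == null)
                continue; // no correlator header: nothing to dispatch
            out.add(msg);
        }
        return out;
    }

    public static void main(String[] args) {
        ToyMessage plain = new ToyMessage()
            .putHeader(NAKACK2_ID, "NakAckHeader2")
            .putHeader(TP_ID, "TpHeader");
        ToyMessage request = new ToyMessage()
            .putHeader(NAKACK2_ID, "NakAckHeader2")
            .putHeader(TP_ID, "TpHeader")
            .putHeader(CORR_ID, "correlator header");
        // Only the message with a correlator header survives the filter.
        System.out.println(dispatchable(Arrays.asList(plain, request)).size());
    }
}
```

In this model the skipped message is simply dropped, which matches the "swallowed" behaviour seen in the debugger; what the real stack does with such messages is not modelled here.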
Thanks,
Bobby
[1] 11/3/17 12:03:56 PM org.jgroups.protocols.TP passBatchUp FINER:
localhost-63787: received message batch of 1 messages from localhost-63787
11/3/17 12:03:56 PM org.jgroups.protocols.pbcast.NAKACK2 handleMessages
FINER: localhost-63787: received localhost-63787#1-1 (1 messages)
[2] 11/3/17 11:51:00 AM org.jgroups.protocols.pbcast.STATE_TRANSFER
handleStateReq FINE: localhost-50939: received state request from
localhost-28424
|
|
From: Questions/problems r. to u. J. <jav...@li...> - 2017-11-06 13:38:36
|
Your methods are never called because the MessageDispatcher catches all
of these messages! So JChannel.setReceiver(this) is setting a receiver
that won't ever get called.
If you have a MessageDispatcher, receive(Message) and
receive(MessageBatch) are never called; instead, either handle(Message)
or handle(Message,Response) is going to be called.
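The precedence described above can be pictured with a small, self-contained toy model. This is a sketch under assumptions, not JGroups code: `ToyChannel`, `ToyDispatcher` and the single "up handler" slot are hypothetical stand-ins chosen to illustrate the idea that a dispatcher attached to a channel sees incoming messages first, so a plain receiver on the same channel is never invoked. Whether the library implements it exactly this way internally is not claimed here.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: hypothetical stand-ins, not the JGroups
// JChannel / MessageDispatcher / Receiver / RequestHandler types.
class DispatcherPrecedenceSketch {
    interface Receiver { void receive(String msg); }
    interface RequestHandler { Object handle(String msg); }

    static final class ToyChannel {
        private Receiver receiver;   // set via setReceiver()
        private Receiver upHandler;  // installed by a dispatcher, if any

        void setReceiver(Receiver r) { receiver = r; }
        void setUpHandler(Receiver h) { upHandler = h; }

        // An installed up handler consumes the message;
        // the plain receiver is only consulted when there is none.
        void deliver(String msg) {
            if (upHandler != null) {
                upHandler.receive(msg);
                return;
            }
            if (receiver != null)
                receiver.receive(msg);
        }
    }

    static final class ToyDispatcher {
        ToyDispatcher(ToyChannel ch, RequestHandler handler) {
            // Attaching the dispatcher takes over delivery on this channel.
            ch.setUpHandler(handler::handle);
        }
    }

    // Delivers one message and returns which callback saw it.
    static List<String> run(boolean withDispatcher) {
        List<String> log = new ArrayList<>();
        ToyChannel ch = new ToyChannel();
        ch.setReceiver(msg -> log.add("receive:" + msg));
        if (withDispatcher)
            new ToyDispatcher(ch, msg -> { log.add("handle:" + msg); return null; });
        ch.deliver("m1");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // receiver only
        System.out.println(run(true));  // receiver plus dispatcher
    }
}
```

With only a receiver registered, the message reaches receive(); once the toy dispatcher is attached, the same message reaches handle() and the receiver stays silent, which is the symptom reported in this thread.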
Events such as view changes are caught by registering a
MembershipListener directly with the MessageDispatcher
(setMembershipListener()); state changes are registered for by calling
MessageDispatcher.setStateListener().
Take a look at the simple program below that shows how to do this.
Cheers,
public class bla extends ReceiverAdapter implements RequestHandler {
    protected JChannel ch;
    protected static final String props="/home/bela/fast.xml";
    protected MessageDispatcher dispatcher;
    protected int cnt=1;

    protected void start(String name) throws Exception {
        ch=new JChannel(props).name(name);
        // ch.setReceiver(this); // useless receiver won't get called!
        dispatcher = new MessageDispatcher(ch, this);
        dispatcher.setMembershipListener(this);
        // dispatcher.setStateListener(this);
        ch.connect("demo", null, 10 * 1000);
        int num=1;
        for(;;) {
            Util.keyPress(">>");
            String tmp="msg-" + num++;
            RspList<Object> rsps=dispatcher.castMessage(null,
                new ByteArrayPayload(tmp.getBytes()), RequestOptions.SYNC());
            System.out.printf("rsps: %s\n", rsps);
        }
    }

    public static void main(String[] args) throws Exception {
        new bla().start(args[0]);
    }

    public void receive(Message msg) {
        System.out.printf("receive() called\n");
    }

    public void receive(MessageBatch batch) {
        System.out.printf("receiveBatch() called\n");
    }

    public void viewAccepted(View view) {
        System.out.printf("-- view: %s\n", view);
    }

    public Object handle(Message msg) throws Exception {
        System.out.printf("handle() called\n");
        return cnt++;
    }

    public void handle(Message request, Response response) throws Exception {
        Object retval=handle(request);
        response.send(retval, false);
    }
}
--
Bela Ban | http://www.jgroups.org
|
|
From: Questions/problems r. to u. J. <jav...@li...> - 2017-11-06 20:40:32
|
On Mon, Nov 6, 2017 at 8:37 AM, Questions/problems related to using JGroups
<jav...@li...> wrote:
> Your methods are never called because the MessageDispatcher catches all of
> these messages!
Thank you -- I definitely missed that in the docs and didn't expect it. I
have my view changes and state back!
So I've pulled my 'jChannel.setReceiver(this);' and have:
jChannel = env.createJChannel(this);
dispatcher = new MessageDispatcher(jChannel, this);
dispatcher.setMembershipListener(this);
dispatcher.setStateListener(this);
In v3, I was used to MD#castMessage going to handle() and JChannel#send
going to receive(). To be really clear, are you saying that, since we have
a MessageDispatcher, I should no longer use channel#send and should convert
those calls to use the MessageDispatcher?
And I don't need this I guess, right? (If so, what is this method for?)
dispatcher.setRequestHandler(this);
Thanks,
Bobby
|
|
From: Questions/problems r. to u. J. <jav...@li...> - 2017-11-07 09:38:32
|
On 06/11/17 21:40, Questions/problems related to using JGroups wrote:
> On Mon, Nov 6, 2017 at 8:37 AM, Questions/problems related to using
> JGroups <jav...@li...> wrote:
>
> > Your methods are never called because the MessageDispatcher catches
> > all of these messages!
>
> Thank you -- I definitely missed that in the docs and didn't expect it.
> I have my view changes and state back!
>
> So I've pulled my 'jChannel.setReceiver(this);' and have:
>
> jChannel = env.createJChannel(this);
> dispatcher = new MessageDispatcher(jChannel, this);
> dispatcher.setMembershipListener(this);
> dispatcher.setStateListener(this);

Correct: the MessageDispatcher consumes all messages *before* they reach
a registered Receiver. If you wanted both on the same channel, you'd have
to use FORK and ForkChannels (see the manual for details).

> In v3, I was used to MD#castMessage going to handle() and JChannel#send
> going to receive(). To be really clear, are you saying that, since we
> have a MessageDispatcher, I should no longer use channel#send and should
> convert those calls to use the MessageDispatcher?

Right. The MessageDispatcher trumps the Receiver, but fork channels (see
above) would allow both to be on the same channel.

> And I don't need this I guess, right? (If so, what is this method for?)
>
> dispatcher.setRequestHandler(this);

This method is already called in the constructor of the
MessageDispatcher (the 2nd arg).

> Thanks,
> Bobby

--
Bela Ban | http://www.jgroups.org