2009-11-07 21:04:01 UTC
Here is the code:
package com.serotonin.io.messaging;
import java.io.IOException;
import com.serotonin.io.StreamUtils;
/**
 * Connection class for applications that send messages (client or master). Implements the send
 * method for sending messages to some listener. See ListenerConnection for server or slave
 * applications.
 *
 * <p>Calls to {@link #send(MessageRequest, int, int)} are serialized on a private lock, so at most
 * one request is outstanding at a time. The reply (or a parse failure) is handed from
 * {@link #data()} to the waiting sender under this object's monitor.
 *
 * @author mlohbihler
 */
public class SenderConnection extends MessagingConnection {
    private static final int DEFAULT_RETRIES = 2;
    private static final int DEFAULT_TIMEOUT = 500;
    private static final long NANOS_PER_MILLI = 1000000L;

    /** Serializes senders so that only one request/response exchange is in flight. */
    private final Object sendLock = new Object();

    // Both fields are only read and written while holding this object's monitor.
    private MessageResponse response = null;
    private ResponseParseException exception = null;

    private int retries = DEFAULT_RETRIES;
    private int timeout = DEFAULT_TIMEOUT;

    /**
     * @return the number of retries used by {@link #send(MessageRequest)}
     */
    public int getRetries() {
        return retries;
    }

    /**
     * @param retries the number of retries used by {@link #send(MessageRequest)}
     */
    public void setRetries(int retries) {
        this.retries = retries;
    }

    /**
     * @return the per-attempt wait, in milliseconds, used by {@link #send(MessageRequest)}
     */
    public int getTimeout() {
        return timeout;
    }

    /**
     * @param timeout the per-attempt wait, in milliseconds, used by {@link #send(MessageRequest)}
     */
    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    /**
     * Attempt to send the given request. The request is written once and then, while no response
     * has arrived, written again up to {@code retries} more times, waiting up to {@code timeout}
     * milliseconds after each write.
     *
     * @param request the request to send
     * @param retries the number of retry attempts in case of failure
     * @param timeout the time to wait for a response after each write, in milliseconds; 0 waits
     *        until a response or a parse failure arrives
     * @return the response; never null
     * @throws MessageSendException if writing to the transport fails
     * @throws ResponseParseException if the incoming data could not be parsed
     * @throws SendTimeoutException if no response was received after all retries were exhausted
     * @throws IllegalArgumentException if {@code timeout} is negative
     */
    public MessageResponse send(MessageRequest request, int retries, int timeout) throws MessageSendException {
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout must not be negative: " + timeout);
        }

        // An interrupt ends the current wait early but does not abort the exchange; the flag is
        // restored on the way out so that callers can still observe it.
        boolean interrupted = false;
        try {
            synchronized (sendLock) {
                synchronized (this) {
                    response = null;
                    exception = null;
                }
                dataBuffer.clear();

                int attemptsLeft = retries;
                while (true) {
                    if (DEBUG) {
                        System.out.println("MessagingConnection.write: "
                                + StreamUtils.dumpMessage(request.getMessageData()));
                    }

                    try {
                        transport.write(request.getMessageData());
                    }
                    catch (IOException e) {
                        throw new MessageSendException(e);
                    }

                    MessageResponse received;
                    ResponseParseException failure;
                    synchronized (this) {
                        try {
                            awaitReply(timeout);
                        }
                        catch (InterruptedException e) {
                            interrupted = true;
                        }
                        received = response;
                        failure = exception;
                    }

                    // Check if there is an exception
                    if (failure != null) {
                        throw failure;
                    }

                    if (received != null) {
                        // Make sure the response matches the request.
                        // NOTE(review): any return value is not inspected here; presumably a
                        // mismatch is reported by an exception - confirm against MessageRequest.
                        request.isValidResponse(received);
                        return received;
                    }

                    if (attemptsLeft-- <= 0) {
                        throw new SendTimeoutException();
                    }
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Sends the given request using the configured retries and timeout.
     *
     * @param request the request to send
     * @return the response; never null
     * @throws MessageSendException see {@link #send(MessageRequest, int, int)}
     */
    public MessageResponse send(MessageRequest request) throws MessageSendException {
        return send(request, retries, timeout);
    }

    /**
     * Tries to parse a response out of the data buffer. If a complete response is parsed, or if
     * parsing throws, the result is published and the waiting sender is woken. If the parser
     * returns null the buffer is restored to its previous content.
     *
     * NOTE(review): presumably invoked by the superclass or transport when new bytes have been
     * appended to the data buffer - confirm against MessagingConnection.
     */
    @Override
    public void data() {
        // Make a copy of the data in case the entire message is not in yet.
        byte[] snapshot = dataBuffer.peekAll();

        // Attempt to parse the message.
        try {
            MessageResponse parsed = messageParser.parseResponse(dataBuffer);
            if (parsed == null) {
                // We assume that the message is incomplete, and go back to wait for
                // more data to show. An already published response is left untouched.
                dataBuffer.clear();
                dataBuffer.push(snapshot);
            }
            else {
                // Publish under the monitor so a sender that has not started waiting yet
                // still sees the response.
                synchronized (this) {
                    response = parsed;
                    notifyAll();
                }
            }
        }
        catch (Exception e) {
            // Wake the sender so the failure is reported without waiting out the timeout.
            synchronized (this) {
                exception = new ResponseParseException(e);
                notifyAll();
            }
        }
    }

    /**
     * Waits until a response or a parse failure has been published, or until the timeout elapses.
     * Must be called while holding this object's monitor. The condition is re-checked after every
     * wake-up, so a reply that arrived before the wait started is seen immediately and spurious
     * wake-ups do not end the wait early.
     *
     * @param timeout the maximum time to wait in milliseconds; 0 waits without a time limit
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private void awaitReply(int timeout) throws InterruptedException {
        if (timeout == 0) {
            while (response == null && exception == null) {
                wait();
            }
            return;
        }

        long deadline = System.nanoTime() + timeout * NANOS_PER_MILLI;
        while (response == null && exception == null) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return;
            }
            // Round up: wait(0) would mean "no time limit".
            wait((remainingNanos + NANOS_PER_MILLI - 1) / NANOS_PER_MILLI);
        }
    }
}