Hi Mike,
I have a standalone Java app that has to request historical information. One of it's functions initiates the calls and needs the results synchronously. Are there any otfeed built-in mechanisms for this synchronization? I can run the HistDataCsv sample below just fine, but how can I access the incoming ohlc data from my calling function? Do you have any examples of such a scenario available?
The solution I found is below.
Is it good? Are there better ways? Seems quite an overkill for simply requesting a data point.
The client synchronizes over an object mr:
MessageRelayer mr = new MessageRelayer();
SynchronizedOHLCDataDelegate ohlcDataDelegate = new SynchronizedOHLCDataDelegate(mr);
command.setDataDelegate(ohlcDataDelegate);
command.setCompletionDelegate(ohlcDataDelegate);
IRequest request = connection.prepareRequest(command);
request.submit();
request.waitForCompletion();
Object returned = mr.get();
The implementation I wrote for the (both) delegates:
public class SynchronizedOHLCDataDelegate implements IDataDelegate<OTOHLC>, ICompletionDelegate {
private static final Logger log = Logger.getLogger(SynchronizedOHLCDataDelegate.class);
MessageRelayer messageRelayer;
public SynchronizedOHLCDataDelegate(MessageRelayer messageRelayer) {
this.messageRelayer = messageRelayer;
}
public void onData(OTOHLC ohlc) {
log.debug("onData!");
messageRelayer.put(ohlc);
}
And finally the plumbing: A producer/consumer implementation:
public class MessageRelayer {
private Object message; // shared data
private boolean available = false;
// Method used by the consumer to access the shared data
public synchronized Object get() {
while (available == false) {
try {
wait(); // Consumer enters a wait state until notified by the Producer
} catch (InterruptedException e) { }
}
available = false;
notifyAll(); // Consumer notifies Producer that it can store new contents
return message;
}
// Method used by the consumer to access (store) the shared data
public synchronized void put(Object message) {
while (available == true) {
try {
wait(); // Producer who wants to store contents enters
// a wait state until notified by the Consumer
} catch (InterruptedException e) { }
}
this.message = message;
available = true;
notifyAll(); // Producer notifies Consumer to come out
// of the wait state and consume the contents
}
}
If you would like to refer to this comment somewhere else in this project, copy and paste the following link:
Andreas, the idea is correct. Somewhat more compact solution would be to collect all data to the list before returning from the synchronous method using anonymous local classes:
List<OHCL> requestHistData(String excahnge, String symbol, Date start, Date end) {
final List<OHLC> out = new ArrayList<OHLC>();
final AtomicReference<OTError> errorRef = new AtomicReference<OTError>();
HistDataCommand command = new HistDataCommand();
command.setSymbol(symbol);
... // other sets
if(errorRef.get() != null) {
throw new IOError(error.toString());
}
return out;
}
Note that above there are NO threading concerns (and atomic reference object is used as convenient reference holder, not for its atomicity!). This is because waitForCompletion() is thread-safe, and everything else happens while we are blocked...
If you want to be able to receive data **before** last item arrives (i.e. as you go), then you need synchronized producer-consumer appoach:
Iterable<OHLC> requestHistData(...) {
final Queue<Object> queue = new BlockingQueue<Object>(); // fixme: check it is synchronized!
final Object END_OF_STREAM = new Object();
HistDataCommand = new HistDataCommand();
... // set parameters
command.setCompletionDelegate(new ICompletionDelegate() {
public void onDataEnd(OTError error) {
if(error == null) {
queue.offer(END_OF_STREAM);
} else {
queue.offer(error);
}
}
});
command.setDataDelegate(new IDataDelegate<OHLC>() {
public void onData(OHLC data) {
queue.offer(data);
}
));
IRequest request = connection.prepareRequest(command);
request.submit();
/// do NOT wait!
return new Iterable<OHLC>() {
public void Iterator<OHLC> iterator() {
return new Iterator<OHLC>() {
private OHLC last = null;
public boolean hasNext() {
if(last != null) return true;
Object next = queue.poll();
if(next instanceof OHLC) {
last = (OHLC) next;
return true;
} else if(next instanceof OTError) {
throw new RuntimeException(next.toString());
} else {
// must be END_OF_STREAM
return false;
}
}
public OHLC next() {
if(hasNext()) {
OHLS out = last;
last = null;
return out;
} else {
return null;
}
}
This is something that is emerging for container environment (J2EE). If enough interest is seen, I may just add it to the main driver, together with the adapter that converts asynchronous interface to the synchronous one...
Cheers,
-Mike
If you would like to refer to this comment somewhere else in this project, copy and paste the following link:
Hi Mike,
I have a standalone Java app that has to request historical information. One of it's functions initiates the calls and needs the results synchronously. Are there any otfeed built-in mechanisms for this synchronization? I can run the HistDataCsv sample below just fine, but how can I access the incoming ohlc data from my calling function? Do you have any examples of such a scenario available?
Thanks for the great work.
HistDataCsv excerpt:
HistDataCommand command = new HistDataCommand();
command.setExchangeCode(args[0]);
command.setSymbolCode(args[1]);
command.setStartDate(start);
command.setEndDate(end);
command.setAggregationSpan(new AggregationSpan(units, interval));
command.setDataDelegate(new IDataDelegate<OTOHLC>() {
private boolean firstTime = true;
public void onData(OTOHLC ohlc) {
if(firstTime) {
firstTime = false;
System.out.println(formatter.header(OTOHLC.class));
}
System.out.println(formatter.format(ohlc));
//Mike: How do I get the ohlc data back in to the main thread?
}
});
command.setCompletionDelegate(new ICompletionDelegate() {
public void onDataEnd(OTError error) {
System.out.println("# Error: " + error);
}
});
try {
IRequest request = connection.prepareRequest(command);
request.submit();
request.waitForCompletion();
System.out.println("Request completed!!");
} finally {
connection.shutdown();
connection.waitForCompletion();
}
The solution I found is below.
Is it good? Are there better ways? Seems quite an overkill for simply requesting a data point.
The client synchronizes over an object mr:
MessageRelayer mr = new MessageRelayer();
SynchronizedOHLCDataDelegate ohlcDataDelegate = new SynchronizedOHLCDataDelegate(mr);
command.setDataDelegate(ohlcDataDelegate);
command.setCompletionDelegate(ohlcDataDelegate);
IRequest request = connection.prepareRequest(command);
request.submit();
request.waitForCompletion();
Object returned = mr.get();
The implementation I wrote for the (both) delegates:
public class SynchronizedOHLCDataDelegate implements IDataDelegate<OTOHLC>, ICompletionDelegate {
private static final Logger log = Logger.getLogger(SynchronizedOHLCDataDelegate.class);
MessageRelayer messageRelayer;
public SynchronizedOHLCDataDelegate(MessageRelayer messageRelayer) {
this.messageRelayer = messageRelayer;
}
public void onData(OTOHLC ohlc) {
log.debug("onData!");
messageRelayer.put(ohlc);
}
public void onDataEnd(OTError error) {
log.error("Error:" + error);
if ((error != null) && (error.getCode() == 1003)) {
log.warn("No data");
messageRelayer.put("No data!");
} else {
log.error("Warning: unknown error!");
}
}
}
And finally the plumbing: A producer/consumer implementation:
public class MessageRelayer {
private Object message; // shared data
private boolean available = false;
// Method used by the consumer to access the shared data
public synchronized Object get() {
while (available == false) {
try {
wait(); // Consumer enters a wait state until notified by the Producer
} catch (InterruptedException e) { }
}
available = false;
notifyAll(); // Consumer notifies Producer that it can store new contents
return message;
}
// Method used by the consumer to access (store) the shared data
public synchronized void put(Object message) {
while (available == true) {
try {
wait(); // Producer who wants to store contents enters
// a wait state until notified by the Consumer
} catch (InterruptedException e) { }
}
this.message = message;
available = true;
notifyAll(); // Producer notifies Consumer to come out
// of the wait state and consume the contents
}
}
Andreas, the idea is correct. Somewhat more compact solution would be to collect all data to the list before returning from the synchronous method using anonymous local classes:
List<OHCL> requestHistData(String excahnge, String symbol, Date start, Date end) {
final List<OHLC> out = new ArrayList<OHLC>();
final AtomicReference<OTError> errorRef = new AtomicReference<OTError>();
HistDataCommand command = new HistDataCommand();
command.setSymbol(symbol);
... // other sets
command.setCompletionDelegate(new ICompletionDelegate() {
public void onDataEnd(OTError error) {
errorRef.set(error);
}
});
command.setDataDelegate(new IDataDelegate<OHLC>() {
public void onData(OHLC data) {
out.add(data);
}
));
IRequest request = connection.prepareRequest(command);
request.submit();
request.waitForCompletion();
if(errorRef.get() != null) {
throw new IOError(error.toString());
}
return out;
}
Note that above there are NO threading concerns (and atomic reference object is used as convenient reference holder, not for its atomicity!). This is because waitForCompletion() is thread-safe, and everything else happens while we are blocked...
If you want to be able to receive data **before** last item arrives (i.e. as you go), then you need synchronized producer-consumer appoach:
Iterable<OHLC> requestHistData(...) {
final Queue<Object> queue = new BlockingQueue<Object>(); // fixme: check it is synchronized!
final Object END_OF_STREAM = new Object();
HistDataCommand = new HistDataCommand();
... // set parameters
command.setCompletionDelegate(new ICompletionDelegate() {
public void onDataEnd(OTError error) {
if(error == null) {
queue.offer(END_OF_STREAM);
} else {
queue.offer(error);
}
}
});
command.setDataDelegate(new IDataDelegate<OHLC>() {
public void onData(OHLC data) {
queue.offer(data);
}
));
IRequest request = connection.prepareRequest(command);
request.submit();
/// do NOT wait!
return new Iterable<OHLC>() {
public void Iterator<OHLC> iterator() {
return new Iterator<OHLC>() {
private OHLC last = null;
public boolean hasNext() {
if(last != null) return true;
Object next = queue.poll();
if(next instanceof OHLC) {
last = (OHLC) next;
return true;
} else if(next instanceof OTError) {
throw new RuntimeException(next.toString());
} else {
// must be END_OF_STREAM
return false;
}
}
public OHLC next() {
if(hasNext()) {
OHLS out = last;
last = null;
return out;
} else {
return null;
}
}
}
}
};
}
Let me know if this works for you. Also, have a look at the synchronous interface to the opentick (in source mangement):
http://otfeed.svn.sourceforge.net/viewvc/otfeed/trunk/otfeed-all/otfeed-j2ee/src/main/java/org/otfeed/j2ee/IOpenTickConnection.java?view=markup
This is something that is emerging for container environment (J2EE). If enough interest is seen, I may just add it to the main driver, together with the adapter that converts asynchronous interface to the synchronous one...
Cheers,
-Mike
Andreas,
Connection adapters to synchronous api are available in the current development branch, see:
org.otfeed.support.alt.OutboundConnectionAdapter
and
org.otfeed.support.alt.OutboundConnectionFactoryAdapter
they do work, but befare, that naming/api can change in the future...
Cheers,
-Mike