Inversion of control for e.g. HistDataCsv

Andreas
2008-06-26
2013-04-22
  • Andreas

    Andreas - 2008-06-26

    Hi Mike,
    I have a standalone Java app that has to request historical information. One of its functions initiates the calls and needs the results synchronously. Are there any built-in otfeed mechanisms for this synchronization? I can run the HistDataCsv sample below just fine, but how can I access the incoming OHLC data from my calling function? Do you have any examples of such a scenario?

    Thanks for the great work.

    HistDataCsv excerpt:

            HistDataCommand command = new HistDataCommand();
            command.setExchangeCode(args[0]);
            command.setSymbolCode(args[1]);
            command.setStartDate(start);
            command.setEndDate(end);
            command.setAggregationSpan(new AggregationSpan(units, interval));
            command.setDataDelegate(new IDataDelegate<OTOHLC>() {
                private boolean firstTime = true;
                public void onData(OTOHLC ohlc) {
                    if(firstTime) {
                        firstTime = false;
                        System.out.println(formatter.header(OTOHLC.class));
                    }
                    System.out.println(formatter.format(ohlc));

                    // Mike: how do I get the ohlc data back into the main thread?

                }

            });
            command.setCompletionDelegate(new ICompletionDelegate() {
                public void onDataEnd(OTError error) {
                    System.out.println("# Error: " + error);
                }
            });

            try {
                IRequest request = connection.prepareRequest(command);

                request.submit();

                request.waitForCompletion();
                System.out.println("Request completed!!");
            } finally {
                connection.shutdown();
                connection.waitForCompletion();
            }

     
    • Andreas

      Andreas - 2008-06-26

      The solution I found is below.
      Is it good? Are there better ways? It seems like overkill for simply requesting a data point.

      The client synchronizes over an object mr:
              MessageRelayer mr = new MessageRelayer();
              SynchronizedOHLCDataDelegate ohlcDataDelegate = new SynchronizedOHLCDataDelegate(mr);
              command.setDataDelegate(ohlcDataDelegate);
              command.setCompletionDelegate(ohlcDataDelegate);
             
              IRequest request = connection.prepareRequest(command);
              request.submit();
              request.waitForCompletion();
              Object returned = mr.get();

      The implementation I wrote for both delegates:

      public class SynchronizedOHLCDataDelegate implements IDataDelegate<OTOHLC>, ICompletionDelegate {
          private static final Logger log = Logger.getLogger(SynchronizedOHLCDataDelegate.class);
          MessageRelayer messageRelayer;
         
          public SynchronizedOHLCDataDelegate(MessageRelayer messageRelayer) {
              this.messageRelayer = messageRelayer;
          }
         
          public void onData(OTOHLC ohlc) {
              log.debug("onData!");
              messageRelayer.put(ohlc);
          }
         
          public void onDataEnd(OTError error) {
              if (error == null) {
                  // normal completion: nothing to report
                  log.debug("Request completed");
              } else if (error.getCode() == 1003) {
                  log.warn("No data");
                  messageRelayer.put("No data!");
              } else {
                  log.error("Error: " + error);
              }
          }
         
      }

      And finally the plumbing: A producer/consumer implementation:

      public class MessageRelayer {
          private Object message;       // shared data
          private boolean available = false;

      // Method used by the consumer to access the shared data
          public synchronized Object get() {
              while (available == false) {
                  try {
                      wait();     // Consumer enters a wait state until notified by the Producer
                  } catch (InterruptedException e) { }
              }
              available = false;
              notifyAll();        // Consumer notifies Producer that it can store new contents
              return message;   
          }

      // Method used by the producer to store the shared data
          public synchronized void put(Object message) {
              while (available == true) {
                  try {
                      wait();     // Producer who wants to store contents enters
                      // a wait state until notified by the Consumer
                  } catch (InterruptedException e) { }
              }
              this.message = message;
              available = true;
              notifyAll();        // Producer notifies Consumer to come out
                      // of the wait state and consume the contents
          }
      }
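
For comparison, the same single-slot handoff can be sketched with a capacity-one queue from java.util.concurrent. This is only an illustration: QueueRelayer and demo() are hypothetical names, not part of otfeed, and unlike the class above the queue does not accept null messages. As with the hand-written version, put() blocks until the previous message has been taken.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for MessageRelayer, built on a capacity-one queue.
class QueueRelayer {
    private final BlockingQueue<Object> slot = new ArrayBlockingQueue<>(1);

    // Consumer side: blocks until a message is available.
    public Object get() throws InterruptedException {
        return slot.take();
    }

    // Producer side: blocks while the previous message has not been taken.
    public void put(Object message) throws InterruptedException {
        slot.put(message);
    }

    // Runs one producer thread against the calling thread as consumer.
    static List<Object> demo() {
        final QueueRelayer relayer = new QueueRelayer();
        Thread producer = new Thread(() -> {
            try {
                relayer.put("first");
                relayer.put("second"); // waits until "first" has been taken
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<Object> received = new ArrayList<>();
        try {
            received.add(relayer.get());
            received.add(relayer.get());
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [first, second]
    }
}
```

Because the slot holds one message, a producer that delivers several items before the consumer calls get() will stall on the second put().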

       
    • Mike Kroutikov

      Mike Kroutikov - 2008-06-26

      Andreas, the idea is correct. A somewhat more compact solution would be to collect all the data into a list before returning from the synchronous method, using anonymous local classes:

      List<OTOHLC> requestHistData(String exchange, String symbol, Date start, Date end) {
         final List<OTOHLC> out = new ArrayList<OTOHLC>();
         final AtomicReference<OTError> errorRef = new AtomicReference<OTError>();
         HistDataCommand command = new HistDataCommand();
         command.setSymbolCode(symbol);
         ... // other sets

         // remember the error (if any) reported at the end of the stream
         command.setCompletionDelegate(new ICompletionDelegate() {
            public void onDataEnd(OTError error) {
               errorRef.set(error);
            }
         });
         // collect every incoming data point
         command.setDataDelegate(new IDataDelegate<OTOHLC>() {
            public void onData(OTOHLC data) {
               out.add(data);
            }
         });

         IRequest request = connection.prepareRequest(command);
         request.submit();
         request.waitForCompletion();

         if(errorRef.get() != null) {
            // report the feed error to the caller
            throw new RuntimeException(errorRef.get().toString());
         }

         return out;
      }

      Note that there are NO threading concerns in the code above (the AtomicReference is used as a convenient reference holder, not for its atomicity!). This is because waitForCompletion() is thread-safe, and everything else happens while we are blocked...
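
The collect-then-return idea can be tried without a feed connection. The sketch below is a stand-in, not otfeed code: fakeRequest is a hypothetical asynchronous source, and a CountDownLatch plays the role that waitForCompletion() is described as playing above. The latch guarantees that everything the callback thread wrote before countDown() is visible after await() returns, which is why the plain ArrayList needs no extra locking here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class CollectThenReturn {
    // Hypothetical stand-in for an asynchronous feed: delivers three values
    // on a background thread, then signals completion with a null error.
    static void fakeRequest(Consumer<Integer> onData, Consumer<String> onDataEnd) {
        new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                onData.accept(i);
            }
            onDataEnd.accept(null);
        }).start();
    }

    // Synchronous wrapper: blocks until the completion callback has fired.
    static List<Integer> requestAll() {
        final List<Integer> out = new ArrayList<>();
        final AtomicReference<String> errorRef = new AtomicReference<>();
        final CountDownLatch done = new CountDownLatch(1);

        fakeRequest(out::add, error -> {
            errorRef.set(error);
            done.countDown();
        });

        try {
            done.await(); // assumed equivalent of request.waitForCompletion()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (errorRef.get() != null) {
            throw new RuntimeException(errorRef.get());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(requestAll()); // prints [1, 2, 3]
    }
}
```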

      If you want to be able to receive data **before** the last item arrives (i.e. as you go), then you need a synchronized producer-consumer approach:

      Iterable<OTOHLC> requestHistData(...) {
         // LinkedBlockingQueue (java.util.concurrent) is thread-safe
         final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
         final Object END_OF_STREAM = new Object();
         HistDataCommand command = new HistDataCommand();
         ... // set parameters
         command.setCompletionDelegate(new ICompletionDelegate() {
            public void onDataEnd(OTError error) {
               if(error == null) {
                  queue.offer(END_OF_STREAM);
               } else {
                  queue.offer(error);
               }
            }
         });
         command.setDataDelegate(new IDataDelegate<OTOHLC>() {
            public void onData(OTOHLC data) {
               queue.offer(data);
            }
         });

         IRequest request = connection.prepareRequest(command);
         request.submit();
         /// do NOT wait!

         return new Iterable<OTOHLC>() {
            public Iterator<OTOHLC> iterator() {
               return new Iterator<OTOHLC>() {
                  private OTOHLC last = null;
                  private boolean done = false;
                  public boolean hasNext() {
                     if(last != null) return true;
                     if(done) return false;
                     Object next;
                     try {
                        next = queue.take(); // blocks until an item arrives
                     } catch(InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                     }
                     if(next instanceof OTOHLC) {
                        last = (OTOHLC) next;
                        return true;
                     }
                     done = true;
                     if(next instanceof OTError) {
                        throw new RuntimeException(next.toString());
                     }
                     // must be END_OF_STREAM
                     return false;
                  }
                  public OTOHLC next() {
                     if(!hasNext()) throw new NoSuchElementException();
                     OTOHLC out = last;
                     last = null;
                     return out;
                  }
                  public void remove() {
                     throw new UnsupportedOperationException();
                  }
               };
            }
         };
      }
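
One detail worth checking in any consumer of this kind is how it behaves when the queue is momentarily empty: poll() returns null immediately, whereas take() waits for the next item. The sketch below shows the end-of-stream marker pattern with a blocking take(). It makes no otfeed calls; startFakeFeed and the "bar-N" strings are hypothetical stand-ins for the feed and its data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class StreamingDemo {
    // Unique marker object; compared by identity, so it cannot collide with data.
    private static final Object END_OF_STREAM = new Object();

    // Hypothetical stand-in for the feed: pushes three items, then the marker.
    static BlockingQueue<Object> startFakeFeed() {
        final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                queue.offer("bar-" + i);
            }
            queue.offer(END_OF_STREAM);
        }).start();
        return queue;
    }

    // Consumes items as they arrive; returns once the marker is seen.
    static List<String> consume(BlockingQueue<Object> queue) {
        List<String> seen = new ArrayList<>();
        try {
            while (true) {
                Object item = queue.take(); // blocks while the queue is empty
                if (item == END_OF_STREAM) {
                    return seen;
                }
                seen.add((String) item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(consume(startFakeFeed())); // prints [bar-1, bar-2, bar-3]
    }
}
```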

      Let me know if this works for you. Also, have a look at the synchronous interface to opentick (in source management):
      http://otfeed.svn.sourceforge.net/viewvc/otfeed/trunk/otfeed-all/otfeed-j2ee/src/main/java/org/otfeed/j2ee/IOpenTickConnection.java?view=markup

      This is something that is emerging for container environments (J2EE). If there is enough interest, I may just add it to the main driver, together with the adapter that converts the asynchronous interface to the synchronous one...

      Cheers,
      -Mike

       
    • Mike Kroutikov

      Mike Kroutikov - 2008-06-30

      Andreas,

      Connection adapters to the synchronous API are available in the current development branch, see:

      org.otfeed.support.alt.OutboundConnectionAdapter

      and

      org.otfeed.support.alt.OutboundConnectionFactoryAdapter

      They do work, but beware that the naming/API can change in the future...

      Cheers,
      -Mike

       
