
Protecting Connector

Help
mattlf
2008-06-16
2013-04-22
  • mattlf

    mattlf - 2008-06-16

    Hi Mike

    Do I need to protect the connect mechanism when multiple threads call the connect function at the same time?

    Should I do something like the following code/pseudocode:

    Thank you

    IConnect connect() {
        // First check without the lock, second check under it (double-checked locking)
        if (null == connection && state == SessionState.Disconnected) {
            synchronized (this) {
                if (null == connection && state == SessionState.Disconnected) {
                    OTConnectionFactory factory = new OTConnectionFactory();
                    SessionConnectionStateListener listener = new SessionConnectionStateListener(this /*, latch */);
                    connection = factory.connect(listener);
                }
            }
        }
        return connection;
    }

     
    • Mike Kroutikov

      Mike Kroutikov - 2008-06-16

      First, the code snippet does not make sense to me, as you are creating a connection factory per connection. Instead, use a single connection factory (preferably through an interface, IConnectionFactory, as you may want to play with different implementations).

      The connection factory's "connect()" method is thread-safe, so there is no need to synchronize it.

      Maybe you can give more details on what you are trying to achieve? Specifically, what are your requirements with respect to connection state? Are you just trying to monitor it, or do you want to do things like automatic reconnect?

      -Mike
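A minimal sketch of the "single factory behind an interface" advice above. The IConnection and IConnectionFactory types here are simplified, hypothetical stand-ins declared locally so the example compiles on its own (the real connect() takes a state listener, which is dropped here), and QuoteService and CountingFactory are illustrative names, not part of the driver.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for the driver types, so the sketch is self-contained.
interface IConnection {
    void shutdown();
}

interface IConnectionFactory {
    IConnection connect();
}

// Depends only on the interface; the caller decides which implementation to inject.
final class QuoteService {
    private final IConnectionFactory factory; // one factory, shared by all calls

    QuoteService(IConnectionFactory factory) {
        this.factory = factory;
    }

    IConnection open() {
        // No synchronization here: this relies on connect() being thread-safe.
        return factory.connect();
    }
}

// A substitute implementation that counts how many connections were requested.
final class CountingFactory implements IConnectionFactory {
    final AtomicInteger connects = new AtomicInteger();

    @Override
    public IConnection connect() {
        connects.incrementAndGet();
        return () -> { }; // a connection whose shutdown() does nothing
    }
}
```

Because QuoteService only sees the interface, swapping in a pooled, lazy, or test implementation should not require touching it.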

       
    • mattlf

      mattlf - 2008-06-17

      Mike
      Thank you for your response.
      Here is an example of a possible Session implementation around your IConnection. Can you have a quick look and tell me if I am using it the wrong way?
      I have a function public OHLCQuote getOHLCQuote(Ticker, Exchange, Date) that many threads can call at the same time. The first call to getOHLCQuote will initialize the connection.
      Thank you

          public OHLCQuote getOHLCQuote(String ticker, String exchange, Date date)
          throws
          InvalidConnectionException
          {
              IConnection connection = otSession.getConnection();
              if(connection == null)
              {
                  throw new InvalidConnectionException("connection is null");
              }

      @ThreadSafe
      public class MySession
      {

          private Logger log = Logger.getLogger(OTSession.class);

          @GuardedBy("this") private IConnection connection = null;
          @GuardedBy("this") private volatile SessionState state = SessionState.Disconnected;

          protected Properties properties;

          OTConnectionFactory factory = new OTConnectionFactory();

          public Properties getProperties() {
              return properties;
          }

          public void setProperties(Properties properties) {
              this.properties = properties;
          }
          public void setSessionState(SessionState state)
          {
              synchronized (this)
              {
                  this.state = state;
              }
          }

          public boolean isLogedIn()
          {
              if (state == SessionState.LogedIn)
              {
                  return true;
              }
              else
              {
                  return false;
              }
          }

          public void shutdown()
          {
              synchronized (this)
              {
                  connection.shutdown();
                  connection.waitForCompletion(1000);
                  state = SessionState.Disconnected;
                  connection = null;
              }
          }

          public IConnection getConnection()
          throws
          InterruptedException
          {
              if(null == connection && state == SessionState.Disconnected)
              {
                  synchronized (this)
                  {
                      if(null == connection && state == SessionState.Disconnected)
                      {
                          factory.getHostList().add(new OTHost(properties.getProperty("session.hostUrl"), Integer.valueOf(properties.getProperty("session.hostPort"))));
                          factory.setUsername(properties.getProperty("session.login"));
                          factory.setPassword(properties.getProperty("session.password"));

                          SessionConnectionStateListener listener = new SessionConnectionStateListener(this);

                          connection= factory.connect(listener);
                      }
                  }
              }
              return connection;
          }
      }

       
      • Mike Kroutikov

        Mike Kroutikov - 2008-06-17

        Matt, the code looks good. In fact, you have to worry only about two members: connection and state.
        Both use synchronized access, so you should be good.

        Two minor things: (1) the shutdown() method, if called twice, will throw an NPE on the second call. (2) It's better to use an interface for the factory and inject it (this shifts the responsibility for configuring the implementation to the caller).
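A hedged sketch of how point (1) could be handled: shutdown() checks for null, so a second call is a no-op. It also marks the connection field volatile, which, as far as the Java memory model goes, is what makes an unsynchronized first check in double-checked locking safe. IConnection is a local stand-in, and LazySession and its Supplier-based constructor are hypothetical names for illustration only.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for the driver's connection type.
interface IConnection {
    void shutdown();
}

final class LazySession {
    // volatile so the unsynchronized first check in getConnection() is safe
    private volatile IConnection connection;
    private final Supplier<IConnection> connector; // injected way of connecting

    LazySession(Supplier<IConnection> connector) {
        this.connector = connector;
    }

    IConnection getConnection() {
        IConnection c = connection;       // single volatile read on the fast path
        if (c == null) {
            synchronized (this) {
                c = connection;           // re-check under the lock
                if (c == null) {
                    c = connector.get();
                    connection = c;
                }
            }
        }
        return c;
    }

    synchronized void shutdown() {
        if (connection != null) {         // second call finds null and does nothing
            connection.shutdown();
            connection = null;
        }
    }
}
```

After shutdown(), the next getConnection() call connects again, which matches the lazy behaviour of the session above.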

        The next step will be sending the actual request for the data. This is going to be another source of threading troubles, as data arrives on a different thread... Not sure what your model is: are you going to block until all data arrives? Or are you using asynchronous message handling? In the latter case you may find "IConnection.runInControlThread" useful...

        Last thing: if the only functionality that you want is the lazy connect, a better approach would be to create an adapter on top of IConnectionFactory... Something like:

        public class LazyConnectionFactory implements IConnectionFactory {
           private final IConnectionFactory engine;

           public LazyConnectionFactory(IConnectionFactory e) { engine = e; }

           private final Object lock = new Object();
           private IConnection trueConnection = null; // protected by lock

           private final IConnection lazyConnection = new IConnection() {

              public IRequest prepareRequest(ICommand command) {
                 final IConnection c;
                 synchronized(lock) {
                    if(trueConnection == null) {
                       trueConnection = engine.connect(null);
                    }
                    c = trueConnection; // read the field while holding the lock
                 }

                 return c.prepareRequest(command);
              }

              public void shutdown() {
                 synchronized(lock) {
                    if(trueConnection != null) {
                       trueConnection.shutdown();
                    }
                 }
              }

              // etc.
           };

           public IConnection connect(final IConnectionStateListener listener) {
              // fixme: does not dispatch state events!!!
              return lazyConnection;
           }
        }

        Lemme know how it goes...

        -Mike

         
    • mattlf

      mattlf - 2008-06-17

      Mike

      Thank you for your response

      But I just wanted to reuse your driver and the synchronous request/response.

      My concern is: is OTConnectionFactory thread-safe?

      Thank you

      matt

       
    • Mike Kroutikov

      Mike Kroutikov - 2008-06-17

      Yes, all implementations of IConnectionFactory and IConnection are thread-safe:
      http://otfeed.sourceforge.net/apidocs/org/otfeed/IConnection.html
      http://otfeed.sourceforge.net/apidocs/org/otfeed/IConnectionFactory.html
      http://otfeed.sourceforge.net/apidocs/org/otfeed/IRequest.html

      If all you need is a synchronous request/response in a multi-threaded environment, something like this should work fine (also, have a look at the samples in the source repository). The coding pain is due to the asynchronous nature of the API and the need to convert it to a synchronous style, which the code below achieves via local anonymous classes:

      class Session {
         private final IConnection connection;

         public Session(IConnectionFactory factory) {
            // this may be enhanced to listen for the connection state and update Session's status.
            connection = factory.connect(null);
         }

         public OTOHLCBundle readOTOHLC(InstrumentSpec instr, Date start, Date end) {
            HistDataCommand cmd = new HistDataCommand();
            // configure parameters
            cmd.setExchange(...);
            ...
            // configure receiver of the data events
            final List<OHLC> list = new LinkedList<OHLC>();
            cmd.setDataDelegate(new IDataDelegate<OHLC>() {
               public void onData(OHLC data) { list.add(data); }
            });
            // configure receiver of the error events
            // note that AtomicReference is used **not** for thread-safety, but as a convenient reference holder
            final AtomicReference<OTError> errorRef = new AtomicReference<OTError>();
            cmd.setCompletionDelegate(new ICompletionDelegate() {
               public void onDataEnd(OTError error) { errorRef.set(error); }
            });
            
            IRequest request = connection.prepareRequest(cmd);
            request.submit();
            request.waitForCompletion();

            // now when request is complete, examine the result
            if(errorRef.get() != null) {
               // deal with error
               throw new SomeException(errorRef.get());
            }

            return new OTOHLCBundle(list);
         }
      }
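The asynchronous-to-synchronous conversion described above can be sketched in a self-contained form. AsyncSource below is a hypothetical stand-in for a driver that delivers data on its own thread; it is not the otfeed API. Where the code above blocks in request.waitForCompletion(), this stand-in uses a CountDownLatch, which is an assumption made only so the sketch runs on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical asynchronous source: delivers items, then a completion
// signal, on a worker thread.
final class AsyncSource {
    interface Handler {
        void onData(int value);
        void onEnd(String error); // null means success
    }

    void request(int count, Handler handler) {
        Thread worker = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                handler.onData(i);
            }
            handler.onEnd(null);
        });
        worker.start();
    }
}

final class BlockingReader {
    private final AsyncSource source = new AsyncSource();

    List<Integer> read(int count) {
        final List<Integer> result = new ArrayList<>();
        final AtomicReference<String> errorRef = new AtomicReference<>();
        final CountDownLatch done = new CountDownLatch(1);

        source.request(count, new AsyncSource.Handler() {
            @Override public void onData(int value) { result.add(value); }
            @Override public void onEnd(String error) {
                errorRef.set(error);
                done.countDown(); // releases the waiting caller
            }
        });

        try {
            // Writes made by the worker before countDown() are visible after await()
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        if (errorRef.get() != null) {
            throw new IllegalStateException(errorRef.get());
        }
        return result;
    }
}
```

Each call to read() uses its own list, error holder, and latch, so several threads can call it concurrently without sharing mutable state.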

      Cheers,
      -Mike

       
    • mattlf

      mattlf - 2008-06-17

      Mike

      Thanks for your explanation

      But if two threads call factory.connect at the same time, is there a guarantee that you will return the same connection to both of them?

      Matt

       
      • Mike Kroutikov

        Mike Kroutikov - 2008-06-17

        Matt, if you are after connection sharing, you need to use OTPooledConnectionFactory.

        OTConnectionFactory will make a separate TCP connection for each connect().

        OTPooledConnectionFactory will make a single TCP connection. It uses refcounting internally, so that each client can treat the connection as if it were its own. For example, each client can call shutdown(). When all connection objects originating from the pooled factory have been shut down, the TCP connection to the opentick server is closed.
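The refcounting idea can be illustrated with a small sketch. This is not the OTPooledConnectionFactory source; PooledFactory, Transport, and Handle are hypothetical names, and Transport merely stands in for the single TCP connection.

```java
// Hypothetical stand-in for the driver's connection type.
interface IConnection {
    void shutdown();
}

// Represents the single underlying TCP connection.
final class Transport {
    boolean isOpen;
    int timesOpened;
    void open()  { isOpen = true; timesOpened++; }
    void close() { isOpen = false; }
}

final class PooledFactory {
    private final Transport transport = new Transport();
    private int refCount; // guarded by "this"

    synchronized IConnection connect() {
        if (refCount == 0) {
            transport.open();        // first client opens the shared transport
        }
        refCount++;
        return new Handle();
    }

    private synchronized void release() {
        refCount--;
        if (refCount == 0) {
            transport.close();       // last client closes it
        }
    }

    synchronized boolean isOpen() { return transport.isOpen; }
    synchronized int timesOpened() { return transport.timesOpened; }

    // Each client gets its own handle and may shut it down independently.
    private final class Handle implements IConnection {
        private boolean closed; // guarded by PooledFactory.this

        @Override
        public void shutdown() {
            synchronized (PooledFactory.this) {
                if (closed) return;  // repeated shutdown on one handle counts once
                closed = true;
                release();
            }
        }
    }
}
```

With two clients, the transport is opened once and stays open until both handles have been shut down.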

        -Mike

         
