Re: [Quickfix-developers] concurrent threads -- "External component has thrown an exception"
Brought to you by:
orenmnero
From: <or...@qu...> - 2008-06-25 15:51:16
|
I haven't gone through this completely, and this may not be what you are looking for, but is this line correct? if (msgSeqNum = m_NextMarketDataSeqNum) { Did you mean this to be a comparison (==) rather than an assignment? > private void runProducerThread() > { > while (WaitHandle.WaitAny(m_SyncEventsMarketData.EventArray) != 1) { > // raw FAST-decoded data will come in through this Queue > ArrayList newMessages = new ArrayList(); > lock (m_QueueMarketData.SyncRoot) { > newMessages.AddRange(m_QueueMarketData); > m_QueueMarketData.Clear(); > } > > for (int msgNum = 0; msgNum < newMessages.Count; msgNum++) { > FASTDataPacket packet = (FASTDataPacket)newMessages[msgNum]; > // my MarketDataDecoder will convert the FAST-encoded data > into a valid FIX message string > string msgText = m_MarketDataDecoder.ProcessPacket(packet); > // I create a QuickFix.Message object with this string and > my DataDictionary > QuickFix.Message message = new QuickFix.Message(msgText, > m_DataDictionary); > int msgSeqNum = message.getHeader().getInt(34);* * > > if (msgSeqNum = m_NextMarketDataSeqNum) { > // The other (consumer) thread, which I'm adding this > QuickFix.Message to will do the actual > // processing of the *contents* of the message -- this > thread simply ensures that we don't > // miss any UDP packets by looking at the MsgSeqNum of > each incoming packet. The other > // thread, then, doesn't have to worry about this. > lock (m_ConsumerQueue.SyncRoot) { > m_ConsumerQueue.Enqueue(message); > m_ConsumerSyncEvents.NewItemEvent.Set(); > } > m_NextMarketDataSeqNum = msgSeqNum + 1; > } else { > // reconcile missed messages... 
> } > } > } > } > private void runConsumerThread() > { > // Wait for producer thread to add another QuickFix.Message to my Queue > while (!m_ConsumerSyncEvents.ExitThreadEvent.WaitOne(0, false) && > WaitHandle.WaitAny(m_ConsumerSyncEvents.EventArray) != 1) { > ArrayList newMessages = new ArrayList(); > > lock (m_ConsumerQueue.SyncRoot) { > newMessages.AddRange(m_ConsumerQueue); > m_ConsumerQueue.Clear(); > } > for (int i = 0; i < newMessages.Count; i++) { > // Perform necessary processing on this Message object > QuickFix.Message message = (QuickFix.Message)newMessages[i]; > int msgSeqNum = message.getHeader().getInt(34); > int numEntries = message.getInt(268); > // Each update will have multiple MDEntries > for (uint i = 0; i < numEntries; i++) { > QuickFix44.MarketDataIncrementalRefresh.NoMDEntries > group = new QuickFix44.MarketDataIncrementalRefresh.NoMDEntries(); > message.getGroup(i + 1, group); > string securityID = group.getSecurityID().getValue(); > MDPInstrument inst = > (MDPInstrument)m_ISINToInstrumentMap[securityID]; > > if (inst != null) { > if (group.isSetField(83)) { > int rptSeqNum = group.getField(new > IntField(83)).getValue(); > if (inst.lastMarketDataSeqNum != -1 && msgSeqNum > > inst.lastMarketDataSeqNum) { > // process inc refresh > MDUpdateAction updateAction = > group.getMDUpdateAction(); > int entrySize = 0; > if (group.isSetMDEntrySize()) { > MDEntrySize mdEntrySize = > group.getMDEntrySize(); > entrySize = (int)mdEntrySize.getValue(); > } > double entryPrice = 0; > > MDEntryPx entryPx = group.getMDEntryPx(); > entryPrice = entryPx.getValue(); > > MDEntryType entryType = group.getMDEntryType(); > bool implied = false; > > QuoteCondition quoteCondition = null; > if (group.isSetQuoteCondition()) { > quoteCondition = group.getQuoteCondition(); > > if (group.isSetQuoteCondition()) { > if > (group.getQuoteCondition().getValue()[0] == 'K') { > implied = true; > } > } > } > if (group.isSetField(1020)) { > int tradeVolume = group.getField(new > 
IntField(1020)).getValue(); > inst.SetTradeVolume(tradeVolume); > } > > if ((quoteCondition == null || > quoteCondition.getValue()[0] != QuoteCondition.EXCHANGE_BEST)) { > int priceLevel = -1; > switch (entryType.getValue()) { > case MDEntryType.BID: { > // This is where the exception is occurring > *priceLevel = > group.getField(new IntField(1023)).getValue();** * > switch > (updateAction.getValue()) { > case MDUpdateAction.NEW: > // do necessary > processing... > break; > case MDUpdateAction.DELETE: > // do necessary > processing... > break; > case MDUpdateAction.CHANGE: > // do necessary > processing... > break; > } > } > break; > case MDEntryType.OFFER: { > // This is also where the exception is > occurring > *priceLevel = > group.getField(new IntField(1023)).getValue();** * > switch > (updateAction.getValue()) { > case MDUpdateAction.NEW: > // do necessary > processing... > break; > case MDUpdateAction.DELETE: > // do necessary > processing... > break; > case MDUpdateAction.CHANGE: > // do necessary > processing... > break; > } > } > break; > case MDEntryType.TRADE_VOLUME: > // do necessary processing... > break; > case > MDEntryType.TRADING_SESSION_LOW_PRICE: > // do necessary processing... > break; > case > MDEntryType.TRADING_SESSION_HIGH_PRICE: > // do necessary processing... > break; > case MDEntryType.SETTLEMENT_PRICE: > // do necessary processing... > break; > } > } > } > } > } > } > } > } > }<hr>------------------------------------------------------------------------- > Check out the new SourceForge.net Marketplace. > It's the best place to buy or sell services for > just about anything Open Source. > http://sourceforge.net/services/buy/index.php<hr>_______________________________________________ > Quickfix-developers mailing list > Qui...@li... > https://lists.sourceforge.net/lists/listinfo/quickfix-developers |