Re: [Quickfix-developers] concurrent threads -- "External component hasthrown an exception"
Brought to you by:
orenmnero
From: Rick L. <ric...@gm...> - 2008-06-25 15:55:06
|
I re-wrote a lot of this in MS Word so I could preserve the color formatting of the code so it would be easier for people to read -- this will almost certainly not compile (and yes, in my code it's actually >=, but I removed a block of code, so I meant to replace it with ==). Thanks. Rick or...@qu... wrote: > Haven't completely gone through this, and not what you are looking for, > but is this line correct? > if (msgSeqNum = m_NextMarketDataSeqNum) { > > Did you mean this to be a comparison? > > >> private void runProducerThread() >> { >> while (WaitHandle.WaitAny(m_SyncEventsMarketData.EventArray) != 1) { >> // raw FAST-decoded data will come in through this Queue >> ArrayList newMessages = new ArrayList(); >> lock (m_QueueMarketData.SyncRoot) { >> newMessages.AddRange(m_QueueMarketData); >> m_QueueMarketData.Clear(); >> } >> >> for (int msgNum = 0; msgNum < newMessages.Count; msgNum++) { >> FASTDataPacket packet = (FASTDataPacket)newMessages[msgNum]; >> // my MarketDataDecoder will convert the FAST-encoded data >> into a valid FIX message string >> string msgText = m_MarketDataDecoder.ProcessPacket(packet); >> // I create a QuickFix.Message object with this string and >> my DataDictionary >> QuickFix.Message message = new QuickFix.Message(msgText, >> m_DataDictionary); >> int msgSeqNum = message.getHeader().getInt(34);* * >> >> if (msgSeqNum = m_NextMarketDataSeqNum) { >> // The other (consumer) thread, which I'm adding this >> QuickFix.Message to will do the actual >> // processing of the *contents* of the message -- this >> thread simply ensures that we don't >> // miss any UDP packets by looking at the MsgSeqNum of >> each incoming packet. The other >> // thread, then, doesn't have to worry about this. >> lock (m_ConsumerQueue.SyncRoot) { >> m_ConsumerQueue.Enqueue(message); >> m_ConsumerSyncEvents.NewItemEvent.Set(); >> } >> m_NextMarketDataSeqNum = msgSeqNum + 1; >> } else { >> // reconcile missed messages... >> } >> } >> } >> } >> private void runConsumerThread() >> { >> // Wait for producer thread to add another QuickFix.Message to my Queue >> while (!m_ConsumerSyncEvents.ExitThreadEvent.WaitOne(0, false) && >> WaitHandle.WaitAny(m_ConsumerSyncEvents.EventArray) != 1) { >> ArrayList newMessages = new ArrayList(); >> >> lock (m_ConsumerQueue.SyncRoot) { >> newMessages.AddRange(m_ConsumerQueue); >> m_ConsumerQueue.Clear(); >> } >> for (int i = 0; i < newMessages.Count; i++) { >> // Perform necessary processing on this Message object >> QuickFix.Message message = (QuickFix.Message)newMessages[i]; >> int msgSeqNum = message.getHeader().getInt(34); >> int numEntries = message.getInt(268); >> // Each update will have multiple MDEntries >> for (uint i = 0; i < numEntries; i++) { >> QuickFix44.MarketDataIncrementalRefresh.NoMDEntries >> group = new QuickFix44.MarketDataIncrementalRefresh.NoMDEntries(); >> message.getGroup(i + 1, group); >> string securityID = group.getSecurityID().getValue(); >> MDPInstrument inst = >> (MDPInstrument)m_ISINToInstrumentMap[securityID]; >> >> if (inst != null) { >> if (group.isSetField(83)) { >> int rptSeqNum = group.getField(new >> IntField(83)).getValue(); >> if (inst.lastMarketDataSeqNum != -1 && msgSeqNum >> > inst.lastMarketDataSeqNum) { >> // process inc refresh >> MDUpdateAction updateAction = >> group.getMDUpdateAction(); >> int entrySize = 0; >> if (group.isSetMDEntrySize()) { >> MDEntrySize mdEntrySize = >> group.getMDEntrySize(); >> entrySize = (int)mdEntrySize.getValue(); >> } >> double entryPrice = 0; >> >> MDEntryPx entryPx = group.getMDEntryPx(); >> entryPrice = entryPx.getValue(); >> >> MDEntryType entryType = group.getMDEntryType(); >> bool implied = false; >> >> QuoteCondition quoteCondition = null; >> if (group.isSetQuoteCondition()) { >> quoteCondition = group.getQuoteCondition(); >> >> if (group.isSetQuoteCondition()) { >> if >> (group.getQuoteCondition().getValue()[0] == 'K') { >> implied = true; >> } >> } >> } >> if (group.isSetField(1020)) { >> int tradeVolume = group.getField(new >> IntField(1020)).getValue(); >> inst.SetTradeVolume(tradeVolume); >> } >> >> if ((quoteCondition == null || >> quoteCondition.getValue()[0] != QuoteCondition.EXCHANGE_BEST)) { >> int priceLevel = -1; >> switch (entryType.getValue()) { >> case MDEntryType.BID: { >> // This is where the exception is occurring >> *priceLevel = >> group.getField(new IntField(1023)).getValue();** * >> switch >> (updateAction.getValue()) { >> case MDUpdateAction.NEW: >> // do necessary >> processing... >> break; >> case MDUpdateAction.DELETE: >> // do necessary >> processing... >> break; >> case MDUpdateAction.CHANGE: >> // do necessary >> processing... >> break; >> } >> } >> break; >> case MDEntryType.OFFER: { >> // This is also where the exception is >> occurring >> *priceLevel = >> group.getField(new IntField(1023)).getValue();** * >> switch >> (updateAction.getValue()) { >> case MDUpdateAction.NEW: >> // do necessary >> processing... >> break; >> case MDUpdateAction.DELETE: >> // do necessary >> processing... >> break; >> case MDUpdateAction.CHANGE: >> // do necessary >> processing... >> break; >> } >> } >> break; >> case MDEntryType.TRADE_VOLUME: >> // do necessary processing... >> break; >> case >> MDEntryType.TRADING_SESSION_LOW_PRICE: >> // do necessary processing... >> break; >> case >> MDEntryType.TRADING_SESSION_HIGH_PRICE: >> // do necessary processing... >> break; >> case MDEntryType.SETTLEMENT_PRICE: >> // do necessary processing... >> break; >> } >> } >> } >> } >> } >> } >> } >> } >> }<hr>------------------------------------------------------------------------- >> Check out the new SourceForge.net Marketplace. >> It's the best place to buy or sell services for >> just about anything Open Source. >> http://sourceforge.net/services/buy/index.php<hr>_______________________________________________ >> Quickfix-developers mailing list >> Qui...@li... >> https://lists.sourceforge.net/lists/listinfo/quickfix-developers >> > > |