Re: [Quickfix-developers] concurrent threads -- "External component has thrown an exception"
From: Rick L. <ric...@gm...> - 2008-06-25 16:14:09
It is very /not/ OK to have nested for-loops with the same counter
variable -- just another side effect of my cut-paste job. Sorry for the
confusion....

Rick

John Haldi wrote:
> I'm not a C# guy, but is it OK to have nested For loops with the same
> counter variable? In VB this would be dangerous:
>
>     For i = 0 to .....
>         For i = 0 to ......
>
>         Next
>     Next
>
> ------------------------------------------------------------------------
> *From:* qui...@li...
> [mailto:qui...@li...] *On Behalf Of* Rick Lane
> *Sent:* Wednesday, June 25, 2008 11:34 AM
> *To:* qui...@li...
> *Subject:* [Quickfix-developers] concurrent threads -- "External
> component has thrown an exception"
>
> Greetings,
>
> I have provided a slimmed-down version of the two threads that are
> running concurrently in my application in an attempt to see if anyone
> can pinpoint why I might be getting this ugly exception when grabbing
> integer values from QuickFix.Message objects.
>
> The first thread is my "producer" thread, and it basically converts
> FAST data into a valid FIX message, constructs a QuickFix.Message from
> this FIX string and then adds this Message object onto the "consumers"
> queue.
>
> The second (consumer) thread waits for new QuickFix.Message objects to
> be added to the queue, and then does the necessary processing.
>
> When the exception occurs, I print out the FIX message text in
> question to see if there is something errant about it -- I then, in a
> separate application, construct a QuickFix.Message object from the
> /same/ FIX message string (that caused the original exception) and it
> works fine -- so it's definitely not a FIX-formatting issue. It must
> be a thread issue because the bug is very intermittent: on some
> machines it happens every few minutes, while on others it happens only
> once or twice a week.
>
> Please let me know if you see any glaring issues -- the same
> QuickFix.Message object is never accessed on more than one thread at a
> time, so it seems that there must be some static or shared component
> of QuickFix that is having concurrency issues when 2 threads are
> accessing it at the same time (meaning, I might have 2
> QuickFix.Message objects, one on each thread, and each thread is
> performing actions on these objects individually -- which should be
> allowed, but if each of these distinct objects has some shared
> component, then I can see a problem arising).
>
> Thanks in advance!
>
> Rick
>
> private void runProducerThread()
> {
>     while (WaitHandle.WaitAny(m_SyncEventsMarketData.EventArray) != 1) {
>         // raw FAST-decoded data will come in through this Queue
>         ArrayList newMessages = new ArrayList();
>         lock (m_QueueMarketData.SyncRoot) {
>             newMessages.AddRange(m_QueueMarketData);
>             m_QueueMarketData.Clear();
>         }
>
>         for (int msgNum = 0; msgNum < newMessages.Count; msgNum++) {
>             FASTDataPacket packet = (FASTDataPacket)newMessages[msgNum];
>             // my MarketDataDecoder will convert the FAST-encoded data
>             // into a valid FIX message string
>             string msgText = m_MarketDataDecoder.ProcessPacket(packet);
>             // I create a QuickFix.Message object with this string and
>             // my DataDictionary
>             QuickFix.Message message = new QuickFix.Message(msgText, m_DataDictionary);
>             int msgSeqNum = message.getHeader().getInt(34);
>
>             if (msgSeqNum == m_NextMarketDataSeqNum) {
>                 // The other (consumer) thread, which I'm adding this
>                 // QuickFix.Message to, will do the actual processing of
>                 // the *contents* of the message -- this thread simply
>                 // ensures that we don't miss any UDP packets by looking
>                 // at the MsgSeqNum of each incoming packet. The other
>                 // thread, then, doesn't have to worry about this.
>                 lock (m_ConsumerQueue.SyncRoot) {
>                     m_ConsumerQueue.Enqueue(message);
>                     m_ConsumerSyncEvents.NewItemEvent.Set();
>                 }
>                 m_NextMarketDataSeqNum = msgSeqNum + 1;
>             } else {
>                 // reconcile missed messages...
>             }
>         }
>     }
> }
>
> private void runConsumerThread()
> {
>     // Wait for producer thread to add another QuickFix.Message to my Queue
>     while (!m_ConsumerSyncEvents.ExitThreadEvent.WaitOne(0, false) &&
>            WaitHandle.WaitAny(m_ConsumerSyncEvents.EventArray) != 1) {
>         ArrayList newMessages = new ArrayList();
>
>         lock (m_ConsumerQueue.SyncRoot) {
>             newMessages.AddRange(m_ConsumerQueue);
>             m_ConsumerQueue.Clear();
>         }
>         for (int i = 0; i < newMessages.Count; i++) {
>             // Perform necessary processing on this Message object
>             QuickFix.Message message = (QuickFix.Message)newMessages[i];
>             int msgSeqNum = message.getHeader().getInt(34);
>             int numEntries = message.getInt(268);
>             // Each update will have multiple MDEntries
>             for (uint i = 0; i < numEntries; i++) {
>                 QuickFix44.MarketDataIncrementalRefresh.NoMDEntries group =
>                     new QuickFix44.MarketDataIncrementalRefresh.NoMDEntries();
>                 message.getGroup(i + 1, group);
>                 string securityID = group.getSecurityID().getValue();
>                 MDPInstrument inst = (MDPInstrument)m_ISINToInstrumentMap[securityID];
>
>                 if (inst != null) {
>                     if (group.isSetField(83)) {
>                         int rptSeqNum = group.getField(new IntField(83)).getValue();
>                         if (inst.lastMarketDataSeqNum != -1 &&
>                             msgSeqNum > inst.lastMarketDataSeqNum) {
>                             // process inc refresh
>                             MDUpdateAction updateAction = group.getMDUpdateAction();
>                             int entrySize = 0;
>                             if (group.isSetMDEntrySize()) {
>                                 MDEntrySize mdEntrySize = group.getMDEntrySize();
>                                 entrySize = (int)mdEntrySize.getValue();
>                             }
>                             double entryPrice = 0;
>
>                             MDEntryPx entryPx = group.getMDEntryPx();
>                             entryPrice = entryPx.getValue();
>
>                             MDEntryType entryType = group.getMDEntryType();
>                             bool implied = false;
>
>                             QuoteCondition quoteCondition = null;
>                             if (group.isSetQuoteCondition()) {
>                                 quoteCondition = group.getQuoteCondition();
>
>                                 if (group.isSetQuoteCondition()) {
>                                     if (group.getQuoteCondition().getValue()[0] == 'K') {
>                                         implied = true;
>                                     }
>                                 }
>                             }
>                             if (group.isSetField(1020)) {
>                                 int tradeVolume = group.getField(new IntField(1020)).getValue();
>                                 inst.SetTradeVolume(tradeVolume);
>                             }
>
>                             if ((quoteCondition == null ||
>                                  quoteCondition.getValue()[0] != QuoteCondition.EXCHANGE_BEST)) {
>                                 int priceLevel = -1;
>                                 switch (entryType.getValue()) {
>                                     case MDEntryType.BID: {
>                                         // This is where the exception is occurring
>                                         priceLevel = group.getField(new IntField(1023)).getValue();
>                                         switch (updateAction.getValue()) {
>                                             case MDUpdateAction.NEW:
>                                                 // do necessary processing...
>                                                 break;
>                                             case MDUpdateAction.DELETE:
>                                                 // do necessary processing...
>                                                 break;
>                                             case MDUpdateAction.CHANGE:
>                                                 // do necessary processing...
>                                                 break;
>                                         }
>                                     }
>                                     break;
>                                     case MDEntryType.OFFER: {
>                                         // This is also where the exception is occurring
>                                         priceLevel = group.getField(new IntField(1023)).getValue();
>                                         switch (updateAction.getValue()) {
>                                             case MDUpdateAction.NEW:
>                                                 // do necessary processing...
>                                                 break;
>                                             case MDUpdateAction.DELETE:
>                                                 // do necessary processing...
>                                                 break;
>                                             case MDUpdateAction.CHANGE:
>                                                 // do necessary processing...
>                                                 break;
>                                         }
>                                     }
>                                     break;
>                                     case MDEntryType.TRADE_VOLUME:
>                                         // do necessary processing...
>                                         break;
>                                     case MDEntryType.TRADING_SESSION_LOW_PRICE:
>                                         // do necessary processing...
>                                         break;
>                                     case MDEntryType.TRADING_SESSION_HIGH_PRICE:
>                                         // do necessary processing...
>                                         break;
>                                     case MDEntryType.SETTLEMENT_PRICE:
>                                         // do necessary processing...
>                                         break;
>                                 }
>                             }
>                         }
>                     }
>                 }
>             }
>         }
>     }
> }
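
For reference, the nested-loop point discussed in this thread can be illustrated with a small standalone sketch in which each loop has its own counter. This is not the original application code: `NestedLoopSketch`, `Walk`, and the jagged `int[][]` input are hypothetical stand-ins for the message list and its repeating-group entries, and no QuickFix types are used. As far as I know, the C# compiler normally rejects a nested local declaration that reuses an enclosing local's name (error CS0136), which fits the explanation that the duplicate `i` came from cutting and pasting.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the consumer loop: the outer array plays the role
// of the message list, each inner array the entries of one message.
public static class NestedLoopSketch
{
    public static List<string> Walk(int[][] messages)
    {
        var visited = new List<string>();

        // Outer counter indexes the messages (0-based).
        for (int msgNum = 0; msgNum < messages.Length; msgNum++)
        {
            int numEntries = messages[msgNum].Length;

            // Inner counter has its own name and is 1-based, mirroring the
            // getGroup(i + 1, group) call in the quoted code.
            for (int entryNum = 1; entryNum <= numEntries; entryNum++)
            {
                visited.Add(msgNum + ":" + entryNum);
            }
        }

        return visited;
    }
}
```

Calling `NestedLoopSketch.Walk` with two messages holding two entries and one entry respectively visits `0:1`, `0:2`, and `1:1`, so every entry of every message is reached exactly once.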