Re: [Quickfix-developers] concurrent threads -- "External component has thrown an exception"
Brought to you by:
orenmnero
From: Rick L. <ric...@gm...> - 2008-06-25 17:00:41
|
I should also mention the exception occurs at another line (in the same body of the same "consumer" loop), but I removed this line from my pseudo code below: qty = (int)group.getMDEntrySize().getValue(); This occasionally also throws the exception. Rick Lane wrote: > Greetings, > > I have provided a slimmed-down version of the two threads that are > running concurrently in my application in an attempt to see if anyone > can pinpoint why I might be getting this ugly exception when grabbing > integer values from QuickFix.Message objects. > > The first thread is my "producer" thread, and it basically converts > FAST data into a valid FIX message, constructs a QuickFix.Message from > this FIX string and then adds this Message object onto the "consumers" > queue. > > The second (consumer) thread waits for new QuickFix.Message objects to > be added to the queue, and then does the necessary processing. > > When the exception occurs, I print out the FIX message text in > question to see if there is something errant about it -- I then, in a > separate application, construct a QuickFix.Message object from the > /same /FIX message string (that caused the original exception) and it > works fine -- so it's definitely not a FIX-formatting issue. It must > be a thread issue because the bug is very intermittent, some machines > it happens every few minutes, while some machines it happens only once > or twice a week. > > Please let me know if you see any glaring issues -- the same > QuickFix.Message object is never accessed on more than one thread at a > time, so it seems that there must be some static or shared component > of QuickFix that is having concurrency issues when 2 threads are > accessing it at the same time (meaning, I might have 2 > QuickFix.Message objects, one on each thread, and each thread is > performing actions on these objects individually -- which should be > allowed, but if each of these distinct objects has some shared > component, then I can see a problem arising). > > Thanks in advance! > > Rick > > private void runProducerThread() > { > while (WaitHandle.WaitAny(m_SyncEventsMarketData.EventArray) != 1) { > // raw FAST-decoded data will come in through this Queue > ArrayList newMessages = new ArrayList(); > lock (m_QueueMarketData.SyncRoot) { > newMessages.AddRange(m_QueueMarketData); > m_QueueMarketData.Clear(); > } > > for (int msgNum = 0; msgNum < newMessages.Count; msgNum++) { > FASTDataPacket packet = (FASTDataPacket)newMessages[msgNum]; > // my MarketDataDecoder will convert the FAST-encoded data > into a valid FIX message string > string msgText = m_MarketDataDecoder.ProcessPacket(packet); > // I create a QuickFix.Message object with this string and > my DataDictionary > QuickFix.Message message = new QuickFix.Message(msgText, > m_DataDictionary); > int msgSeqNum = message.getHeader().getInt(34);* * > > if (msgSeqNum = m_NextMarketDataSeqNum) { > // The other (consumer) thread, which I'm adding this > QuickFix.Message to will do the actual > // processing of the *contents* of the message -- this > thread simply ensures that we don't > // miss any UDP packets by looking at the MsgSeqNum of > each incoming packet. The other > // thread, then, doesn't have to worry about this. > lock (m_ConsumerQueue.SyncRoot) { > m_ConsumerQueue.Enqueue(message); > m_ConsumerSyncEvents.NewItemEvent.Set(); > } > m_NextMarketDataSeqNum = msgSeqNum + 1; > } else { > // reconcile missed messages... > } > } > } > } > private void runConsumerThread() > { > // Wait for producer thread to add another QuickFix.Message to my Queue > while (!m_ConsumerSyncEvents.ExitThreadEvent.WaitOne(0, false) && > WaitHandle.WaitAny(m_ConsumerSyncEvents.EventArray) != 1) { > ArrayList newMessages = new ArrayList(); > > lock (m_ConsumerQueue.SyncRoot) { > newMessages.AddRange(m_ConsumerQueue); > m_ConsumerQueue.Clear(); > } > for (int i = 0; i < newMessages.Count; i++) { > // Perform necessary processing on this Message object > QuickFix.Message message = (QuickFix.Message)newMessages[i]; > int msgSeqNum = message.getHeader().getInt(34); > int numEntries = message.getInt(268); > // Each update will have multiple MDEntries > for (uint i = 0; i < numEntries; i++) { > QuickFix44.MarketDataIncrementalRefresh.NoMDEntries > group = new QuickFix44.MarketDataIncrementalRefresh.NoMDEntries(); > message.getGroup(i + 1, group); > string securityID = group.getSecurityID().getValue(); > MDPInstrument inst = > (MDPInstrument)m_ISINToInstrumentMap[securityID]; > > if (inst != null) { > if (group.isSetField(83)) { > int rptSeqNum = group.getField(new > IntField(83)).getValue(); > if (inst.lastMarketDataSeqNum != -1 && > msgSeqNum > inst.lastMarketDataSeqNum) { > // process inc refresh > MDUpdateAction updateAction = > group.getMDUpdateAction(); > int entrySize = 0; > if (group.isSetMDEntrySize()) { > MDEntrySize mdEntrySize = > group.getMDEntrySize(); > entrySize = (int)mdEntrySize.getValue(); > } > double entryPrice = 0; > > MDEntryPx entryPx = group.getMDEntryPx(); > entryPrice = entryPx.getValue(); > > MDEntryType entryType = > group.getMDEntryType(); > bool implied = false; > > QuoteCondition quoteCondition = null; > if (group.isSetQuoteCondition()) { > quoteCondition = > group.getQuoteCondition(); > > if (group.isSetQuoteCondition()) { > if > (group.getQuoteCondition().getValue()[0] == 'K') { > implied = true; > } > } > } > if (group.isSetField(1020)) { > int tradeVolume = group.getField(new > IntField(1020)).getValue(); > inst.SetTradeVolume(tradeVolume); > } > > if ((quoteCondition == null || > quoteCondition.getValue()[0] != QuoteCondition.EXCHANGE_BEST)) { > int priceLevel = -1; > switch (entryType.getValue()) { > case MDEntryType.BID: { > // This is where the exception is occurring > *priceLevel = > group.getField(new IntField(1023)).getValue();** * > switch > (updateAction.getValue()) { > case MDUpdateAction.NEW: > // do necessary > processing... > break; > case > MDUpdateAction.DELETE: > // do necessary > processing... > break; > case > MDUpdateAction.CHANGE: > // do necessary > processing... > break; > } > } > break; > case MDEntryType.OFFER: { > // This is also where the exception is > occurring > *priceLevel = > group.getField(new IntField(1023)).getValue();** * > switch > (updateAction.getValue()) { > case MDUpdateAction.NEW: > // do necessary > processing... > break; > case > MDUpdateAction.DELETE: > // do necessary > processing... > break; > case > MDUpdateAction.CHANGE: > // do necessary > processing... > break; > } > } > break; > case MDEntryType.TRADE_VOLUME: > // do necessary processing... > break; > case > MDEntryType.TRADING_SESSION_LOW_PRICE: > // do necessary processing... > break; > case > MDEntryType.TRADING_SESSION_HIGH_PRICE: > // do necessary processing... > break; > case MDEntryType.SETTLEMENT_PRICE: > // do necessary processing... > break; > } > } > } > } > } > } > } > } > } > |