[Quickfix-developers] concurrent threads -- "External component has thrown an exception"
From: Rick L. <ric...@gm...> - 2008-06-25 15:35:10
Greetings,

I have provided a slimmed-down version of the two threads that run concurrently in my application, in the hope that someone can pinpoint why I am getting this ugly exception when grabbing integer values from QuickFix.Message objects.

The first thread is my "producer" thread. It converts FAST data into a valid FIX message, constructs a QuickFix.Message from this FIX string, and then adds this Message object to the consumer's queue. The second (consumer) thread waits for new QuickFix.Message objects to be added to the queue and then does the necessary processing.

When the exception occurs, I print out the FIX message text in question to see if there is something errant about it. I then, in a separate application, construct a QuickFix.Message object from the same FIX message string (the one that caused the original exception) and it works fine, so it's definitely not a FIX-formatting issue. It must be a threading issue, because the bug is very intermittent: on some machines it happens every few minutes, while on others it happens only once or twice a week.

Please let me know if you see any glaring issues. The same QuickFix.Message object is never accessed on more than one thread at a time, so it seems there must be some static or shared component of QuickFix that has concurrency issues when two threads access it at the same time. That is, I might have two QuickFix.Message objects, one on each thread, with each thread acting on its own object individually. That should be allowed, but if these distinct objects share some component, I can see a problem arising.

Thanks in advance!
Rick

private void runProducerThread()
{
    while (WaitHandle.WaitAny(m_SyncEventsMarketData.EventArray) != 1)
    {
        // raw FAST-decoded data will come in through this Queue
        ArrayList newMessages = new ArrayList();
        lock (m_QueueMarketData.SyncRoot)
        {
            newMessages.AddRange(m_QueueMarketData);
            m_QueueMarketData.Clear();
        }

        for (int msgNum = 0; msgNum < newMessages.Count; msgNum++)
        {
            FASTDataPacket packet = (FASTDataPacket)newMessages[msgNum];

            // my MarketDataDecoder will convert the FAST-encoded data into a valid FIX message string
            string msgText = m_MarketDataDecoder.ProcessPacket(packet);

            // I create a QuickFix.Message object with this string and my DataDictionary
            QuickFix.Message message = new QuickFix.Message(msgText, m_DataDictionary);
            int msgSeqNum = message.getHeader().getInt(34);

            if (msgSeqNum == m_NextMarketDataSeqNum)
            {
                // The other (consumer) thread, which I'm adding this QuickFix.Message to, will do the actual
                // processing of the *contents* of the message -- this thread simply ensures that we don't
                // miss any UDP packets by looking at the MsgSeqNum of each incoming packet. The other
                // thread, then, doesn't have to worry about this.
                lock (m_ConsumerQueue.SyncRoot)
                {
                    m_ConsumerQueue.Enqueue(message);
                    m_ConsumerSyncEvents.NewItemEvent.Set();
                }
                m_NextMarketDataSeqNum = msgSeqNum + 1;
            }
            else
            {
                // reconcile missed messages...
            }
        }
    }
}

private void runConsumerThread()
{
    // Wait for producer thread to add another QuickFix.Message to my Queue
    while (!m_ConsumerSyncEvents.ExitThreadEvent.WaitOne(0, false) &&
           WaitHandle.WaitAny(m_ConsumerSyncEvents.EventArray) != 1)
    {
        ArrayList newMessages = new ArrayList();
        lock (m_ConsumerQueue.SyncRoot)
        {
            newMessages.AddRange(m_ConsumerQueue);
            m_ConsumerQueue.Clear();
        }

        for (int i = 0; i < newMessages.Count; i++)
        {
            // Perform necessary processing on this Message object
            QuickFix.Message message = (QuickFix.Message)newMessages[i];
            int msgSeqNum = message.getHeader().getInt(34);
            int numEntries = message.getInt(268);

            // Each update will have multiple MDEntries
            for (uint entryIdx = 0; entryIdx < numEntries; entryIdx++)
            {
                QuickFix44.MarketDataIncrementalRefresh.NoMDEntries group =
                    new QuickFix44.MarketDataIncrementalRefresh.NoMDEntries();
                message.getGroup(entryIdx + 1, group);

                string securityID = group.getSecurityID().getValue();
                MDPInstrument inst = (MDPInstrument)m_ISINToInstrumentMap[securityID];
                if (inst != null)
                {
                    if (group.isSetField(83))
                    {
                        int rptSeqNum = group.getField(new IntField(83)).getValue();
                        if (inst.lastMarketDataSeqNum != -1 && msgSeqNum > inst.lastMarketDataSeqNum)
                        {
                            // process inc refresh
                            MDUpdateAction updateAction = group.getMDUpdateAction();

                            int entrySize = 0;
                            if (group.isSetMDEntrySize())
                            {
                                MDEntrySize mdEntrySize = group.getMDEntrySize();
                                entrySize = (int)mdEntrySize.getValue();
                            }

                            double entryPrice = 0;
                            MDEntryPx entryPx = group.getMDEntryPx();
                            entryPrice = entryPx.getValue();

                            MDEntryType entryType = group.getMDEntryType();

                            bool implied = false;
                            QuoteCondition quoteCondition = null;
                            if (group.isSetQuoteCondition())
                            {
                                quoteCondition = group.getQuoteCondition();
                                if (group.getQuoteCondition().getValue()[0] == 'K')
                                {
                                    implied = true;
                                }
                            }

                            if (group.isSetField(1020))
                            {
                                int tradeVolume = group.getField(new IntField(1020)).getValue();
                                inst.SetTradeVolume(tradeVolume);
                            }

                            if (quoteCondition == null ||
                                quoteCondition.getValue()[0] != QuoteCondition.EXCHANGE_BEST)
                            {
                                int priceLevel = -1;
                                switch (entryType.getValue())
                                {
                                    case MDEntryType.BID:
                                        {
                                            // This is where the exception is occurring
                                            priceLevel = group.getField(new IntField(1023)).getValue();
                                            switch (updateAction.getValue())
                                            {
                                                case MDUpdateAction.NEW:
                                                    // do necessary processing...
                                                    break;
                                                case MDUpdateAction.DELETE:
                                                    // do necessary processing...
                                                    break;
                                                case MDUpdateAction.CHANGE:
                                                    // do necessary processing...
                                                    break;
                                            }
                                        }
                                        break;
                                    case MDEntryType.OFFER:
                                        {
                                            // This is also where the exception is occurring
                                            priceLevel = group.getField(new IntField(1023)).getValue();
                                            switch (updateAction.getValue())
                                            {
                                                case MDUpdateAction.NEW:
                                                    // do necessary processing...
                                                    break;
                                                case MDUpdateAction.DELETE:
                                                    // do necessary processing...
                                                    break;
                                                case MDUpdateAction.CHANGE:
                                                    // do necessary processing...
                                                    break;
                                            }
                                        }
                                        break;
                                    case MDEntryType.TRADE_VOLUME:
                                        // do necessary processing...
                                        break;
                                    case MDEntryType.TRADING_SESSION_LOW_PRICE:
                                        // do necessary processing...
                                        break;
                                    case MDEntryType.TRADING_SESSION_HIGH_PRICE:
                                        // do necessary processing...
                                        break;
                                    case MDEntryType.SETTLEMENT_PRICE:
                                        // do necessary processing...
                                        break;
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}