Thread: [Quickfix-developers] concurrent threads -- "External component has thrown an exception"
Brought to you by:
orenmnero
From: Rick L. <ric...@gm...> - 2008-06-25 15:35:10
|
Greetings, I have provided a slimmed-down version of the two threads that are running concurrently in my application in an attempt to see if anyone can pinpoint why I might be getting this ugly exception when grabbing integer values from QuickFix.Message objects. The first thread is my "producer" thread, and it basically converts FAST data into a valid FIX message, constructs a QuickFix.Message from this FIX string and then adds this Message object onto the "consumer" queue. The second (consumer) thread waits for new QuickFix.Message objects to be added to the queue, and then does the necessary processing. When the exception occurs, I print out the FIX message text in question to see if there is something errant about it -- I then, in a separate application, construct a QuickFix.Message object from the *same* FIX message string (that caused the original exception) and it works fine -- so it's definitely not a FIX-formatting issue. It must be a threading issue because the bug is very intermittent: on some machines it happens every few minutes, while on others it happens only once or twice a week. Please let me know if you see any glaring issues -- the same QuickFix.Message object is never accessed on more than one thread at a time, so it seems that there must be some static or shared component of QuickFix that is having concurrency issues when 2 threads are accessing it at the same time (meaning, I might have 2 QuickFix.Message objects, one on each thread, and each thread is performing actions on these objects individually -- which should be allowed, but if each of these distinct objects has some shared component, then I can see a problem arising). Thanks in advance! 
Rick private void runProducerThread() { while (WaitHandle.WaitAny(m_SyncEventsMarketData.EventArray) != 1) { // raw FAST-decoded data will come in through this Queue ArrayList newMessages = new ArrayList(); lock (m_QueueMarketData.SyncRoot) { newMessages.AddRange(m_QueueMarketData); m_QueueMarketData.Clear(); } for (int msgNum = 0; msgNum < newMessages.Count; msgNum++) { FASTDataPacket packet = (FASTDataPacket)newMessages[msgNum]; // my MarketDataDecoder will convert the FAST-encoded data into a valid FIX message string string msgText = m_MarketDataDecoder.ProcessPacket(packet); // I create a QuickFix.Message object with this string and my DataDictionary QuickFix.Message message = new QuickFix.Message(msgText, m_DataDictionary); int msgSeqNum = message.getHeader().getInt(34);* * if (msgSeqNum = m_NextMarketDataSeqNum) { // The other (consumer) thread, which I'm adding this QuickFix.Message to will do the actual // processing of the *contents* of the message -- this thread simply ensures that we don't // miss any UDP packets by looking at the MsgSeqNum of each incoming packet. The other // thread, then, doesn't have to worry about this. lock (m_ConsumerQueue.SyncRoot) { m_ConsumerQueue.Enqueue(message); m_ConsumerSyncEvents.NewItemEvent.Set(); } m_NextMarketDataSeqNum = msgSeqNum + 1; } else { // reconcile missed messages... 
} } } } private void runConsumerThread() { // Wait for producer thread to add another QuickFix.Message to my Queue while (!m_ConsumerSyncEvents.ExitThreadEvent.WaitOne(0, false) && WaitHandle.WaitAny(m_ConsumerSyncEvents.EventArray) != 1) { ArrayList newMessages = new ArrayList(); lock (m_ConsumerQueue.SyncRoot) { newMessages.AddRange(m_ConsumerQueue); m_ConsumerQueue.Clear(); } for (int i = 0; i < newMessages.Count; i++) { // Perform necessary processing on this Message object QuickFix.Message message = (QuickFix.Message)newMessages[i]; int msgSeqNum = message.getHeader().getInt(34); int numEntries = message.getInt(268); // Each update will have multiple MDEntries for (uint i = 0; i < numEntries; i++) { QuickFix44.MarketDataIncrementalRefresh.NoMDEntries group = new QuickFix44.MarketDataIncrementalRefresh.NoMDEntries(); message.getGroup(i + 1, group); string securityID = group.getSecurityID().getValue(); MDPInstrument inst = (MDPInstrument)m_ISINToInstrumentMap[securityID]; if (inst != null) { if (group.isSetField(83)) { int rptSeqNum = group.getField(new IntField(83)).getValue(); if (inst.lastMarketDataSeqNum != -1 && msgSeqNum > inst.lastMarketDataSeqNum) { // process inc refresh MDUpdateAction updateAction = group.getMDUpdateAction(); int entrySize = 0; if (group.isSetMDEntrySize()) { MDEntrySize mdEntrySize = group.getMDEntrySize(); entrySize = (int)mdEntrySize.getValue(); } double entryPrice = 0; MDEntryPx entryPx = group.getMDEntryPx(); entryPrice = entryPx.getValue(); MDEntryType entryType = group.getMDEntryType(); bool implied = false; QuoteCondition quoteCondition = null; if (group.isSetQuoteCondition()) { quoteCondition = group.getQuoteCondition(); if (group.isSetQuoteCondition()) { if (group.getQuoteCondition().getValue()[0] == 'K') { implied = true; } } } if (group.isSetField(1020)) { int tradeVolume = group.getField(new IntField(1020)).getValue(); inst.SetTradeVolume(tradeVolume); } if ((quoteCondition == null || quoteCondition.getValue()[0] != 
QuoteCondition.EXCHANGE_BEST)) { int priceLevel = -1; switch (entryType.getValue()) { case MDEntryType.BID: { // This is where the exception is occurring *priceLevel = group.getField(new IntField(1023)).getValue();** * switch (updateAction.getValue()) { case MDUpdateAction.NEW: // do necessary processing... break; case MDUpdateAction.DELETE: // do necessary processing... break; case MDUpdateAction.CHANGE: // do necessary processing... break; } } break; case MDEntryType.OFFER: { // This is also where the exception is occurring *priceLevel = group.getField(new IntField(1023)).getValue();** * switch (updateAction.getValue()) { case MDUpdateAction.NEW: // do necessary processing... break; case MDUpdateAction.DELETE: // do necessary processing... break; case MDUpdateAction.CHANGE: // do necessary processing... break; } } break; case MDEntryType.TRADE_VOLUME: // do necessary processing... break; case MDEntryType.TRADING_SESSION_LOW_PRICE: // do necessary processing... break; case MDEntryType.TRADING_SESSION_HIGH_PRICE: // do necessary processing... break; case MDEntryType.SETTLEMENT_PRICE: // do necessary processing... break; } } } } } } } } } |
From: Rick L. <ric...@gm...> - 2008-06-25 16:14:09
|
It is very much *not* OK to have nested for-loops with the same counter variable -- just another side effect of my cut-paste job. Sorry for the confusion... Rick John Haldi wrote: > I'm not a C# guy, but is it ok to have nested For loops with the same > counter variable? In VB this would be dangerous: > > For i = 0 to ..... > For i = 0 to ...... > > Next > Next > > ------------------------------------------------------------------------ > *From:* qui...@li... > [mailto:qui...@li...] *On Behalf > Of *Rick Lane > *Sent:* Wednesday, June 25, 2008 11:34 AM > *To:* qui...@li... > *Subject:* [Quickfix-developers] concurrent threads -- "External > component has thrown an exception" > > Greetings, > > I have provided a slimmed-down version of the two threads that are > running concurrently in my application in an attempt to see if anyone > can pinpoint why I might be getting this ugly exception when grabbing > integer values from QuickFix.Message objects. > > The first thread is my "producer" thread, and it basically converts > FAST data into a valid FIX message, constructs a QuickFix.Message from > this FIX string and then adds this Message object onto the "consumers" > queue. > > The second (consumer) thread waits for new QuickFix.Message objects to > be added to the queue, and then does the necessary processing. > > When the exception occurs, I print out the FIX message text in > question to see if there is something errant about it -- I then, in a > separate application, construct a QuickFix.Message object from the > /same /FIX message string (that caused the original exception) and it > works fine -- so it's definitely not a FIX-formatting issue. It must > be a thread issue because the bug is very intermittent, some machines > it happens every few minutes, while some machines it happens only once > or twice a week. 
> > Please let me know if you see any glaring issues -- the same > QuickFix.Message object is never accessed on more than one thread at a > time, so it seems that there must be some static or shared component > of QuickFix that is having concurrency issues when 2 threads are > accessing it at the same time (meaning, I might have 2 > QuickFix.Message objects, one on each thread, and each thread is > performing actions on these objects individually -- which should be > allowed, but if each of these distinct objects has some shared > component, then I can see a problem arising). > > Thanks in advance! > > Rick > > private void runProducerThread() > { > while (WaitHandle.WaitAny(m_SyncEventsMarketData.EventArray) != 1) { > // raw FAST-decoded data will come in through this Queue > ArrayList newMessages = new ArrayList(); > lock (m_QueueMarketData.SyncRoot) { > newMessages.AddRange(m_QueueMarketData); > m_QueueMarketData.Clear(); > } > > for (int msgNum = 0; msgNum < newMessages.Count; msgNum++) { > FASTDataPacket packet = (FASTDataPacket)newMessages[msgNum]; > // my MarketDataDecoder will convert the FAST-encoded data > into a valid FIX message string > string msgText = m_MarketDataDecoder.ProcessPacket(packet); > // I create a QuickFix.Message object with this string and > my DataDictionary > QuickFix.Message message = new QuickFix.Message(msgText, > m_DataDictionary); > int msgSeqNum = message.getHeader().getInt(34);* * > > if (msgSeqNum = m_NextMarketDataSeqNum) { > // The other (consumer) thread, which I'm adding this > QuickFix.Message to will do the actual > // processing of the *contents* of the message -- this > thread simply ensures that we don't > // miss any UDP packets by looking at the MsgSeqNum of > each incoming packet. The other > // thread, then, doesn't have to worry about this. 
> lock (m_ConsumerQueue.SyncRoot) { > m_ConsumerQueue.Enqueue(message); > m_ConsumerSyncEvents.NewItemEvent.Set(); > } > m_NextMarketDataSeqNum = msgSeqNum + 1; > } else { > // reconcile missed messages... > } > } > } > } > private void runConsumerThread() > { > // Wait for producer thread to add another QuickFix.Message to my Queue > while (!m_ConsumerSyncEvents.ExitThreadEvent.WaitOne(0, false) && > WaitHandle.WaitAny(m_ConsumerSyncEvents.EventArray) != 1) { > ArrayList newMessages = new ArrayList(); > > lock (m_ConsumerQueue.SyncRoot) { > newMessages.AddRange(m_ConsumerQueue); > m_ConsumerQueue.Clear(); > } > for (int i = 0; i < newMessages.Count; i++) { > // Perform necessary processing on this Message object > QuickFix.Message message = (QuickFix.Message)newMessages[i]; > int msgSeqNum = message.getHeader().getInt(34); > int numEntries = message.getInt(268); > // Each update will have multiple MDEntries > for (uint i = 0; i < numEntries; i++) { > QuickFix44.MarketDataIncrementalRefresh.NoMDEntries > group = new QuickFix44.MarketDataIncrementalRefresh.NoMDEntries(); > message.getGroup(i + 1, group); > string securityID = group.getSecurityID().getValue(); > MDPInstrument inst = > (MDPInstrument)m_ISINToInstrumentMap[securityID]; > > if (inst != null) { > if (group.isSetField(83)) { > int rptSeqNum = group.getField(new > IntField(83)).getValue(); > if (inst.lastMarketDataSeqNum != -1 && > msgSeqNum > inst.lastMarketDataSeqNum) { > // process inc refresh > MDUpdateAction updateAction = > group.getMDUpdateAction(); > int entrySize = 0; > if (group.isSetMDEntrySize()) { > MDEntrySize mdEntrySize = > group.getMDEntrySize(); > entrySize = (int)mdEntrySize.getValue(); > } > double entryPrice = 0; > > MDEntryPx entryPx = group.getMDEntryPx(); > entryPrice = entryPx.getValue(); > > MDEntryType entryType = > group.getMDEntryType(); > bool implied = false; > > QuoteCondition quoteCondition = null; > if (group.isSetQuoteCondition()) { > quoteCondition = > 
group.getQuoteCondition(); > > if (group.isSetQuoteCondition()) { > if > (group.getQuoteCondition().getValue()[0] == 'K') { > implied = true; > } > } > } > if (group.isSetField(1020)) { > int tradeVolume = group.getField(new > IntField(1020)).getValue(); > inst.SetTradeVolume(tradeVolume); > } > > if ((quoteCondition == null || > quoteCondition.getValue()[0] != QuoteCondition.EXCHANGE_BEST)) { > int priceLevel = -1; > switch (entryType.getValue()) { > case MDEntryType.BID: { > // This is where the exception is occurring > *priceLevel = > group.getField(new IntField(1023)).getValue();** * > switch > (updateAction.getValue()) { > case MDUpdateAction.NEW: > // do necessary > processing... > break; > case > MDUpdateAction.DELETE: > // do necessary > processing... > break; > case > MDUpdateAction.CHANGE: > // do necessary > processing... > break; > } > } > break; > case MDEntryType.OFFER: { > // This is also where the exception is > occurring > *priceLevel = > group.getField(new IntField(1023)).getValue();** * > switch > (updateAction.getValue()) { > case MDUpdateAction.NEW: > // do necessary > processing... > break; > case > MDUpdateAction.DELETE: > // do necessary > processing... > break; > case > MDUpdateAction.CHANGE: > // do necessary > processing... > break; > } > } > break; > case MDEntryType.TRADE_VOLUME: > // do necessary processing... > break; > case > MDEntryType.TRADING_SESSION_LOW_PRICE: > // do necessary processing... > break; > case > MDEntryType.TRADING_SESSION_HIGH_PRICE: > // do necessary processing... > break; > case MDEntryType.SETTLEMENT_PRICE: > // do necessary processing... > break; > } > } > } > } > } > } > } > } > } > > No virus found in this incoming message. > Checked by AVG. > Version: 8.0.101 / Virus Database: 270.4.1/1518 - Release Date: > 6/25/2008 9:46 AM > |
From: Rick L. <ric...@gm...> - 2008-06-25 17:00:41
|
I should also mention that the exception occurs at another line as well (in the body of the same "consumer" loop), but I removed that line from my pseudo code below: qty = (int)group.getMDEntrySize().getValue(); This occasionally throws the same exception. Rick Lane wrote: > Greetings, > > I have provided a slimmed-down version of the two threads that are > running concurrently in my application in an attempt to see if anyone > can pinpoint why I might be getting this ugly exception when grabbing > integer values from QuickFix.Message objects. > > The first thread is my "producer" thread, and it basically converts > FAST data into a valid FIX message, constructs a QuickFix.Message from > this FIX string and then adds this Message object onto the "consumers" > queue. > > The second (consumer) thread waits for new QuickFix.Message objects to > be added to the queue, and then does the necessary processing. > > When the exception occurs, I print out the FIX message text in > question to see if there is something errant about it -- I then, in a > separate application, construct a QuickFix.Message object from the > /same /FIX message string (that caused the original exception) and it > works fine -- so it's definitely not a FIX-formatting issue. It must > be a thread issue because the bug is very intermittent, some machines > it happens every few minutes, while some machines it happens only once > or twice a week. 
> > Please let me know if you see any glaring issues -- the same > QuickFix.Message object is never accessed on more than one thread at a > time, so it seems that there must be some static or shared component > of QuickFix that is having concurrency issues when 2 threads are > accessing it at the same time (meaning, I might have 2 > QuickFix.Message objects, one on each thread, and each thread is > performing actions on these objects individually -- which should be > allowed, but if each of these distinct objects has some shared > component, then I can see a problem arising). > > Thanks in advance! > > Rick > > private void runProducerThread() > { > while (WaitHandle.WaitAny(m_SyncEventsMarketData.EventArray) != 1) { > // raw FAST-decoded data will come in through this Queue > ArrayList newMessages = new ArrayList(); > lock (m_QueueMarketData.SyncRoot) { > newMessages.AddRange(m_QueueMarketData); > m_QueueMarketData.Clear(); > } > > for (int msgNum = 0; msgNum < newMessages.Count; msgNum++) { > FASTDataPacket packet = (FASTDataPacket)newMessages[msgNum]; > // my MarketDataDecoder will convert the FAST-encoded data > into a valid FIX message string > string msgText = m_MarketDataDecoder.ProcessPacket(packet); > // I create a QuickFix.Message object with this string and > my DataDictionary > QuickFix.Message message = new QuickFix.Message(msgText, > m_DataDictionary); > int msgSeqNum = message.getHeader().getInt(34);* * > > if (msgSeqNum = m_NextMarketDataSeqNum) { > // The other (consumer) thread, which I'm adding this > QuickFix.Message to will do the actual > // processing of the *contents* of the message -- this > thread simply ensures that we don't > // miss any UDP packets by looking at the MsgSeqNum of > each incoming packet. The other > // thread, then, doesn't have to worry about this. 
> lock (m_ConsumerQueue.SyncRoot) { > m_ConsumerQueue.Enqueue(message); > m_ConsumerSyncEvents.NewItemEvent.Set(); > } > m_NextMarketDataSeqNum = msgSeqNum + 1; > } else { > // reconcile missed messages... > } > } > } > } > private void runConsumerThread() > { > // Wait for producer thread to add another QuickFix.Message to my Queue > while (!m_ConsumerSyncEvents.ExitThreadEvent.WaitOne(0, false) && > WaitHandle.WaitAny(m_ConsumerSyncEvents.EventArray) != 1) { > ArrayList newMessages = new ArrayList(); > > lock (m_ConsumerQueue.SyncRoot) { > newMessages.AddRange(m_ConsumerQueue); > m_ConsumerQueue.Clear(); > } > for (int i = 0; i < newMessages.Count; i++) { > // Perform necessary processing on this Message object > QuickFix.Message message = (QuickFix.Message)newMessages[i]; > int msgSeqNum = message.getHeader().getInt(34); > int numEntries = message.getInt(268); > // Each update will have multiple MDEntries > for (uint i = 0; i < numEntries; i++) { > QuickFix44.MarketDataIncrementalRefresh.NoMDEntries > group = new QuickFix44.MarketDataIncrementalRefresh.NoMDEntries(); > message.getGroup(i + 1, group); > string securityID = group.getSecurityID().getValue(); > MDPInstrument inst = > (MDPInstrument)m_ISINToInstrumentMap[securityID]; > > if (inst != null) { > if (group.isSetField(83)) { > int rptSeqNum = group.getField(new > IntField(83)).getValue(); > if (inst.lastMarketDataSeqNum != -1 && > msgSeqNum > inst.lastMarketDataSeqNum) { > // process inc refresh > MDUpdateAction updateAction = > group.getMDUpdateAction(); > int entrySize = 0; > if (group.isSetMDEntrySize()) { > MDEntrySize mdEntrySize = > group.getMDEntrySize(); > entrySize = (int)mdEntrySize.getValue(); > } > double entryPrice = 0; > > MDEntryPx entryPx = group.getMDEntryPx(); > entryPrice = entryPx.getValue(); > > MDEntryType entryType = > group.getMDEntryType(); > bool implied = false; > > QuoteCondition quoteCondition = null; > if (group.isSetQuoteCondition()) { > quoteCondition = > 
group.getQuoteCondition(); > > if (group.isSetQuoteCondition()) { > if > (group.getQuoteCondition().getValue()[0] == 'K') { > implied = true; > } > } > } > if (group.isSetField(1020)) { > int tradeVolume = group.getField(new > IntField(1020)).getValue(); > inst.SetTradeVolume(tradeVolume); > } > > if ((quoteCondition == null || > quoteCondition.getValue()[0] != QuoteCondition.EXCHANGE_BEST)) { > int priceLevel = -1; > switch (entryType.getValue()) { > case MDEntryType.BID: { > // This is where the exception is occurring > *priceLevel = > group.getField(new IntField(1023)).getValue();** * > switch > (updateAction.getValue()) { > case MDUpdateAction.NEW: > // do necessary > processing... > break; > case > MDUpdateAction.DELETE: > // do necessary > processing... > break; > case > MDUpdateAction.CHANGE: > // do necessary > processing... > break; > } > } > break; > case MDEntryType.OFFER: { > // This is also where the exception is > occurring > *priceLevel = > group.getField(new IntField(1023)).getValue();** * > switch > (updateAction.getValue()) { > case MDUpdateAction.NEW: > // do necessary > processing... > break; > case > MDUpdateAction.DELETE: > // do necessary > processing... > break; > case > MDUpdateAction.CHANGE: > // do necessary > processing... > break; > } > } > break; > case MDEntryType.TRADE_VOLUME: > // do necessary processing... > break; > case > MDEntryType.TRADING_SESSION_LOW_PRICE: > // do necessary processing... > break; > case > MDEntryType.TRADING_SESSION_HIGH_PRICE: > // do necessary processing... > break; > case MDEntryType.SETTLEMENT_PRICE: > // do necessary processing... > break; > } > } > } > } > } > } > } > } > } > |