Thread: Re: [Quickfix-developers] concurrent threads -- "External component has thrown an exception"
Brought to you by:
orenmnero
From: <or...@qu...> - 2008-06-25 15:51:16
|
Haven't completely gone through this, and not what you are looking for, but is this line correct? if (msgSeqNum = m_NextMarketDataSeqNum) { Did you mean this to be a comparison? > private void runProducerThread() > { > while (WaitHandle.WaitAny(m_SyncEventsMarketData.EventArray) != 1) { > // raw FAST-decoded data will come in through this Queue > ArrayList newMessages = new ArrayList(); > lock (m_QueueMarketData.SyncRoot) { > newMessages.AddRange(m_QueueMarketData); > m_QueueMarketData.Clear(); > } > > for (int msgNum = 0; msgNum < newMessages.Count; msgNum++) { > FASTDataPacket packet = (FASTDataPacket)newMessages[msgNum]; > // my MarketDataDecoder will convert the FAST-encoded data > into a valid FIX message string > string msgText = m_MarketDataDecoder.ProcessPacket(packet); > // I create a QuickFix.Message object with this string and > my DataDictionary > QuickFix.Message message = new QuickFix.Message(msgText, > m_DataDictionary); > int msgSeqNum = message.getHeader().getInt(34);* * > > if (msgSeqNum = m_NextMarketDataSeqNum) { > // The other (consumer) thread, which I'm adding this > QuickFix.Message to will do the actual > // processing of the *contents* of the message -- this > thread simply ensures that we don't > // miss any UDP packets by looking at the MsgSeqNum of > each incoming packet. The other > // thread, then, doesn't have to worry about this. > lock (m_ConsumerQueue.SyncRoot) { > m_ConsumerQueue.Enqueue(message); > m_ConsumerSyncEvents.NewItemEvent.Set(); > } > m_NextMarketDataSeqNum = msgSeqNum + 1; > } else { > // reconcile missed messages... 
> } > } > } > } > private void runConsumerThread() > { > // Wait for producer thread to add another QuickFix.Message to my Queue > while (!m_ConsumerSyncEvents.ExitThreadEvent.WaitOne(0, false) && > WaitHandle.WaitAny(m_ConsumerSyncEvents.EventArray) != 1) { > ArrayList newMessages = new ArrayList(); > > lock (m_ConsumerQueue.SyncRoot) { > newMessages.AddRange(m_ConsumerQueue); > m_ConsumerQueue.Clear(); > } > for (int i = 0; i < newMessages.Count; i++) { > // Perform necessary processing on this Message object > QuickFix.Message message = (QuickFix.Message)newMessages[i]; > int msgSeqNum = message.getHeader().getInt(34); > int numEntries = message.getInt(268); > // Each update will have multiple MDEntries > for (uint i = 0; i < numEntries; i++) { > QuickFix44.MarketDataIncrementalRefresh.NoMDEntries > group = new QuickFix44.MarketDataIncrementalRefresh.NoMDEntries(); > message.getGroup(i + 1, group); > string securityID = group.getSecurityID().getValue(); > MDPInstrument inst = > (MDPInstrument)m_ISINToInstrumentMap[securityID]; > > if (inst != null) { > if (group.isSetField(83)) { > int rptSeqNum = group.getField(new > IntField(83)).getValue(); > if (inst.lastMarketDataSeqNum != -1 && msgSeqNum > > inst.lastMarketDataSeqNum) { > // process inc refresh > MDUpdateAction updateAction = > group.getMDUpdateAction(); > int entrySize = 0; > if (group.isSetMDEntrySize()) { > MDEntrySize mdEntrySize = > group.getMDEntrySize(); > entrySize = (int)mdEntrySize.getValue(); > } > double entryPrice = 0; > > MDEntryPx entryPx = group.getMDEntryPx(); > entryPrice = entryPx.getValue(); > > MDEntryType entryType = group.getMDEntryType(); > bool implied = false; > > QuoteCondition quoteCondition = null; > if (group.isSetQuoteCondition()) { > quoteCondition = group.getQuoteCondition(); > > if (group.isSetQuoteCondition()) { > if > (group.getQuoteCondition().getValue()[0] == 'K') { > implied = true; > } > } > } > if (group.isSetField(1020)) { > int tradeVolume = group.getField(new > 
IntField(1020)).getValue(); > inst.SetTradeVolume(tradeVolume); > } > > if ((quoteCondition == null || > quoteCondition.getValue()[0] != QuoteCondition.EXCHANGE_BEST)) { > int priceLevel = -1; > switch (entryType.getValue()) { > case MDEntryType.BID: { > // This is where the exception is occurring > *priceLevel = > group.getField(new IntField(1023)).getValue();** * > switch > (updateAction.getValue()) { > case MDUpdateAction.NEW: > // do necessary > processing... > break; > case MDUpdateAction.DELETE: > // do necessary > processing... > break; > case MDUpdateAction.CHANGE: > // do necessary > processing... > break; > } > } > break; > case MDEntryType.OFFER: { > // This is also where the exception is > occurring > *priceLevel = > group.getField(new IntField(1023)).getValue();** * > switch > (updateAction.getValue()) { > case MDUpdateAction.NEW: > // do necessary > processing... > break; > case MDUpdateAction.DELETE: > // do necessary > processing... > break; > case MDUpdateAction.CHANGE: > // do necessary > processing... > break; > } > } > break; > case MDEntryType.TRADE_VOLUME: > // do necessary processing... > break; > case > MDEntryType.TRADING_SESSION_LOW_PRICE: > // do necessary processing... > break; > case > MDEntryType.TRADING_SESSION_HIGH_PRICE: > // do necessary processing... > break; > case MDEntryType.SETTLEMENT_PRICE: > // do necessary processing... > break; > } > } > } > } > } > } > } > } > }<hr>------------------------------------------------------------------------- > Check out the new SourceForge.net Marketplace. > It's the best place to buy or sell services for > just about anything Open Source. > http://sourceforge.net/services/buy/index.php<hr>_______________________________________________ > Quickfix-developers mailing list > Qui...@li... > https://lists.sourceforge.net/lists/listinfo/quickfix-developers |
From: Rick L. <ric...@gm...> - 2008-06-25 15:55:06
|
I re-wrote a lot of this in MS Word so I could preserve the color formatting of the code so it would be easier for people to read -- this will almost certainly not compile (and yes, in my code it's actually >=, but I removed a block of code, so I meant to replace it with ==). Thanks. Rick or...@qu... wrote: > Haven't completely gone through this, and not what you are looking for, > but is this line correct? > if (msgSeqNum = m_NextMarketDataSeqNum) { > > Did you mean this to be a comparison? > > >> private void runProducerThread() >> { >> while (WaitHandle.WaitAny(m_SyncEventsMarketData.EventArray) != 1) { >> // raw FAST-decoded data will come in through this Queue >> ArrayList newMessages = new ArrayList(); >> lock (m_QueueMarketData.SyncRoot) { >> newMessages.AddRange(m_QueueMarketData); >> m_QueueMarketData.Clear(); >> } >> >> for (int msgNum = 0; msgNum < newMessages.Count; msgNum++) { >> FASTDataPacket packet = (FASTDataPacket)newMessages[msgNum]; >> // my MarketDataDecoder will convert the FAST-encoded data >> into a valid FIX message string >> string msgText = m_MarketDataDecoder.ProcessPacket(packet); >> // I create a QuickFix.Message object with this string and >> my DataDictionary >> QuickFix.Message message = new QuickFix.Message(msgText, >> m_DataDictionary); >> int msgSeqNum = message.getHeader().getInt(34);* * >> >> if (msgSeqNum = m_NextMarketDataSeqNum) { >> // The other (consumer) thread, which I'm adding this >> QuickFix.Message to will do the actual >> // processing of the *contents* of the message -- this >> thread simply ensures that we don't >> // miss any UDP packets by looking at the MsgSeqNum of >> each incoming packet. The other >> // thread, then, doesn't have to worry about this. >> lock (m_ConsumerQueue.SyncRoot) { >> m_ConsumerQueue.Enqueue(message); >> m_ConsumerSyncEvents.NewItemEvent.Set(); >> } >> m_NextMarketDataSeqNum = msgSeqNum + 1; >> } else { >> // reconcile missed messages... 
>> } >> } >> } >> } >> private void runConsumerThread() >> { >> // Wait for producer thread to add another QuickFix.Message to my Queue >> while (!m_ConsumerSyncEvents.ExitThreadEvent.WaitOne(0, false) && >> WaitHandle.WaitAny(m_ConsumerSyncEvents.EventArray) != 1) { >> ArrayList newMessages = new ArrayList(); >> >> lock (m_ConsumerQueue.SyncRoot) { >> newMessages.AddRange(m_ConsumerQueue); >> m_ConsumerQueue.Clear(); >> } >> for (int i = 0; i < newMessages.Count; i++) { >> // Perform necessary processing on this Message object >> QuickFix.Message message = (QuickFix.Message)newMessages[i]; >> int msgSeqNum = message.getHeader().getInt(34); >> int numEntries = message.getInt(268); >> // Each update will have multiple MDEntries >> for (uint i = 0; i < numEntries; i++) { >> QuickFix44.MarketDataIncrementalRefresh.NoMDEntries >> group = new QuickFix44.MarketDataIncrementalRefresh.NoMDEntries(); >> message.getGroup(i + 1, group); >> string securityID = group.getSecurityID().getValue(); >> MDPInstrument inst = >> (MDPInstrument)m_ISINToInstrumentMap[securityID]; >> >> if (inst != null) { >> if (group.isSetField(83)) { >> int rptSeqNum = group.getField(new >> IntField(83)).getValue(); >> if (inst.lastMarketDataSeqNum != -1 && msgSeqNum >> > inst.lastMarketDataSeqNum) { >> // process inc refresh >> MDUpdateAction updateAction = >> group.getMDUpdateAction(); >> int entrySize = 0; >> if (group.isSetMDEntrySize()) { >> MDEntrySize mdEntrySize = >> group.getMDEntrySize(); >> entrySize = (int)mdEntrySize.getValue(); >> } >> double entryPrice = 0; >> >> MDEntryPx entryPx = group.getMDEntryPx(); >> entryPrice = entryPx.getValue(); >> >> MDEntryType entryType = group.getMDEntryType(); >> bool implied = false; >> >> QuoteCondition quoteCondition = null; >> if (group.isSetQuoteCondition()) { >> quoteCondition = group.getQuoteCondition(); >> >> if (group.isSetQuoteCondition()) { >> if >> (group.getQuoteCondition().getValue()[0] == 'K') { >> implied = true; >> } >> } >> } >> if 
(group.isSetField(1020)) { >> int tradeVolume = group.getField(new >> IntField(1020)).getValue(); >> inst.SetTradeVolume(tradeVolume); >> } >> >> if ((quoteCondition == null || >> quoteCondition.getValue()[0] != QuoteCondition.EXCHANGE_BEST)) { >> int priceLevel = -1; >> switch (entryType.getValue()) { >> case MDEntryType.BID: { >> // This is where the exception is occurring >> *priceLevel = >> group.getField(new IntField(1023)).getValue();** * >> switch >> (updateAction.getValue()) { >> case MDUpdateAction.NEW: >> // do necessary >> processing... >> break; >> case MDUpdateAction.DELETE: >> // do necessary >> processing... >> break; >> case MDUpdateAction.CHANGE: >> // do necessary >> processing... >> break; >> } >> } >> break; >> case MDEntryType.OFFER: { >> // This is also where the exception is >> occurring >> *priceLevel = >> group.getField(new IntField(1023)).getValue();** * >> switch >> (updateAction.getValue()) { >> case MDUpdateAction.NEW: >> // do necessary >> processing... >> break; >> case MDUpdateAction.DELETE: >> // do necessary >> processing... >> break; >> case MDUpdateAction.CHANGE: >> // do necessary >> processing... >> break; >> } >> } >> break; >> case MDEntryType.TRADE_VOLUME: >> // do necessary processing... >> break; >> case >> MDEntryType.TRADING_SESSION_LOW_PRICE: >> // do necessary processing... >> break; >> case >> MDEntryType.TRADING_SESSION_HIGH_PRICE: >> // do necessary processing... >> break; >> case MDEntryType.SETTLEMENT_PRICE: >> // do necessary processing... >> break; >> } >> } >> } >> } >> } >> } >> } >> } >> }<hr>------------------------------------------------------------------------- >> Check out the new SourceForge.net Marketplace. >> It's the best place to buy or sell services for >> just about anything Open Source. >> http://sourceforge.net/services/buy/index.php<hr>_______________________________________________ >> Quickfix-developers mailing list >> Qui...@li... 
>> https://lists.sourceforge.net/lists/listinfo/quickfix-developers >> > > |