From: <th...@us...> - 2010-07-14 17:59:08
|
Revision: 1894
          http://azsmrc.svn.sourceforge.net/azsmrc/?rev=1894&view=rev
Author:   the8472
Date:     2010-07-14 17:59:02 +0000 (Wed, 14 Jul 2010)

Log Message:
-----------
ok, cursors didn't work out. let's try something different

Modified Paths:
--------------
    mldht/branches/indexer/DHTIndexer.jar
    mldht/branches/indexer/lbms/plugins/mldht/indexer/InfoHashGatherer.java

Modified: mldht/branches/indexer/DHTIndexer.jar
===================================================================
(Binary files differ)

Modified: mldht/branches/indexer/lbms/plugins/mldht/indexer/InfoHashGatherer.java
===================================================================
--- mldht/branches/indexer/lbms/plugins/mldht/indexer/InfoHashGatherer.java	2010-07-14 14:37:24 UTC (rev 1893)
+++ mldht/branches/indexer/lbms/plugins/mldht/indexer/InfoHashGatherer.java	2010-07-14 17:59:02 UTC (rev 1894)
@@ -21,11 +21,11 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
-import org.gudy.azureus2.core3.util.LightHashSet;
 import org.hibernate.*;
+import org.hibernate.criterion.Projection;
+import org.hibernate.criterion.Projections;
 import org.hibernate.criterion.Restrictions;
-import lbms.plugins.mldht.kad.DHT;
 import lbms.plugins.mldht.kad.DHTIndexingListener;
 import lbms.plugins.mldht.kad.Key;
@@ -36,7 +36,7 @@
     ConcurrentLinkedQueue<Key> handoffQueue = new ConcurrentLinkedQueue<Key>();
     boolean running = true;
-    private static final int MAX_CHARGE = 800;
+    private static final int MAX_CHARGE = 150;
     private static final int MAX_BUFFER_SIZE = 80000;
@@ -77,35 +77,36 @@
             {
-                Set<String> hashes = new HashSet<String>();
+                Set<String> hashesToInsert = new HashSet<String>();
                 for(Key k : keysToDump)
-                    hashes.add(k.toString(false));
+                    hashesToInsert.add(k.toString(false));
-                ScrollableResults foundEntries = session.createCriteria(TorrentDBEntry.class)
-                    .add(Restrictions.in("info_hash",hashes))
-                    .setFetchSize(30)
-                    .scroll(ScrollMode.FORWARD_ONLY);
+                List<String> existingHashes = session.createCriteria(TorrentDBEntry.class)
+                    .add(Restrictions.in("info_hash",hashesToInsert))
+                    .setProjection(Projections.property("info_hash"))
+                    .setFetchSize(hashesToInsert.size())
+                    .list();
-                while(foundEntries.next())
-                {
-                    TorrentDBEntry entry = (TorrentDBEntry) foundEntries.get(0);
-                    entry.hitCount++;
-                    session.update(entry);
-                    hashes.remove(entry.info_hash);
-                }
+                hashesToInsert.removeAll(existingHashes);
-                session.flush();
-
-                for(String hash : hashes)
+                for(String hash : hashesToInsert)
                 {
                     TorrentDBEntry entry = new TorrentDBEntry();
                     entry.info_hash = hash;
                     entry.added = System.currentTimeMillis()/1000;
-                    entry.hitCount = 1;
+                    entry.hitCount = 0;
                     //System.out.println("adding entry for "+entry.info_hash+" updated:"+entry.added);
                     entry.status = 0;
                     session.save(entry);
                 }
+
+                session.flush();
+
+                existingHashes.addAll(hashesToInsert);
+
+                session.createQuery("update ihdata e set e.hitCount = e.hitCount+1 where e.info_hash in (:hashes)")
+                    .setParameterList("hashes", existingHashes)
+                    .executeUpdate();
                 tx.commit();
             } catch (Exception e)
@@ -132,12 +133,16 @@
     public InfoHashGatherer() {
-        DHTIndexer.indexerScheduler.scheduleWithFixedDelay(new Runnable() {
+        Runnable hashesToDB = new Runnable() {
             public void run() {
                 processInfohashes();
             }
-        }, 1000, 500, TimeUnit.MILLISECONDS);
+        };
+        // run this thing twice for parallelism
+        DHTIndexer.indexerScheduler.scheduleWithFixedDelay(hashesToDB, 1000, 500, TimeUnit.MILLISECONDS);
+        DHTIndexer.indexerScheduler.scheduleWithFixedDelay(hashesToDB, 1000, 500, TimeUnit.MILLISECONDS);
+        DHTIndexer.indexerScheduler.scheduleWithFixedDelay(new Runnable() {
             public void run() {
                 processQueue();

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|