From: <th...@us...> - 2010-08-20 07:13:24
|
Revision: 1923 http://azsmrc.svn.sourceforge.net/azsmrc/?rev=1923&view=rev Author: the8472 Date: 2010-08-20 07:13:18 +0000 (Fri, 20 Aug 2010) Log Message: ----------- coalesce state modification transactions to cut down on the number of commits Modified Paths: -------------- mldht/branches/indexer/DHTIndexer.jar mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java Modified: mldht/branches/indexer/DHTIndexer.jar =================================================================== (Binary files differ) Modified: mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java =================================================================== --- mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java 2010-08-17 22:01:38 UTC (rev 1922) +++ mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java 2010-08-20 07:13:18 UTC (rev 1923) @@ -151,6 +151,18 @@ } } }, 20, 3, TimeUnit.SECONDS); + + DHTIndexer.indexerScheduler.scheduleWithFixedDelay(new Runnable() { + public void run() { + try + { + processFinished(); + } catch (Exception e) + { + DHT.log(e, LogLevel.Error); + } + } + }, 10*1000, 100, TimeUnit.MILLISECONDS); } private void updatePivots() { @@ -400,62 +412,86 @@ return task != null; } + + private abstract class SessionRunnable implements Runnable { + + Session session; + } + + Queue<SessionRunnable> toFinish = new ConcurrentLinkedQueue<MetaDataGatherer.SessionRunnable>(); + private void torrentDone(final TorrentDBEntry entry, final int newStatus) { - final Session session = HibernateUtil.getSessionFactory().openSession(); - - DHTIndexer.indexerScheduler.execute(new Runnable() { + toFinish.add(new SessionRunnable() { public void run() { - - Transaction tx = session.beginTransaction(); - try + // handle concurrent deletes + try { + session.load(entry,entry.id); + } catch(ObjectNotFoundException ex) { - // handle concurrent deletes - try { - session.load(entry,entry.id); - } catch(ObjectNotFoundException ex) - { - session.save(entry); - } + session.save(entry); + } - entry.status = newStatus; + entry.status = newStatus; - long now = System.currentTimeMillis()/1000; + long now = System.currentTimeMillis()/1000; - entry.lastFetchAttempt = now; + entry.lastFetchAttempt = now; - // we try those with fewer failures first so we have to remember which ones failed - entry.fetchAttemptCount++; + // we try those with fewer failures first so we have to remember which ones failed + entry.fetchAttemptCount++; - if(newStatus == 0) - entry.hitCount /= 2; + if(newStatus == 0) + entry.hitCount /= 2; - /* + /* // remove entries that failed too often (hit count gets reduced on failure) if(entry.hitCount <= 0) session.delete(entry); else session.update(entry); - */ - session.update(entry); + */ + session.update(entry); - tx.commit(); - log("torrent done for "+entry.info_hash+" | new status "+newStatus); - session.close(); - } catch (HibernateException e) - { - tx.rollback(); - session.clear(); - // re-queue to allow other tasks to do their job too. will reuse session - DHTIndexer.indexerScheduler.execute(this); - } + log("torrent done for "+entry.info_hash+" | new status "+newStatus); } }); } + private static final int MAX_CHARGE = 100; + void processFinished() { + Session s = HibernateUtil.getSessionFactory().openSession(); + Transaction tx = s.beginTransaction(); + + ArrayList<SessionRunnable> processed = new ArrayList<SessionRunnable>(); + try + { + + int i = 0; + SessionRunnable r = null; + while((r = toFinish.poll()) != null && i < MAX_CHARGE) + { + i++; + r.session = s; + r.run(); + processed.add(r); + } + + tx.commit(); + } catch (HibernateException e) + { + tx.rollback(); + // try again + toFinish.addAll(processed); + } finally { + s.close(); + } + } + + } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |