From: <th...@us...> - 2010-10-06 16:11:54
|
Revision: 1959 http://azsmrc.svn.sourceforge.net/azsmrc/?rev=1959&view=rev Author: the8472 Date: 2010-10-06 16:11:47 +0000 (Wed, 06 Oct 2010) Log Message: ----------- -more deadlock avoidance -forgot to write out .torrents from incoming connections ~~ Modified Paths: -------------- mldht/branches/indexer/DHTIndexer.jar mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java Modified: mldht/branches/indexer/DHTIndexer.jar =================================================================== (Binary files differ) Modified: mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java =================================================================== --- mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java 2010-10-05 23:26:10 UTC (rev 1958) +++ mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java 2010-10-06 16:11:47 UTC (rev 1959) @@ -158,7 +158,7 @@ } }, 20, 3, TimeUnit.SECONDS); - DHTIndexer.indexerScheduler.scheduleWithFixedDelay(new Runnable() { + Runnable finisher = new Runnable() { public void run() { try { @@ -168,7 +168,10 @@ DHT.log(e, LogLevel.Error); } } - }, 10*1000, 100, TimeUnit.MILLISECONDS); + }; + + DHTIndexer.indexerScheduler.scheduleWithFixedDelay(finisher, 10*1000, 100, TimeUnit.MILLISECONDS); + DHTIndexer.indexerScheduler.scheduleWithFixedDelay(finisher, 10*1000, 100, TimeUnit.MILLISECONDS); } @@ -188,36 +191,23 @@ if(conn.isState(PullMetaDataConnection.STATE_METADATA_VERIFIED)) { - // offload DB stuff from the networking thread + // offload IO/DB stuff from the networking thread DHTIndexer.indexerScheduler.submit(new Runnable() { public void run() { Session s = HibernateUtil.getSessionFactory().openSession(); try { - while(true) - { - Transaction tx = s.beginTransaction(); - try { - // create a fake task - TorrentDBEntry entry = (TorrentDBEntry) s.createCriteria(TorrentDBEntry.class).add(Restrictions.eq("info_hash", conn.infoHash)).uniqueResult(); - FetchTask task = new FetchTask(); - task.entry = entry; - task.hash = new Key(entry.info_hash).toString(false); - - // and immediately finish it. we don't use the queue to avoid deadlocks - SessionRunnable r = fetchTaskTerminated(entry, 2, false); - r.session = s; - r.run(); - - tx.commit(); - break; - } catch (Exception ex) - { - DHT.log(ex, LogLevel.Error); - tx.rollback(); - s.clear(); - continue; - } - } + + TorrentDBEntry entry = (TorrentDBEntry) s.createCriteria(TorrentDBEntry.class).add(Restrictions.eq("info_hash", conn.infoHash)).uniqueResult(); + FetchTask task = new FetchTask(); + task.entry = entry; + task.hash = new Key(entry.info_hash).toString(false); + + // and immediately finish it. we don't use the queue to avoid deadlocks + writeTorrentFile(conn, task); + fetchTaskTerminated(entry, 2); + } catch (IOException e) + { + DHT.log(e, LogLevel.Error); } finally { s.close(); } @@ -359,7 +349,7 @@ break; } - session.flush(); + //session.flush(); } @@ -417,7 +407,7 @@ log("added metadata task based on DHT for "+task.hash); } else { log("found no DHT entires for "+task.hash); - fetchTaskTerminated(task.entry, 0,true); + fetchTaskTerminated(task.entry, 0); } } } @@ -505,7 +495,7 @@ fetchMetadata(task); } else { log("failed metadata connection and finished for "+task.hash); - fetchTaskTerminated(task.entry, 0,true); + fetchTaskTerminated(task.entry, 0); digestTaskQueue(); } } else @@ -513,12 +503,12 @@ try { writeTorrentFile(conn, task); - fetchTaskTerminated(task.entry, 2,true); + fetchTaskTerminated(task.entry, 2); } catch (IOException e) { e.printStackTrace(); log("successful metadata connection but failed to store for "+task.hash); - fetchTaskTerminated(task.entry, 0,true); + fetchTaskTerminated(task.entry, 0); } } @@ -537,17 +527,17 @@ } - private SessionRunnable fetchTaskTerminated(final TorrentDBEntry e, final int newStatus, boolean enqueue) { + private void fetchTaskTerminated(final TorrentDBEntry e, final int newStatus) { - SessionRunnable r = new SessionRunnable() { + toFinish.add(new SessionRunnable() { public void run() { TorrentDBEntry entry = e; - // handle concurrent deletes - try { + + try { session.refresh(entry); } catch(ObjectNotFoundException ex) - { + { // handle concurrent deletes session.save(entry); } catch(NonUniqueObjectException ex) { @@ -571,12 +561,8 @@ entry.status = newStatus; - long now = System.currentTimeMillis()/1000; - entry.lastFetchAttempt = now; - - // we try those with fewer failures first so we have to remember which ones failed entry.fetchAttemptCount++; @@ -609,12 +595,7 @@ log("torrent done for "+k.toString(false)+" | new status "+newStatus); } - }; - - if(enqueue) - toFinish.add(r); - - return r; + }); } void processFinished() { @@ -639,8 +620,10 @@ } catch (HibernateException e) { tx.rollback(); - // try again + // try again and randomize in case of deadlocks + Collections.shuffle(processed); toFinish.addAll(processed); + } finally { s.close(); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |