From: <th...@us...> - 2010-10-05 18:23:55
|
Revision: 1954
          http://azsmrc.svn.sourceforge.net/azsmrc/?rev=1954&view=rev
Author:   the8472
Date:     2010-10-05 18:23:48 +0000 (Tue, 05 Oct 2010)

Log Message:
-----------
isolate torrent updates from incoming connections into a separate transaction to reduce deadlocks

Modified Paths:
--------------
    mldht/branches/indexer/DHTIndexer.jar
    mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java

Modified: mldht/branches/indexer/DHTIndexer.jar
===================================================================
(Binary files differ)

Modified: mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java
===================================================================
--- mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java 2010-10-05 09:07:01 UTC (rev 1953)
+++ mldht/branches/indexer/lbms/plugins/mldht/indexer/MetaDataGatherer.java 2010-10-05 18:23:48 UTC (rev 1954)
@@ -192,21 +192,36 @@
 DHTIndexer.indexerScheduler.submit(new Runnable() {
 public void run() {
 Session s = HibernateUtil.getSessionFactory().openSession();
- TorrentDBEntry e = (TorrentDBEntry) s.createCriteria(TorrentDBEntry.class).add(Restrictions.eq("info_hash", conn.infoHash)).uniqueResult();
- if(e.status == 0)
- {
- FetchTask task = new FetchTask();
- task.entry = e;
- task.hash = new Key(e.info_hash).toString(false);
- try
+ try {
+ while(true)
 {
- writeTorrentFile(conn, task);
- } catch (IOException e1)
- {
- DHT.log(e1, LogLevel.Error);
+ Transaction tx = s.beginTransaction();
+ TorrentDBEntry entry = (TorrentDBEntry) s.createCriteria(TorrentDBEntry.class).add(Restrictions.eq("info_hash", conn.infoHash)).uniqueResult();
+ if(entry.status < 2)
+ {
+ FetchTask task = new FetchTask();
+ task.entry = entry;
+ task.hash = new Key(entry.info_hash).toString(false);
+ try
+ {
+ writeTorrentFile(conn, task);
+
+ entry.status = 2;
+ entry.fetchAttemptCount++;
+ entry.lastFetchAttempt = System.currentTimeMillis()/1000;
+ s.update(entry);
+ tx.commit();
+ break;
+ } catch (Exception e)
+ {
+ tx.rollback();
+ DHT.log(e, LogLevel.Error);
+ }
+ }
 }
+ } finally {
+ s.close();
 }
- s.close();
 }
 });
 }
@@ -403,7 +418,7 @@
 log("added metadata task based on DHT for "+task.hash);
 } else {
 log("found no DHT entires for "+task.hash);
- torrentDone(task.entry, 0);
+ fetchTaskTerminated(task.entry, 0);
 }
 }
 }
@@ -468,7 +483,6 @@
 raf.close();
 log("successful metadata connection for "+task.hash);
- torrentDone(task.entry, 2);
 }
 private void fetchMetadata(final FetchTask task) {
@@ -492,7 +506,7 @@
 fetchMetadata(task);
 } else {
 log("failed metadata connection and finished for "+task.hash);
- torrentDone(task.entry, 0);
+ fetchTaskTerminated(task.entry, 0);
 digestTaskQueue();
 }
 } else
@@ -500,11 +514,12 @@
 try
 {
 writeTorrentFile(conn, task);
+ fetchTaskTerminated(task.entry, 2);
 } catch (IOException e)
 {
 e.printStackTrace();
 log("successful metadata connection but failed to store for "+task.hash);
- torrentDone(task.entry, 0);
+ fetchTaskTerminated(task.entry, 0);
 }
 }
@@ -523,7 +538,7 @@
 }
- private void torrentDone(final TorrentDBEntry e, final int newStatus) {
+ private void fetchTaskTerminated(final TorrentDBEntry e, final int newStatus) {
 toFinish.add(new SessionRunnable() {
 public void run() {
@@ -535,17 +550,18 @@
 } catch(ObjectNotFoundException ex)
 {
 session.save(entry);
- } catch(NonUniqueObjectException ex)
- {
- entry = (TorrentDBEntry) session.get(TorrentDBEntry.class, entry.id);
- }
+ }
 Key k = new Key(entry.info_hash);
 // we already finished this one and another lookup was done in parallel. ignore.
 if(entry.status > 1)
 {
- info.incomingCanidates.remove(k);
+ synchronized (info.incomingCanidates)
+ {
+ info.incomingCanidates.remove(k);
+
+ }
 return;
 }

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|