Previously, committing a block would require as many as four syncs:
* A sync to the block file itself
* A sync to open the block index database
* A sync to commit the block index transaction (+1 for each subtxn)
* A sync to close the block index transaction
When this is performed for every block, and you have 60k+ to commit,
these syncs take a significant amount of time.
This patch queues blocks to be committed in a seperate thread, once
every second. This commit performs a single commit/sync at the very
end of the process, greatly reducing the amount of time spent syncing,
without compromising data integrity.
Signed-off-by: Bryan Donlan <bdonlan@...>
---
For your convenience, a web code-review page is also available here:
http://codereview.appspot.com/1820042
db.h | 7 +++-
main.cpp | 142 ++++++++++++++++++++++++++++++++++++++++++++++++++-----------
main.h | 14 ++----
net.cpp | 8 +++-
4 files changed, 134 insertions(+), 37 deletions(-)
diff --git a/db.h b/db.h
index 29ff199..68102ad 100644
--- a/db.h
+++ b/db.h
@@ -223,8 +223,13 @@ public:
return false;
if (vTxn.empty())
return false;
- int ret = vTxn.back()->commit(0);
+ DbTxn *txn = vTxn.back();
vTxn.pop_back();
+
+ // Only sync if this is the root transaction; for nested txns
+ // the root will handle the sync for us
+ int ret = txn->commit(vTxn.size() ? DB_TXN_NOSYNC : DB_TXN_SYNC);
+
return (ret == 0);
}
diff --git a/main.cpp b/main.cpp
index 803548e..f9fffb1 100644
--- a/main.cpp
+++ b/main.cpp
@@ -21,6 +21,7 @@ unsigned int nTransactionsUpdated = 0;
map<COutPoint, CInPoint> mapNextTx;
map<uint256, CBlockIndex*> mapBlockIndex;
+map<uint256, CBlock*> mapBlockPending;
const uint256 hashGenesisBlock("0x000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f");
CBlockIndex* pindexGenesisBlock = NULL;
int nBestHeight = -1;
@@ -1189,7 +1190,7 @@ bool Reorganize(CTxDB& txdb, CBlockIndex* pindexNew)
}
-bool CBlock::AddToBlockIndex(unsigned int nFile, unsigned int nBlockPos)
+bool CBlock::AddToBlockIndex(unsigned int nFile, unsigned int nBlockPos, CTxDB &txdb)
{
// Check for duplicate
uint256 hash = GetHash();
@@ -1209,7 +1210,6 @@ bool CBlock::AddToBlockIndex(unsigned int nFile, unsigned int nBlockPos)
pindexNew->nHeight = pindexNew->pprev->nHeight + 1;
}
- CTxDB txdb;
txdb.TxnBegin();
txdb.WriteBlockIndex(CDiskBlockIndex(pindexNew));
@@ -1232,7 +1232,6 @@ bool CBlock::AddToBlockIndex(unsigned int nFile, unsigned int nBlockPos)
delete pindexNew;
return error("AddToBlockIndex() : ConnectBlock failed");
}
- txdb.TxnCommit();
pindexNew->pprev->pnext = pindexNew;
// Delete redundant memory transactions
@@ -1259,7 +1258,6 @@ bool CBlock::AddToBlockIndex(unsigned int nFile, unsigned int nBlockPos)
}
txdb.TxnCommit();
- txdb.Close();
if (pindexNew == pindexBest)
{
@@ -1322,6 +1320,9 @@ bool CBlock::AcceptBlock()
if (mapBlockIndex.count(hash))
return error("AcceptBlock() : block already in mapBlockIndex");
+ if (mapBlockPending.count(hash))
+ return error("AcceptBlock() : block already in mapBlockPending");
+
// Get prev block index
map<uint256, CBlockIndex*>::iterator mi = mapBlockIndex.find(hashPrevBlock);
if (mi == mapBlockIndex.end())
@@ -1341,6 +1342,16 @@ bool CBlock::AcceptBlock()
if (nBits != GetNextWorkRequired(pindexPrev))
return error("AcceptBlock() : incorrect proof of work");
+ mapBlockPending[hash] = this;
+ return true;
+}
+
+bool CBlock::CommitBlock(CTxDB &txdb)
+{
+ uint256 hash = GetHash();
+ assert(mapBlockPending.count(hash));
+ assert(mapBlockPending[hash] == this);
+
// Write block to history file
if (!CheckDiskSpace(::GetSerializeSize(*this, SER_DISK)))
return error("AcceptBlock() : out of disk space");
@@ -1348,7 +1359,7 @@ bool CBlock::AcceptBlock()
unsigned int nBlockPos;
if (!WriteToDisk(!fClient, nFile, nBlockPos))
return error("AcceptBlock() : WriteToDisk failed");
- if (!AddToBlockIndex(nFile, nBlockPos))
+ if (!AddToBlockIndex(nFile, nBlockPos, txdb))
return error("AcceptBlock() : AddToBlockIndex failed");
// Relay inventory, but don't relay old inventory during initial block download
@@ -1358,6 +1369,22 @@ bool CBlock::AcceptBlock()
if (nBestHeight > (pnode->nStartingHeight != -1 ? pnode->nStartingHeight - 2000 : 55000))
pnode->PushInventory(CInv(MSG_BLOCK, hash));
+ // Recursively process any orphan blocks that depended on this one
+ uint256 hashPrev = GetHash();
+ for (multimap<uint256, CBlock*>::iterator mi = mapOrphanBlocksByPrev.lower_bound(hashPrev);
+ mi != mapOrphanBlocksByPrev.upper_bound(hashPrev);
+ ++mi)
+ {
+ CBlock* pblockOrphan = (*mi).second;
+ if (pblockOrphan->AcceptBlock()) {
+ cerr << "Queued orphan: " << pblockOrphan->GetHash().GetHex() << endl;
+ }
+ mapOrphanBlocks.erase(pblockOrphan->GetHash());
+ }
+ mapOrphanBlocksByPrev.erase(hashPrev);
+
+ delete this;
+
return true;
}
@@ -1396,26 +1423,6 @@ bool ProcessBlock(CNode* pfrom, CBlock* pblock)
delete pblock;
return error("ProcessBlock() : AcceptBlock FAILED");
}
- delete pblock;
-
- // Recursively process any orphan blocks that depended on this one
- vector<uint256> vWorkQueue;
- vWorkQueue.push_back(hash);
- for (int i = 0; i < vWorkQueue.size(); i++)
- {
- uint256 hashPrev = vWorkQueue[i];
- for (multimap<uint256, CBlock*>::iterator mi = mapOrphanBlocksByPrev.lower_bound(hashPrev);
- mi != mapOrphanBlocksByPrev.upper_bound(hashPrev);
- ++mi)
- {
- CBlock* pblockOrphan = (*mi).second;
- if (pblockOrphan->AcceptBlock())
- vWorkQueue.push_back(pblockOrphan->GetHash());
- mapOrphanBlocks.erase(pblockOrphan->GetHash());
- delete pblockOrphan;
- }
- mapOrphanBlocksByPrev.erase(hashPrev);
- }
printf("ProcessBlock: ACCEPTED\n");
return true;
@@ -1523,6 +1530,14 @@ FILE* AppendBlockFile(unsigned int& nFileRet)
nFileRet = nCurrentBlockFile;
return file;
}
+ // If we need to rotate to a new block file, the commit thread
+ // may not be able to sync the old file. So sync it now.
+#ifdef __WXMSW__
+ _commit(_fileno(file));
+#else
+ fsync(fileno(file));
+#endif
+
fclose(file);
nCurrentBlockFile++;
}
@@ -1597,8 +1612,13 @@ bool LoadBlockIndex(bool fAllowNew)
unsigned int nBlockPos;
if (!block.WriteToDisk(!fClient, nFile, nBlockPos))
return error("LoadBlockIndex() : writing genesis block to disk failed");
- if (!block.AddToBlockIndex(nFile, nBlockPos))
+
+ CTxDB txdb;
+
+ if (!block.AddToBlockIndex(nFile, nBlockPos, txdb))
return error("LoadBlockIndex() : genesis block not accepted");
+
+ txdb.Close();
}
return true;
@@ -1688,13 +1708,83 @@ void PrintBlockTree()
}
+//////////////////////////////////////////////////////////////////////////////
+//
+// Block writeback thread
+//
+
+void WritebackNow()
+{
+ CRITICAL_BLOCK(cs_main)
+ {
+ map<uint256, CBlock *>::iterator it;
+
+ if (!mapBlockPending.size())
+ return;
+
+ cerr << "WRITEBACK PENDING: " << mapBlockPending.size() << endl;
+
+ CTxDB txdb;
+ txdb.TxnBegin();
+
+ int writebackct = 0;
+ while (mapBlockPending.size()) {
+ it = mapBlockPending.begin();
+ cerr << "TRYING TO COMMIT " << it->first.GetHex() << " (block " << it->second->GetHash().GetHex() << ")\n";
+ it->second->CommitBlock(txdb);
+ mapBlockPending.erase(it);
+ writebackct++;
+ }
+ unsigned int unused;
+ FILE *fileout = AppendBlockFile(unused);
+ // Sync the new blocks to disk
+#ifdef __WXMSW__
+ _commit(_fileno(fileout));
+#else
+ fsync(fileno(fileout));
+#endif
+ fclose(fileout);
+ // Commit the db only once the blocks themselves are safely on-disk
+
+ txdb.TxnCommit();
+ txdb.Close();
+ }
+}
+void ThreadWriteback(void* parg)
+{
+ try
+ {
+ while (!fShutdown)
+ {
+ WritebackNow();
+ Sleep(1000);
+ }
+ vnThreadsRunning[5]--;
+ }
+ catch (std::exception& e) {
+ vnThreadsRunning[5]--;
+ PrintException(&e, "ThreadWriteback()");
+ } catch (...) {
+ vnThreadsRunning[5]--;
+ PrintException(NULL, "ThreadWriteback()");
+ }
+ printf("Threadwriteback exiting\n");
+}
+void StartWriteback()
+{
+ vnThreadsRunning[5]++;
+ if (!CreateThread(ThreadWriteback, NULL)) {
+ printf("FATAL: CreateThread(ThreadWriteback) failed!\n");
+ abort();
+ }
+}
//////////////////////////////////////////////////////////////////////////////
//
diff --git a/main.h b/main.h
index d8f257b..1e9f3c7 100644
--- a/main.h
+++ b/main.h
@@ -77,7 +77,10 @@ string SendMoneyToBitcoinAddress(string strAddress, int64 nValue, CWalletTx& wtx
void GenerateBitcoins(bool fGenerate);
void ThreadBitcoinMiner(void* parg);
void BitcoinMiner();
+bool ProcessBlock(CNode* pfrom, CBlock* pblock);
+void WritebackNow();
+void StartWriteback();
@@ -983,14 +986,6 @@ public:
return error("CBlock::WriteToDisk() : ftell failed");
fileout << *this;
- // Flush stdio buffers and commit to disk before returning
- fflush(fileout);
-#ifdef __WXMSW__
- _commit(_fileno(fileout));
-#else
- fsync(fileno(fileout));
-#endif
-
return true;
}
@@ -1044,9 +1039,10 @@ public:
bool DisconnectBlock(CTxDB& txdb, CBlockIndex* pindex);
bool ConnectBlock(CTxDB& txdb, CBlockIndex* pindex);
bool ReadFromDisk(const CBlockIndex* blockindex, bool fReadTransactions=true);
- bool AddToBlockIndex(unsigned int nFile, unsigned int nBlockPos);
+ bool AddToBlockIndex(unsigned int nFile, unsigned int nBlockPos, CTxDB &txdb);
bool CheckBlock() const;
bool AcceptBlock();
+ bool CommitBlock(CTxDB &txdb);
};
diff --git a/net.cpp b/net.cpp
index dc1debd..701ac79 100644
--- a/net.cpp
+++ b/net.cpp
@@ -1320,6 +1320,9 @@ void StartNode(void* parg)
if (!CreateThread(ThreadMessageHandler, NULL))
printf("Error: CreateThread(ThreadMessageHandler) failed\n");
+ // Writeback blocks
+ StartWriteback();
+
// Generate coins in the background
GenerateBitcoins(fGenerateBitcoins);
@@ -1381,7 +1384,7 @@ bool StopNode()
fShutdown = true;
nTransactionsUpdated++;
int64 nStart = GetTime();
- while (vnThreadsRunning[0] > 0 || vnThreadsRunning[2] > 0 || vnThreadsRunning[3] > 0 || vnThreadsRunning[4] > 0)
+ while (vnThreadsRunning[0] > 0 || vnThreadsRunning[2] > 0 || vnThreadsRunning[3] > 0 || vnThreadsRunning[4] > 0 || vnThreadsRunning[5] > 0)
{
if (GetTime() - nStart > 20)
break;
@@ -1392,10 +1395,13 @@ bool StopNode()
if (vnThreadsRunning[2] > 0) printf("ThreadMessageHandler still running\n");
if (vnThreadsRunning[3] > 0) printf("ThreadBitcoinMiner still running\n");
if (vnThreadsRunning[4] > 0) printf("ThreadRPCServer still running\n");
+ if (vnThreadsRunning[5] > 0) printf("ThreadWriteback still running\n");
while (vnThreadsRunning[2] > 0 || vnThreadsRunning[4] > 0)
Sleep(20);
Sleep(50);
+ WritebackNow();
+
return true;
}
--
1.7.0.4
|