[cvs] bogofilter/src datastore_db.c,1.93.2.2,1.93.2.3
Fast Bayesian spam filter along lines suggested by Paul Graham
Brought to you by:
m-a
From: <m-...@us...> - 2004-03-17 15:27:08
|
Update of /cvsroot/bogofilter/bogofilter/src In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv30058 Modified Files: Tag: branch-db-txn datastore_db.c Log Message: Final set of changes for the preliminary implementation. All checks pass now. There are still things left to do: code restructuring, cleanups, and making callers retry transactions that were jammed by the deadlock detector - see comments inside for details. Also, debug mode is not consistent. Index: datastore_db.c =================================================================== RCS file: /cvsroot/bogofilter/bogofilter/src/datastore_db.c,v retrieving revision 1.93.2.2 retrieving revision 1.93.2.3 diff -u -d -r1.93.2.2 -r1.93.2.3 --- datastore_db.c 17 Mar 2004 04:37:40 -0000 1.93.2.2 +++ datastore_db.c 17 Mar 2004 15:17:23 -0000 1.93.2.3 @@ -7,7 +7,7 @@ AUTHORS: Gyepi Sam <gy...@pr...> 2002 - 2003 -Matthias Andree <mat...@gm...> 2003 +Matthias Andree <mat...@gm...> 2003 - 2004 ******************************************************************************/ @@ -17,15 +17,12 @@ ** 3. Bogofilter's header files */ -/* BUGS: db_open is not part of a transaction - * only tested on db 4.0 so far */ +/* This code has been tested with BerkeleyDB 3.2, 3.3, 4.0, 4.1 and 4.2. 
+ * Matthias Andree, 2004-03-17 */ /* TODO: - * - make sure that db_open can be part of a transaction * - implement proper retry when our transaction is aborted after a * deadlock - * - track down and fix a DB_RUNRECOVERY fault with subsequent lockup - * in t.bulkmode (no ideas yet what's wrong here) * - document code changes * - conduct massive tests * - check if we really need the log files for "catastrophic recovery" @@ -91,6 +88,9 @@ if (flags & DB_EXCL) flags &= ~DB_EXCL, strlcat(buf, "DB_EXCL ", sizeof(buf)); if (flags & DB_NOMMAP) flags &= ~DB_NOMMAP, strlcat(buf, "DB_NOMMAP ", sizeof(buf)); if (flags & DB_RDONLY) flags &= ~DB_RDONLY, strlcat(buf, "DB_RDONLY ", sizeof(buf)); +#if DB_AT_LEAST(4,1) + if (flags & DB_AUTO_COMMIT) flags &= ~DB_AUTO_COMMIT, strlcat(buf, "DB_AUTO_COMMIT ", sizeof(buf)); +#endif snprintf(b2, sizeof(b2), "%#lx", (unsigned long)flags); if (flags) strlcat(buf, b2, sizeof(buf)); return buf; @@ -112,7 +112,7 @@ file, database, type, flags, mode); if (DEBUG_DATABASE(1) || getenv("BF_DEBUG_DB_OPEN")) - fprintf(dbgout, "[pid %lu] DB->open(db=%p, file=%s, database=%s, type=%x, flags=%#lx=%s, mode=%#o) -> %d %s\n", (unsigned long)getpid(), db, file, database ? database : "NIL", type, (unsigned long)flags, resolveflags(flags), mode, ret, db_strerror(ret)); + fprintf(dbgout, "DB->open(db=%p, file=%s, database=%s, type=%x, flags=%#lx=%s, mode=%#o) -> %d %s\n", db, file, database ? 
database : "NIL", type, (unsigned long)flags, resolveflags(flags), mode, ret, db_strerror(ret)); return ret; } @@ -125,7 +125,7 @@ handle = xmalloc(sizeof(dbh_t)); memset(handle, 0, sizeof(dbh_t)); /* valgrind */ - handle->fd = -1; /* for lock */ + handle->fd = -1; handle->path = xstrdup(path); handle->name = xmalloc(len); @@ -158,6 +158,9 @@ if (!version_ok) { version_ok = 1; (void)db_version(&maj, &min, NULL); + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "db_version: Header version %d.%d, library version %d.%d\n", + DB_VERSION_MAJOR, DB_VERSION_MINOR, maj, min); if (!(maj == DB_VERSION_MAJOR && min == DB_VERSION_MINOR)) { fprintf(stderr, "The DB versions do not match.\n" "This program was compiled for DB version %d.%d,\n" @@ -213,10 +216,12 @@ ret = dbp->stat(dbp, &dbstat, DB_FAST_STAT); if (ret) { - dbp->err (dbp, ret, "%s (db) DB->stat", progname); + print_error(__FILE__, __LINE__, "(db) DB->stat"); return 0xffffffff; } pagesize = dbstat->bt_pagesize; + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB->stat success, pagesize: %lu\n", (unsigned long)pagesize); free(dbstat); return pagesize; } @@ -326,20 +331,26 @@ handle->is_swapped = is_swapped ? true : false; if (ret != 0) { - dbp->err (dbp, ret, "%s (db) DB->get_byteswapped: %s", - progname, handle->name); + print_error(__FILE__, __LINE__, "(db) DB->get_byteswapped: %s", + db_strerror(ret)); db_close(handle, false); return NULL; /* handle already freed, ok to return */ } + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB->get_byteswapped: %s\n", is_swapped ? 
"true" : "false"); + ret = dbp->fd(dbp, &handle->fd); if (ret != 0) { - dbp->err (dbp, ret, "%s (db) DB->fd: %s", - progname, handle->name); + print_error(__FILE__, __LINE__, "(db) DB->fd: %s", + db_strerror(ret)); db_close(handle, false); return NULL; /* handle already freed, ok to return */ } + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB->fd: %d\n", handle->fd); + /* query page size */ pagesize = get_psize(dbp); if (pagesize == 0xffffffff) { @@ -373,45 +384,68 @@ return NULL; } +#if DB_AT_LEAST(4,0) +#define BF_LOG_FLUSH(e, i) ((e)->log_flush((e), (i))) +#define BF_TXN_BEGIN(e, f, g, h) ((e)->txn_begin((e), (f), (g), (h))) +#define BF_TXN_ID(t) ((t)->id(t)) +#define BF_TXN_ABORT(t) ((t)->abort((t))) +#define BF_TXN_COMMIT(t, f) ((t)->commit((t), (f))) +#else +#define BF_LOG_FLUSH(e, i) (log_flush((e), (i))) +#define BF_TXN_BEGIN(e, f, g, h) (txn_begin((e), (f), (g), (h))) +#define BF_TXN_ID(t) (txn_id(t)) +#define BF_TXN_ABORT(t) (txn_abort((t))) +#define BF_TXN_COMMIT(t, f) (txn_commit((t), (f))) +#endif + int db_txn_begin(dsh_t *dsh) { DB_TXN *t; int ret; assert(dbe); - if (((dbh_t *)dsh->dbh)->txn) return 0; - else { - ret = dbe->txn_begin(dbe, NULL, &t, 0); - if (ret) { - print_error(__FILE__, __LINE__, "(db) txn_begin, err: %s", - db_strerror(ret)); - } + assert(dsh); + assert(dsh->dbh); + + ret = BF_TXN_BEGIN(dbe, NULL, &t, 0); + if (ret) { + print_error(__FILE__, __LINE__, "DB_ENV->txn_begin(%p), err: %s", + dbe, db_strerror(ret)); + return ret; } ((dbh_t *)dsh->dbh)->txn = t; if (DEBUG_DATABASE(1)) - fprintf(dbgout, "db_txn_begin, id: %lx\n", - (unsigned long)t->id(t)); + fprintf(dbgout, "DB_ENV->txn_begin(%p), tid: %lx\n", + dbe, (unsigned long)BF_TXN_ID(t)); - return ret; + return 0; } int db_txn_abort(dsh_t *dsh) { int ret; - assert(dbe); - assert(dsh->dbh); DB_TXN *t = ((dbh_t *)dsh->dbh)->txn; + assert(dbe); assert(t); - if (DEBUG_DATABASE(1)) - fprintf(dbgout, "db_txn_abort, id: %lx\n", - (unsigned long)t->id(t)); - ret = t->abort(t); + ret = 
BF_TXN_ABORT(t); if (ret) - print_error(__FILE__, __LINE__, "(db) txn_abort, err: %s", - db_strerror(ret)); + print_error(__FILE__, __LINE__, "DB_TXN->abort(%lx) error: %s", + (unsigned long)BF_TXN_ID(t), db_strerror(ret)); + else + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB_TXN->abort(%lx)\n", + (unsigned long)BF_TXN_ID(t)); ((dbh_t *)dsh->dbh)->txn = NULL; - return ret; + + switch (ret) { + case 0: + return DST_OK; + case DB_LOCK_DEADLOCK: + return DST_TEMPFAIL; + default: + return DST_FAILURE; + } } int db_txn_commit(dsh_t *dsh) @@ -421,18 +455,16 @@ assert(dbe); assert(t); - if (DEBUG_DATABASE(1)) - fprintf(dbgout, "db_txn_commit, id: %lx\n", - (unsigned long)t->id(t)); - ret = t->commit(t, 0); + ret = BF_TXN_COMMIT(t, 0); if (ret) - print_error(__FILE__, __LINE__, "(db) txn_commit, err: %s", - db_strerror(ret)); + print_error(__FILE__, __LINE__, "DB_TXN->commit(%lx) error: %s", + (unsigned long)BF_TXN_ID(t), db_strerror(ret)); + else + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB_TXN->commit(%lx, 0)\n", + (unsigned long)BF_TXN_ID(t)); ((dbh_t *)dsh->dbh)->txn = NULL; - /* trickle write pages back to data base */ - (void)dbe->memp_trickle(dbe, 15, NULL); - switch (ret) { case 0: return DST_OK; @@ -443,29 +475,6 @@ } } -int db_checkpoint(dsh_t *dsh) -{ - int ret; - assert(dbe); - - ret = dbe->txn_checkpoint(dbe, - /* kBytes */ 64, - /* minutes */ 1, - /* flags */ 0); - - if (DEBUG_DATABASE(4)) - fprintf(dbgout, "db_checkpoint\n"); - if (ret) - print_error(__FILE__, __LINE__, "(db) checkpoint, err: %s", - db_strerror(ret)); - switch (ret) { - case DB_RUNRECOVERY: - db_txn_abort(dsh); - abort(); - } - return ret; -} - int db_delete(dsh_t *dsh, const dbv_t *token) { int ret = 0; @@ -482,13 +491,16 @@ ret = dbp->del(dbp, handle->txn, &db_key, 0); if (ret != 0 && ret != DB_NOTFOUND) { - print_error(__FILE__, __LINE__, "(db) db_delete('%.*s'), err: %d, %s", + print_error(__FILE__, __LINE__, "(db) DB->del('%.*s'), err: %d, %s", CLAMP_INT_MAX(db_key.size), (const char 
*) db_key.data, ret, db_strerror(ret)); exit(EX_ERROR); } + if (DEBUG_DATABASE(3)) + fprintf(dbgout, "DB->del(%.*s)\n", CLAMP_INT_MAX(db_key.size), (const char *) db_key.data); + return ret; /* 0 if ok */ } @@ -501,6 +513,7 @@ dbh_t *handle = dsh->dbh; DB *dbp = handle->dbp; + assert(handle->txn); DBT_init(db_key); DBT_init(db_data); @@ -518,19 +531,20 @@ val->leng = db_data.size; /* read count */ + if (DEBUG_DATABASE(3)) + fprintf(dbgout, "DB->get(%.*s): %s\n", + CLAMP_INT_MAX(token->leng), (char *) token->data, db_strerror(ret)); + switch (ret) { case 0: break; case DB_NOTFOUND: ret = DS_NOTFOUND; - if (DEBUG_DATABASE(3)) { - fprintf(dbgout, "db_get_dbvalue: [%.*s] not found\n", - CLAMP_INT_MAX(token->leng), (char *) token->data); - } break; default: - print_error(__FILE__, __LINE__, "(db) db->get( '%.*s' ), err: %d, %s", + print_error(__FILE__, __LINE__, "(db) DB->get( '%.*s' ), err: %d, %s", CLAMP_INT_MAX(token->leng), (char *) token->data, ret, db_strerror(ret)); + db_txn_abort(dsh); exit(EX_ERROR); } @@ -569,6 +583,10 @@ exit(EX_ERROR); } + if (DEBUG_DATABASE(3)) + fprintf(dbgout, "DB->put(%.*s): %s\n", + CLAMP_INT_MAX(token->leng), (char *) token->data, db_strerror(ret)); + return 0; } @@ -581,17 +599,16 @@ DB *dbp = handle->dbp; uint32_t f = nosync ? DB_NOSYNC : 0; - if (handle->txn) { - print_error(__FILE__, __LINE__, "(db) db_close called with transaction still open, aborting transaction."); - db_txn_abort(vhandle); - } - if (DEBUG_DATABASE(1)) - fprintf(dbgout, "db_close (%s) %s\n", + fprintf(dbgout, "DB->close(%s, %s)\n", handle->name, nosync ? 
"nosync" : "sync"); + if (handle->txn) { + print_error(__FILE__, __LINE__, "db_close called with transaction still open, program fault!"); + } + ret = dbp->close(dbp, f); -#if DB_AT_LEAST(3,2) && DB_AT_MOST(4,0) +#if DB_AT_LEAST(3,0) && DB_AT_MOST(4,0) /* ignore dirty pages in buffer pool */ if (ret == DB_INCOMPLETE) ret = 0; @@ -599,6 +616,7 @@ if (ret) print_error(__FILE__, __LINE__, "(db) db_close err: %d, %s", ret, db_strerror(ret)); + handle->dbp = NULL; dbh_free(handle); } @@ -612,15 +630,18 @@ dbh_t *handle = dsh->dbh; DB *dbp = handle->dbp; -#if 0 - /* old code */ + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "db_flush(%s)\n", handle->name); + + ret = BF_LOG_FLUSH(dbe, NULL); + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB_ENV->log_flush(%p): %s\n", dbe, db_strerror(ret)); + ret = dbp->sync(dbp, 0); -#else - (void)dbp; - /* flushing the log is sufficient for us */ - ret = dbe->log_flush(dbe, NULL); -#endif -#if DB_AT_LEAST(3,2) && DB_AT_MOST(4,0) + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB->sync(%p): %s\n", dbp, db_strerror(ret)); + +#if DB_AT_LEAST(3,0) && DB_AT_MOST(4,0) /* ignore dirty pages in buffer pool */ if (ret == DB_INCOMPLETE) ret = 0; @@ -649,7 +670,7 @@ ret = dbp->cursor(dbp, handle->txn, &dbcp, 0); if (ret) { - dbp->err(dbp, ret, "(cursor): %s", handle->path); + print_error(__FILE__, __LINE__, "(cursor): %s", handle->path); return -1; } @@ -686,11 +707,12 @@ ret = 0; break; default: - dbp->err(dbp, ret, "(c_get)"); + print_error(__FILE__, __LINE__, "(c_get): %s", db_strerror(ret)); ret = -1; } + if ((ret = dbcp->c_close(dbcp))) { - dbp->err(dbp, ret, "(c_close)"); + print_error(__FILE__, __LINE__, "(c_close): %s", db_strerror(ret)); ret = -1; } @@ -705,6 +727,8 @@ * or transactional initialization/shutdown */ static bool init = false; int db_init(void) { + const u_int32_t numlocks = 16384; + assert(bogohome); assert(dbe == NULL); @@ -714,27 +738,60 @@ print_error(__FILE__, __LINE__, "db_env_create, err: %d, %s", ret, db_strerror(ret)); abort(); 
} + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "db_env_create: %p\n", dbe); + + dbe->set_errfile(dbe, stderr); + if (db_cachesize != 0 && (ret = dbe->set_cachesize(dbe, db_cachesize/1024, (db_cachesize % 1024) * 1024*1024, 1)) != 0) { - print_error(__FILE__, __LINE__, "DBENV->set_cachesize(%u), err: %d, %s", + print_error(__FILE__, __LINE__, "DB_ENV->set_cachesize(%u), err: %d, %s", db_cachesize, ret, db_strerror(ret)); exit(EXIT_FAILURE); } + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB_ENV->set_cachesize(%u)\n", db_cachesize); + + /* configure lock system size deadlock detector */ + if ((ret = dbe->set_lk_max_locks(dbe, numlocks)) != 0) { + print_error(__FILE__, __LINE__, "DB_ENV->set_lk_max_locks(%p, %lu), err: %s", dbe, + (unsigned long)numlocks, db_strerror(ret)); + exit(EXIT_FAILURE); + } + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB_ENV->set_lk_max_locks(%p, %lu)\n", dbe, (unsigned long)numlocks); /* configure automatic deadlock detector */ if ((ret = dbe->set_lk_detect(dbe, DB_LOCK_DEFAULT)) != 0) { - print_error(__FILE__, __LINE__, "DBENV->set_lk_detect(DB_LOCK_DEFAULT), err: %s", db_strerror(ret)); + print_error(__FILE__, __LINE__, "DB_ENV->set_lk_detect(DB_LOCK_DEFAULT), err: %s", db_strerror(ret)); exit(EXIT_FAILURE); } + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB_ENV->set_lk_detect(DB_LOCK_DEFAULT)\n"); - ret = dbe->open(dbe, bogohome, DB_INIT_MPOOL | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_TXN | DB_RECOVER | DB_CREATE, /* mode */ 0644); + /* ************************************************************* * + WARNING: + DO NOT ADD ANY DB_RECOVER OPTIONS HERE, RECOVERY + _MUST_ _NOT_ BE RUN BY MORE THAN ONE PROCESS SIMULTANEOUSLY + AND WE ARE NOT LOCKING -- + HENCE DB_RECOVER CAUSES DB_ENV CORRUPTION. + IT WILL NOT WORK! 
-- BEEN THERE, TRIED THAT + Matthias, 2004-03-17, BDB4.2 + * ************************************************************* */ + + ret = dbe->open(dbe, bogohome, DB_INIT_MPOOL | DB_INIT_LOCK + | DB_INIT_LOG | DB_INIT_TXN | DB_CREATE, /* mode */ 0644); if (ret != 0) { dbe->close(dbe, 0); - print_error(__FILE__, __LINE__, "DBENV->open, err: %d, %s", ret, db_strerror(ret)); + print_error(__FILE__, __LINE__, "DB_ENV->open, err: %d, %s", ret, db_strerror(ret)); + if (ret == DB_RUNRECOVERY) { + fprintf(stderr, "To recover, run: db_recover -v -c -h \"%s\"\n", + bogohome); + } exit(EXIT_FAILURE); } - - dbe->set_errfile(dbe, dbgout ? dbgout : stderr); + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB_ENV->open(home=%s)\n", bogohome); } init = true; return 0; @@ -743,8 +800,11 @@ void db_cleanup(void) { if (!init) return; - if (dbe) - dbe->close(dbe, 0); + if (dbe) { + int ret = dbe->close(dbe, 0); + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB_ENV->close(%p): %s\n", dbe, db_strerror(ret)); + } dbe = NULL; init = false; } |