Thread: [cvs] bogofilter/src db_lock.c,1.1,1.2 db_lock.h,1.1,1.2 mxcat.c,1.1,1.2 mxcat.h,1.1,1.2 Makefile.am
Fast Bayesian spam filter along lines suggested by Paul Graham
Brought to you by:
m-a
Update of /cvsroot/bogofilter/bogofilter/src In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv29084/src Modified Files: Makefile.am bf_exit.c bftypes.h bogohist.c bogomain.c bogotune.c bogoutil.c datastore.c datastore.h datastore_db.c datastore_db.h datastore_qdbm.c datastore_tdb.c maint.c register.c robx.c score.c wordlists.c wordlists.h Added Files: db_lock.c db_lock.h mxcat.c mxcat.h Log Message: Merge Transactional Store from branch. Index: Makefile.am =================================================================== RCS file: /cvsroot/bogofilter/bogofilter/src/Makefile.am,v retrieving revision 1.94 retrieving revision 1.95 diff -u -d -r1.94 -r1.95 --- Makefile.am 15 Oct 2004 01:32:59 -0000 1.94 +++ Makefile.am 29 Oct 2004 01:11:52 -0000 1.95 @@ -81,6 +81,7 @@ collect.h collect.c \ configfile.h configfile.c \ datastore.h datastore.c db_handle_props.h \ + db_lock.h db_lock.c \ debug.h debug.c \ error.h error.c \ fgetsl.h fgetsl.c \ @@ -91,6 +92,7 @@ memstr.h memstr.c \ mime.h mime.c \ msgcounts.h msgcounts.c \ + mxcat.h mxcat.c \ passthrough.h passthrough.c \ paths.h paths.c \ prob.h prob.c \ Index: bf_exit.c =================================================================== RCS file: /cvsroot/bogofilter/bogofilter/src/bf_exit.c,v retrieving revision 1.1 retrieving revision 1.2 diff -u -d -r1.1 -r1.2 --- bf_exit.c 7 Sep 2003 01:09:29 -0000 1.1 +++ bf_exit.c 29 Oct 2004 01:11:52 -0000 1.2 @@ -16,7 +16,7 @@ void bf_exit(void) { /* Ensure wordlists are closed */ - close_wordlists(false); + close_wordlists(); return; } Index: bogohist.c =================================================================== RCS file: /cvsroot/bogofilter/bogofilter/src/bogohist.c,v retrieving revision 1.13 retrieving revision 1.14 diff -u -d -r1.13 -r1.14 --- bogohist.c 8 Jun 2004 14:16:57 -0000 1.13 +++ bogohist.c 29 Oct 2004 01:11:52 -0000 1.14 @@ -140,10 +140,22 @@ if (dsh == NULL) return EX_ERROR; + if (DST_OK != ds_txn_begin(dsh)) { + ds_close(dsh); + fprintf(stderr, "cannot begin transaction!\n"); + return EX_ERROR; + } + ds_get_msgcounts(dsh, &val); set_msg_counts(val.goodcount, val.spamcount); - ds_close(dsh, false); + if (DST_OK != ds_txn_commit(dsh)) { + ds_close(dsh); + fprintf(stderr, "cannot commit transaction!\n"); + return EX_ERROR; + } + + ds_close(dsh); ds_cleanup(); memset(&hist, 0, sizeof(hist)); Index: bogomain.c =================================================================== RCS file: /cvsroot/bogofilter/bogofilter/src/bogomain.c,v retrieving revision 1.2 retrieving revision 1.3 diff -u -d -r1.2 -r1.3 --- bogomain.c 20 Oct 2004 00:39:53 -0000 1.2 +++ bogomain.c 29 Oct 2004 01:11:52 -0000 1.3 @@ -57,6 +57,7 @@ output_setup(); /* open all wordlists */ + ds_init(); open_wordlists((run_type == RUN_NORMAL) ? DS_READ : DS_WRITE); status = bogofilter(argc, argv); @@ -74,8 +75,9 @@ if (nonspam_exits_zero && exitcode != EX_ERROR) exitcode = EX_OK; - close_wordlists(false); + close_wordlists(); free_wordlists(); + ds_cleanup(); output_cleanup(); Index: bogotune.c =================================================================== RCS file: /cvsroot/bogofilter/bogofilter/src/bogotune.c,v retrieving revision 1.167 retrieving revision 1.168 diff -u -d -r1.167 -r1.168 --- bogotune.c 19 Oct 2004 23:49:47 -0000 1.167 +++ bogotune.c 29 Oct 2004 01:11:52 -0000 1.168 @@ -980,9 +980,7 @@ fflush(stdout); } - ds_init(); ds_oper(ds_path, DS_READ, hook, userdata); - ds_cleanup(); return; } Index: bogoutil.c =================================================================== RCS file: /cvsroot/bogofilter/bogofilter/src/bogoutil.c,v retrieving revision 1.122 retrieving revision 1.123 diff -u -d -r1.122 -r1.123 --- bogoutil.c 11 Oct 2004 00:40:25 -0000 1.122 +++ bogoutil.c 29 Oct 2004 01:11:53 -0000 1.123 @@ -81,8 +81,11 @@ rc = ds_oper(ds_file, DS_READ, ds_dump_hook, NULL); - if (verbose) - fprintf(dbgout, "%d tokens dumped\n", token_count); + if (rc) + fprintf(stderr, "error dumping tokens!\n"); + else + if (verbose) + fprintf(dbgout, "%d tokens dumped\n", token_count); return rc; } @@ -116,6 +119,8 @@ memset(buf, '\0', BUFSIZE); + ds_txn_begin(dsh); + for (;;) { dsv_t data; word_t *token; @@ -178,15 +183,35 @@ load_count += 1; /* Slower, but allows multiple lists to be concatenated */ set_date(date); - ds_read(dsh, token, &data); + switch (ds_read(dsh, token, &data)) { + case 0: + case 1: + break; + default: + rv = 1; + } data.spamcount += spamcount; data.goodcount += goodcount; - ds_write(dsh, token, &data); + if (ds_write(dsh, token, &data)) rv = 1; } word_free(token); } - ds_close(dsh, false); + if (rv) { + fprintf(stderr, "read or write error, aborting.\n"); + ds_txn_abort(dsh); + } else { + switch (ds_txn_commit(dsh)) { + case DST_FAILURE: + case DST_TEMPFAIL: + fprintf(stderr, "commit failed\n"); + exit(EXIT_FAILURE); + case DST_OK: + break; + } + } + + ds_close(dsh); ds_cleanup(); @@ -237,6 +262,7 @@ void *dsh = NULL; /* initialize to silence bogus gcc warning */ struct stat sb; + int rv = 0; /* protect against broken stat(2) that succeeds for empty names */ if (path == NULL || *path == '\0') { @@ -244,8 +270,6 @@ return EX_ERROR; } - ds_init(); - if ( stat(path, &sb) == 0 ) { /* XXX FIXME: deadlock possible */ if ( ! S_ISDIR(sb.st_mode)) { /* words from file */ @@ -277,11 +301,17 @@ } printf(head_format, "", "spam", "good", " Fisher"); + if (DST_OK != ds_txn_begin(dsh)) { + ds_close(dsh); + fprintf(stderr, "Cannot begin transaction.\n"); + return EX_ERROR; + } while (argc >= 0) { dsv_t val; word_t *token; + int rc; unsigned long spam_count; unsigned long good_count; @@ -299,35 +329,53 @@ token = word_new(word, (uint) strlen((const char *)word)); } - ds_read(dsh, token, &val); - spam_count = val.spamcount; - good_count = val.goodcount; + rc = ds_read(dsh, token, &val); + switch (rc) { + case 0: + spam_count = val.spamcount; + good_count = val.goodcount; - if (!show_probability) - printf(data_format, token->text, spam_count, good_count); - else - { - rob_prob = calc_prob(good_count, spam_count); - printf(data_format, token->text, spam_count, good_count, rob_prob); + if (!show_probability) + printf(data_format, token->text, spam_count, good_count); + else + { + rob_prob = calc_prob(good_count, spam_count); + printf(data_format, token->text, spam_count, good_count, rob_prob); + } + break; + case 1: + break; + default: + fprintf(stderr, "Cannot read from data base.\n"); + rv = EX_ERROR; + goto finish; } if (token != &buff->t) word_free(token); } - ds_close(dsh, false); +finish: + if (DST_OK != rv ? ds_txn_abort(dsh) : ds_txn_commit(dsh)) { + fprintf(stderr, "Cannot %s transaction.\n", rv ? "abort" : "commit"); + rv = EX_ERROR; + } + ds_close(dsh); ds_cleanup(); buff_free(buff); - return 0; + return rv; } static int get_robx(char *path) { double rx; + int ret = 0; rx = compute_robinson_x(path); + if (rx < 0) + return EX_ERROR; if (onlyprint) printf("%f\n", rx); @@ -343,22 +391,25 @@ run_type = REG_SPAM; set_bogohome(filepath); - ds_init(); dsh = ds_open(CURDIR_S, filepath, DS_WRITE); if (dsh == NULL) return EX_ERROR; - val.goodcount = 0; - val.spamcount = (uint32_t) (rx * 1000000); - ds_write(dsh, word_robx, &val); - ds_close(dsh, false); + if (DST_OK == ds_txn_begin(dsh)) { + val.goodcount = 0; + val.spamcount = (uint32_t) (rx * 1000000); + ret = ds_write(dsh, word_robx, &val); + if (DST_OK != ds_txn_commit(dsh)) + ret = 1; + } + ds_close(dsh); ds_cleanup(); word_free(word_robx); } - return EX_OK; + return ret ? EX_ERROR : EX_OK; } static void print_version(void) @@ -388,6 +439,8 @@ fprintf(stderr, "\n" "\t-h\tPrint this message.\n" + "\t-f dir\tRun recovery on data base in dir.\n" + "\t-F dir\tRun catastrophic recovery on data base in dir.\n" "\t-d file\tDump data from file to stdout.\n" "\t-l file\tLoad data from stdin into file.\n" "\t-u file\tUpgrade wordlist version.\n" @@ -422,10 +475,11 @@ static char *ds_file = NULL; static bool prob = false; -typedef enum { M_NONE, M_DUMP, M_LOAD, M_WORD, M_MAINTAIN, M_ROBX, M_HIST } cmd_t; +typedef enum { M_NONE, M_DUMP, M_LOAD, M_WORD, M_MAINTAIN, M_ROBX, M_HIST, + M_RECOVER, M_CRECOVER } cmd_t; static cmd_t flag = M_NONE; -#define OPTIONS ":a:c:d:DhH:I:k:l:m:np:r:R:s:u:vVw:x:X:y:" +#define OPTIONS ":a:c:d:Df:F:hH:I:k:l:m:np:r:R:s:u:vVw:x:X:y:" static int process_arglist(int argc, char **argv) { @@ -446,6 +500,18 @@ fprintf(stderr, "Unknown option -%c.\n", optopt); break; + case 'f': + flag = M_RECOVER; + count += 1; + ds_file = (char *) optarg; + break; + + case 'F': + flag = M_CRECOVER; + count += 1; + ds_file = (char *) optarg; + break; + case 'd': flag = M_DUMP; count += 1; @@ -585,7 +651,7 @@ if (count != 1) { - fprintf(stderr, "%s: Exactly one of the -d, -l, -R or -w flags " + fprintf(stderr, "%s: Exactly one of the -d, -f, -F, -l, -R or -w flags " "must be present.\n", progname); exit(EX_ERROR); } @@ -600,6 +666,7 @@ int main(int argc, char *argv[]) { + int rc; progtype = build_progtype(progname, DB_TYPE); set_today(); /* compute current date for token age */ @@ -615,27 +682,44 @@ atexit(bf_exit); set_bogohome(ds_file); + + if (flag == M_RECOVER) { + return ds_recover(0); + } else if (flag == M_CRECOVER) { + return ds_recover(1); + } + ds_init(); switch(flag) { case M_DUMP: - return dump_wordlist(ds_file); + rc = dump_wordlist(ds_file); + break; case M_LOAD: - return load_wordlist(ds_file); + rc = load_wordlist(ds_file); + break; case M_MAINTAIN: maintain = true; - return maintain_wordlist_file(ds_file); + rc = maintain_wordlist_file(ds_file); + break; case M_WORD: argc -= optind; argv += optind; - return display_words(ds_file, argc, argv, prob); + rc = display_words(ds_file, argc, argv, prob); + break; case M_HIST: - return histogram(ds_file); + rc = histogram(ds_file); + break; case M_ROBX: - return get_robx(ds_file); + rc = get_robx(ds_file); + break; case M_NONE: default: /* should have been handled above */ abort(); + break; } + + ds_cleanup(); + return rc; } Index: datastore.c =================================================================== RCS file: /cvsroot/bogofilter/bogofilter/src/datastore.c,v retrieving revision 1.45 retrieving revision 1.46 diff -u -d -r1.45 -r1.46 --- datastore.c 28 Jun 2004 01:43:29 -0000 1.45 +++ datastore.c 29 Oct 2004 01:11:53 -0000 1.46 @@ -23,6 +23,7 @@ #include "error.h" #include "maint.h" +#include "rand_sleep.h" #include "swap.h" #include "word.h" #include "xmalloc.h" @@ -127,19 +128,26 @@ if (!v) return NULL; - ds_init(); dsh = dsh_init(v); - if (db_created(v) && ! (open_mode & DS_LOAD)) - ds_set_wordlist_version(dsh, NULL); + if (db_created(v) && ! (open_mode & DS_LOAD)) { + if (DST_OK == ds_txn_begin(dsh)) { + ds_set_wordlist_version(dsh, NULL); + if (DST_OK == ds_txn_commit(dsh)) + return dsh; + } + db_close(v); + dsh_free(dsh); + dsh = NULL; + } return dsh; } -void ds_close(/*@only@*/ void *vhandle, bool nosync /** Normally false, if true, do not synchronize data. This should not be used in regular operation but only to ease the disk I/O load when the lock operation failed. */) +void ds_close(/*@only@*/ void *vhandle) { dsh_t *dsh = vhandle; - db_close(dsh->dbh, nosync); + db_close(dsh->dbh); xfree(dsh); } @@ -182,19 +190,26 @@ if (DEBUG_DATABASE(3)) { fprintf(dbgout, "ds_read: [%.*s] -- %lu,%lu\n", - CLAMP_INT_MAX(word->leng), word->text, + CLAMP_INT_MAX(word->leng), (const char *)word->text, (unsigned long)val->spamcount, (unsigned long)val->goodcount); } - break; + return 0; case DS_NOTFOUND: if (DEBUG_DATABASE(3)) { fprintf(dbgout, "ds_read: [%.*s] not found\n", CLAMP_INT_MAX(word->leng), (char *) word->text); } + return 1; + + case DS_ABORT_RETRY: + if (DEBUG_DATABASE(1)) { + print_error(__FILE__, __LINE__, "ds_read('%.*s') was aborted to recover from a deadlock.", + CLAMP_INT_MAX(word->leng), (char *) word->text); + } break; - + default: fprintf(dbgout, "ret=%d, DS_NOTFOUND=%d\n", ret, DS_NOTFOUND); print_error(__FILE__, __LINE__, "ds_read( '%.*s' ), err: %d, %s", @@ -202,7 +217,7 @@ exit(EX_ERROR); } - return found ? 0 : 1; + return ret; } int ds_write(void *vhandle, const word_t *word, dsv_t *val) @@ -231,7 +246,7 @@ if (DEBUG_DATABASE(3)) { fprintf(dbgout, "ds_write: [%.*s] -- %lu,%lu,%lu\n", - CLAMP_INT_MAX(word->leng), word->text, + CLAMP_INT_MAX(word->leng), (const char *)word->text, (unsigned long)val->spamcount, (unsigned long)val->goodcount, (unsigned long)val->date); @@ -255,6 +270,21 @@ return ret; /* 0 if ok */ } +int ds_txn_begin(void *vhandle) { + dsh_t *h = vhandle; + return db_txn_begin(h->dbh); +} + +int ds_txn_abort(void *vhandle) { + dsh_t *h = vhandle; + return db_txn_abort(h->dbh); +} + +int ds_txn_commit(void *vhandle) { + dsh_t *h = vhandle; + return db_txn_commit(h->dbh); +} + typedef struct { ds_foreach_t *hook; dsh_t *dsh; @@ -311,9 +341,15 @@ exit(EX_ERROR); } - ret = ds_foreach(dsh, hook, userdata); + if (DST_OK == ds_txn_begin(dsh)) { + ret = ds_foreach(dsh, hook, userdata); + if (ret) { ds_txn_abort(dsh); } + else + if (ds_txn_commit(dsh) != DST_OK) + ret = -1; + } - ds_close(dsh, false); + ds_close(dsh); return ret; } @@ -345,47 +381,39 @@ /* Get the number of messages associated with database. */ -bool ds_get_msgcounts(void *vhandle, dsv_t *val) +int ds_get_msgcounts(void *vhandle, dsv_t *val) { - int rc; dsh_t *dsh = vhandle; - rc = ds_read(dsh, msg_count_tok, val); - - return rc == 0; + return ds_read(dsh, msg_count_tok, val); } /* Set the number of messages associated with database. */ -void ds_set_msgcounts(void *vhandle, dsv_t *val) +int ds_set_msgcounts(void *vhandle, dsv_t *val) { dsh_t *dsh = vhandle; val->date = today; - ds_write(dsh, msg_count_tok, val); - - return; + return ds_write(dsh, msg_count_tok, val); } /* Get the wordlist version associated with database. */ -bool ds_get_wordlist_version(void *vhandle, dsv_t *val) +int ds_get_wordlist_version(void *vhandle, dsv_t *val) { - int rc; dsh_t *dsh = vhandle; - rc = ds_read(dsh, wordlist_version_tok, val); - - return rc == 0; + return ds_read(dsh, wordlist_version_tok, val); } /* Set the wordlist version associated with database. */ -void ds_set_wordlist_version(void *vhandle, dsv_t *val) +int ds_set_wordlist_version(void *vhandle, dsv_t *val) { dsh_t *dsh = vhandle; dsv_t tmp; @@ -399,12 +427,15 @@ val->date = today; - ds_write(dsh, wordlist_version_tok, val); - - return; + return ds_write(dsh, wordlist_version_tok, val); } const char *ds_version_str(void) { return db_version_str(); } + +int ds_recover(int catastrophic) +{ + return db_recover(catastrophic, 1); +} Index: datastore.h =================================================================== RCS file: /cvsroot/bogofilter/bogofilter/src/datastore.h,v retrieving revision 1.30 retrieving revision 1.31 diff -u -d -r1.30 -r1.31 --- datastore.h 7 Jun 2004 20:10:16 -0000 1.30 +++ datastore.h 29 Oct 2004 01:11:53 -0000 1.31 @@ -28,46 +28,69 @@ extern YYYYMMDD today; /* date as YYYYMMDD */ -/* typedef: Datastore handle type +/** Name of the special token that counts the spam and ham messages + * in the data base. + */ +#define MSG_COUNT ".MSG_COUNT" + +/** Datastore handle type ** - used to communicate between datastore layer and database layer ** - known to program layer as a void* */ - typedef struct { - void *dbh; /* database handle from db_open() */ + /** database handle from db_open() */ + void *dbh; + /** tracks endianness */ bool is_swapped; } dsh_t; -/* typedef: Datastore value type -** - used to communicate between program layer and datastore layer -*/ - +/** Datastore value type, used to communicate between program layer and + * datastore layer. + */ typedef struct { - u_int32_t count[IX_SIZE]; /* spam and ham counts */ + /** spam and ham counts */ + u_int32_t count[IX_SIZE]; + /** time stamp */ u_int32_t date; } dsv_t; #define spamcount count[IX_SPAM] #define goodcount count[IX_GOOD] -/* typedef: Database value type -** - used to communicate between datastore layer and database layer -*/ - +/** Status value used when a key is not found in the data base. */ #define DS_NOTFOUND (-1) +/** Status value when the transaction was aborted to resolve a deadlock + * and should be retried. */ +#define DS_ABORT_RETRY (-2) +/** Macro that clamps its argument to INT_MAX and casts it to int. */ #define CLAMP_INT_MAX(i) ((int)min(INT_MAX, (i))) +/** Database value type, used to communicate between datastore layer and + * database layer. + */ typedef struct { - void *data; /* addr of buffer */ - u_int32_t leng; /* number of data bytes */ + /** address of buffer */ + void *data; + /** number of data bytes */ + u_int32_t leng; } dbv_t; -/** Iterate over all elements in data base and call \p hook for each item. +/** Type of the callback function that ds_foreach calls. */ +typedef int ds_foreach_t( + /** current token that ds_foreach is looking at */ + word_t *token, + /** data store value */ + dsv_t *data, + /** unaltered value from ds_foreach call. */ + void *userdata); +/** Iterate over all records in data base and call \p hook for each item. * \p userdata is passed through to the hook function unaltered. */ -typedef int ds_foreach_t(word_t *token, dsv_t *data, void *userdata); -extern int ds_foreach(void *, ds_foreach_t *hook, void *userdata); +extern int ds_foreach(void *vhandle /** data store handle */, + ds_foreach_t *hook /** callback function */, + void *userdata /** opaque data that is passed to the callback function + unaltered */); /** Wrapper for ds_foreach that opens and closes file */ extern int ds_oper(const char *path, dbmode_t open_mode, @@ -84,20 +107,24 @@ dbmode_t mode /** open mode, DS_READ or DS_WRITE */); /** Close file and clean up. */ -extern void ds_close(/*@only@*/ void *, bool nosync /** Normally false, if true, do not synchronize data. This should not be used in regular operation but only to ease the disk I/O load when the lock operation failed. */); +extern void ds_close(/*@only@*/ void *vhandle); /** Flush pending writes to disk */ -extern void ds_flush(void *); +extern void ds_flush(void *vhandle); -/** Global initialization */ +/** Global initialization of datastore layer. */ extern void ds_init(void); -/** Cleanup storage allocation */ +/** Cleanup storage allocation of datastore layer. After calling this, + * datastore access is no longer permitted. */ extern void ds_cleanup(void); +/** Initialize datastore handle. */ dsh_t *dsh_init( void *dbh); /* database handle from db_open() */ +/** Free data store handle that must not be used after calling this + * function. */ void dsh_free(void *vhandle); /** Retrieve the value associated with a given word in a list. @@ -110,40 +137,99 @@ */ extern int ds_get_dbvalue(void *vhandle, const dbv_t *token, /*@out@*/ dbv_t *val); -/** Delete the key */ +/** Delete the key. */ extern int ds_delete(void *vhandle, const word_t *word); -/** Set the value associated with a given word in a list. Front end */ +/** Set the value associated with a given word in a list. Front end. */ extern int ds_write (void *vhandle, const word_t *word, dsv_t *val); -/** Set the value associated with a given word in a list. Implementation */ +/** Set the value associated with a given word in a list. Implementation. */ extern int ds_set_dbvalue(void *vhandle, const dbv_t *token, dbv_t *val); -/** Update the value associated with a given word in a list */ +/** Update the value associated with a given word in a list. */ extern void ds_updvalues(void *vhandle, const dbv_t *token, const dbv_t *updval); -/** Get the database message count */ -extern bool ds_get_msgcounts(void *vhandle, dsv_t *val); +/** Get the database message count. */ +extern int ds_get_msgcounts(void *vhandle, dsv_t *val); -/** set the database message count */ -extern void ds_set_msgcounts(void *vhandle, dsv_t *val); +/** Set the database message count. */ +extern int ds_set_msgcounts(void *vhandle, dsv_t *val); + +/* transactional code */ +/** Start a transaction for the data store identified by vhandle. + * All data base operations, including reading, must be "opened" by + * ds_txn_begin and must be "closed" by either ds_txn_commit (to keep + * changes) or ds_txn_abort (to discard changes made since the last + * ds_txn_begin for the data base). Application or system crash will + * lose any changes made since ds_txn_begin that have not been + * acknowledged by successful ds_txn_commit(). + * \returns + * - DST_OK for success. It is OK to proceed in data base access. + * - DST_TEMPFAIL for problem. It is unknown whether this actually + * happens. You must not touch the data base. + * - DST_FAILURE for problem. You must not touch the data base. + */ +extern int ds_txn_begin(void *vhandle); + +/** Commit a transaction, keeping changes. As with any transactional + * data base, concurrent updates to the same pages in the data base can + * cause a deadlock of the writers. The datastore_xxx.c code will handle + * the detection for you, in a way that it aborts as many transactions + * until one can proceed. The aborted transactions will return + * DST_TEMPFAIL and must be retried. No data base access must happen + * after this call until the next ds_txn_begin(). + * \returns + * - DST_OK to signify that the data has made it to the disk + * (which means nothing if the disk's write cache is enabled and the + * kernel has no means of synchronizing the cache - this is unknown for + * most kernels) + * - DST_TEMPFAIL when a transaction has been aborted by the deadlock + * detector and must be retried + * - DST_FAILURE when a permanent error has occurred that cannot be + * recovered from by the application (for instance, because corruption + * has occurred and needs to be recovered). + */ +extern int ds_txn_commit(void *vhandle); + +/** Abort a transaction, discarding all changes since the previous + * ds_txn_begin(). Changes are rolled back as though the transaction had + * never been tried. No data base access must happen after this call + * until the next ds_txn_begin(). + * \returns + * - DST_OK for success. + * - DST_TEMPFAIL for failure. It is uncertain if this actually happens. + * - DST_FAILURE for failure. The application cannot continue. + */ +extern int ds_txn_abort(void *vhandle); + +/** Successful return from ds_txn_* operation. */ +#define DST_OK (0) +/** Temporary failure return from ds_txn_* operation, the application + * should retry the failed data base transaction. */ +#define DST_TEMPFAIL (1) +/** Permanent failure return from ds_txn_* operation, the application + * should clean up and exit. */ +#define DST_FAILURE (2) /** Get the database version */ -extern bool ds_get_wordlist_version(void *vhandle, dsv_t *val); +extern int ds_get_wordlist_version(void *vhandle, dsv_t *val); /** set the database version */ -extern void ds_set_wordlist_version(void *vhandle, dsv_t *val); +extern int ds_set_wordlist_version(void *vhandle, dsv_t *val); -/* Get the current process id */ +/** Get the current process ID. */ extern unsigned long ds_handle_pid(void *vhandle); -/* Get the database filename */ +/** Get the database filename. */ extern char *ds_handle_filename(void *vhandle); -/* Locks and unlocks file descriptor */ +/** Locks and unlocks file descriptor. */ extern int ds_lock(int fd, int cmd, short int type); -/* Returns version string */ +/** Returns version string. */ extern const char *ds_version_str(void); +/** Runs recovery on data base */ +extern int ds_recover(int); + #endif Index: datastore_db.c =================================================================== RCS file: /cvsroot/bogofilter/bogofilter/src/datastore_db.c,v retrieving revision 1.111 retrieving revision 1.112 diff -u -d -r1.111 -r1.112 --- datastore_db.c 29 Sep 2004 03:28:04 -0000 1.111 +++ datastore_db.c 29 Oct 2004 01:11:53 -0000 1.112 @@ -7,7 +7,7 @@ AUTHORS: Gyepi Sam <gy...@pr...> 2002 - 2003 -Matthias Andree <mat...@gm...> 2003 +Matthias Andree <mat...@gm...> 2003 - 2004 ******************************************************************************/ @@ -17,6 +17,24 @@ ** 3. Bogofilter's header files */ +/* This code has been tested with BerkeleyDB 3.0, 3.1 3.2, 3.3, 4.0, + * 4.1 and 4.2. -- Matthias Andree, 2004-10-04 */ + +/* TODO: + * - implement proper retry when our transaction is aborted after a + * deadlock + * - document code changes + * - conduct massive tests + * - check if we really need the log files for "catastrophic recovery" + * or if we can remove them (see db_archive documentation) + * as the log files are *HUGE* even compared with the data base + */ + +/* + * NOTE: this code is an "#if" nightmare due to the many different APIs + * in the many different BerkeleyDB versions. + */ + #define DONT_TYPEDEF_SSIZE_T 1 #include "common.h" @@ -43,8 +61,11 @@ #include "word.h" #include "xmalloc.h" #include "xstrdup.h" +#include "mxcat.h" +#include "db_lock.h" -static DB_ENV *dbe; /* libdb environment, if in use, NULL otherwise */ +static DB_ENV *dbe; /* libdb environment, if in use, NULL otherwise */ +static int lockfd = -1; /* fd of lock file to prevent concurrent recovery */ static const DBTYPE dbtype = DB_BTREE; @@ -56,18 +77,16 @@ int fd; /* file descriptor of data base file */ dbmode_t open_mode; /* datastore open mode, DS_READ/DS_WRITE */ DB *dbp; /* data base handle */ - bool locked; bool is_swapped; /* set if CPU and data base endianness differ */ + DB_TXN *txn; /* stores the transaction handle */ bool created; /* if newly created; for datastore.c (to add .WORDLIST_VERSION) */ } dbh_t; -#define DBT_init(dbt) (memset(&dbt, 0, sizeof(DBT))) - -#define DB_AT_LEAST(maj, min) ((DB_VERSION_MAJOR > (maj)) || ((DB_VERSION_MAJOR == (maj)) && (DB_VERSION_MINOR >= (min)))) -#define DB_AT_MOST(maj, min) ((DB_VERSION_MAJOR < (maj)) || ((DB_VERSION_MAJOR == (maj)) && (DB_VERSION_MINOR <= (min)))) +#define DBT_init(dbt) (memset(&dbt, 0, sizeof(DBT))) -/* dummy infrastructure, to be expanded by environment - * or transactional initialization/shutdown */ +#define DB_AT_LEAST(maj, min) ((DB_VERSION_MAJOR > (maj)) || ((DB_VERSION_MAJOR == (maj)) && (DB_VERSION_MINOR >= (min)))) +#define DB_AT_MOST(maj, min) ((DB_VERSION_MAJOR < (maj)) || ((DB_VERSION_MAJOR == (maj)) && (DB_VERSION_MINOR <= (min)))) +#define DB_EQUAL(maj, min) ((DB_VERSION_MAJOR == (maj)) && (DB_VERSION_MINOR == (min))) /* Function definitions */ @@ -80,6 +99,9 @@ if (flags & DB_EXCL) flags &= ~DB_EXCL, strlcat(buf, "DB_EXCL ", sizeof(buf)); if (flags & DB_NOMMAP) flags &= ~DB_NOMMAP, strlcat(buf, "DB_NOMMAP ", sizeof(buf)); if (flags & DB_RDONLY) flags &= ~DB_RDONLY, strlcat(buf, "DB_RDONLY ", sizeof(buf)); +#if DB_AT_LEAST(4,1) + if (flags & DB_AUTO_COMMIT) flags &= ~DB_AUTO_COMMIT, strlcat(buf, "DB_AUTO_COMMIT ", sizeof(buf)); +#endif snprintf(b2, sizeof(b2), "%#lx", (unsigned long)flags); if (flags) strlcat(buf, b2, sizeof(buf)); return buf; @@ -92,9 +114,13 @@ { int ret; +#if DB_AT_LEAST(4,1) + flags |= DB_AUTO_COMMIT; +#endif + ret = db->open(db, #if DB_AT_LEAST(4,1) - 0, /* TXN handle - we use autocommit instead */ + 0, /* TXN handle - we use autocommit instead */ #endif file, database, type, flags, mode); @@ -108,19 +134,6 @@ return ret; } -/* implements locking. */ -static int db_lock(int fd, int cmd, short int type) -{ - struct flock lock; - - lock.l_type = type; - lock.l_start = 0; - lock.l_whence = (short int)SEEK_SET; - lock.l_len = 0; - return (fcntl(fd, cmd, &lock)); -} - - /** "constructor" - allocate our handle and initialize its contents */ static dbh_t *dbh_init(const char *path, const char *name) { @@ -130,16 +143,17 @@ handle = xmalloc(sizeof(dbh_t)); memset(handle, 0, sizeof(dbh_t)); /* valgrind */ - handle->fd = -1; /* for lock */ + handle->fd = -1; handle->path = xstrdup(path); handle->name = xmalloc(len); build_path(handle->name, len, path, name); - handle->locked = false; handle->is_swapped = false; handle->created = false; + handle->txn = NULL; + return handle; } @@ -155,6 +169,8 @@ return; } +/* If header and library version do not match, + * print an error message on stderr and exit with EX_ERROR. */ /* Returns is_swapped flag */ bool db_is_swapped(void *vhandle) @@ -182,6 +198,9 @@ if (!version_ok) { version_ok = 1; (void)db_version(&maj, &min, NULL); + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "db_version: Header version %d.%d, library version %d.%d\n", + DB_VERSION_MAJOR, DB_VERSION_MINOR, maj, min); if (!(maj == DB_VERSION_MAJOR && min == DB_VERSION_MINOR)) { fprintf(stderr, "The DB versions do not match.\n" "This program was compiled for DB version %d.%d,\n" @@ -239,10 +258,12 @@ ret = dbp->stat(dbp, &dbstat, DB_FAST_STAT); if (ret) { - dbp->err (dbp, ret, "%s (db) DB->stat", progname); + print_error(__FILE__, __LINE__, "(db) DB->stat"); return 0xffffffff; } pagesize = dbstat->bt_pagesize; + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB->stat success, pagesize: %lu\n", (unsigned long)pagesize); free(dbstat); return pagesize; } @@ -271,36 +292,23 @@ char *t; dbh_t *handle = NULL; - uint32_t open_flags = 0; - /* - * If locking fails with EAGAIN, then try without MMAP, fcntl() - * locking may be forbidden on mmapped files, or mmap may not be - * available for NFS. Thanks to Piotr Kucharski and Casper Dik, - * see news:comp.protocols.nfs and the bogofilter mailing list, - * message #1520, Message-ID: <200...@sg...> - * Date: Thu, 6 Feb 2003 18:20:16 +0100 - */ - size_t idx; - uint32_t retryflags[] = { 0, DB_NOMMAP }; + uint32_t opt_flags = 0; - check_db_version(); + assert(init); - if (open_mode & DS_READ ) - open_flags = DB_RDONLY; - if (open_mode & DS_CREATE ) - open_flags = DB_CREATE | DB_EXCL; + check_db_version(); - /* retry when locking failed */ - for (idx = 0; idx < COUNTOF(retryflags); idx += 1) { +#if DB_AT_LEAST(4,1) + int flags; +#endif DB *dbp; - bool err = false; - uint32_t retryflag = retryflags[idx], pagesize; + uint32_t pagesize; handle = dbh_init(path, name); if (handle == NULL) - break; + return NULL; /* create DB handle */ if ((ret = db_create (&dbp, dbe, 0)) != 0) { @@ -311,45 +319,39 @@ handle->dbp = dbp; - /* set cache size, but not when we're using an environment */ - if (dbe == NULL && db_cachesize != 0 && - (ret = dbp->set_cachesize(dbp, db_cachesize/1024, (db_cachesize % 1024) * 1024*1024, 1)) != 0) { - print_error(__FILE__, __LINE__, "(db) DB(%s)->set_cachesize(%u,%u,%u), err: %d, %s", - handle->name, db_cachesize/1024u, (db_cachesize % 1024u) * 1024u*1024u, 1u, ret, db_strerror(ret)); - goto open_err; - } + /* set flags */ +#if DB_EQUAL(4,1) + flags = DB_CHKSUM_SHA1; +#endif +#if DB_AT_LEAST(4,2) + flags = DB_CHKSUM; +#endif + +#if DB_AT_LEAST(4,1) + ret = dbp->set_flags(dbp, flags); + if (ret) { + print_error(__FILE__, __LINE__, + "(db) DB->set_flags(%d) failed: %s", + flags, db_strerror(ret)); + dbp->close(dbp, 0); + goto open_err; + } +#endif /* open data base */ - if (dbe && (t = strrchr(handle->name, DIRSEP_C))) + if ((t = strrchr(handle->name, DIRSEP_C))) t++; else t = handle->name; retry_db_open: - - ret = DB_OPEN(dbp, t, NULL, dbtype, open_flags | retryflag, 0664); - - if (ret != 0) { - err = (ret != ENOENT) || (open_flags & DB_RDONLY); - if (!err) { - ret = DB_OPEN(dbp, t, NULL, dbtype, open_flags | DB_CREATE | DB_EXCL | retryflag, 0664); - if (ret != 0) - err = true; - else - handle->created = true; - } - } - - if (ret != 0) { - if (ret == ENOENT && open_flags != DB_RDONLY) - return NULL; - else - err = true; - } - - if (err) + handle->created = false; + if ((ret = DB_OPEN(dbp, t, NULL, dbtype, opt_flags, 0664)) != 0 + && ( ret != ENOENT || opt_flags == DB_RDONLY || + ((handle->created = true), + (ret = DB_OPEN(dbp, t, NULL, dbtype, opt_flags | DB_CREATE | DB_EXCL, 0664)) != 0))) { - if (open_flags != DB_RDONLY && ret == EEXIST && --retries) { + if (open_mode != DB_RDONLY && ret == EEXIST && --retries) { /* sleep for 4 to 100 ms - this is just to give up the CPU * to another process and let it create the data base * file in peace */ @@ -375,20 +377,26 @@ handle->is_swapped = is_swapped ? true : false; if (ret != 0) { - dbp->err (dbp, ret, "%s (db) DB->get_byteswapped: %s", - progname, handle->name); - db_close(handle, false); + print_error(__FILE__, __LINE__, "(db) DB->get_byteswapped: %s", + db_strerror(ret)); + db_close(handle); return NULL; /* handle already freed, ok to return */ } + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB->get_byteswapped: %s\n", is_swapped ? "true" : "false"); + ret = dbp->fd(dbp, &handle->fd); if (ret != 0) { - dbp->err (dbp, ret, "%s (db) DB->fd: %s", - progname, handle->name); - db_close(handle, false); + print_error(__FILE__, __LINE__, "(db) DB->fd: %s", + db_strerror(ret)); + db_close(handle); return NULL; /* handle already freed, ok to return */ } + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB->fd: %d\n", handle->fd); + /* query page size */ pagesize = get_psize(dbp); if (pagesize == 0xffffffff) { @@ -401,38 +409,9 @@ /* check file size limit */ check_fsize_limit(handle->fd, pagesize); - - /* skip manual lock when run in environment */ - if (dbe) - break; - - /* try fcntl lock */ - if (db_lock(handle->fd, F_SETLK, - (short int)(open_mode == DS_READ ? F_RDLCK : F_WRLCK))) - { - int e = errno; - db_close(handle, true); - handle = NULL; /* db_close freed it, we don't want to use it anymore */ - errno = e; - if (errno == EACCES) - errno = EAGAIN; - if (errno != EAGAIN) - return NULL; - } else { - /* have lock */ - break; - } - } /* for idx over retryflags */ - - if (handle) { - handle->locked = true; - if (handle->fd < 0) - handle->locked=false; - - return handle; } - return NULL; + return handle; open_err: dbh_free(handle); @@ -444,6 +423,121 @@ return NULL; } +/* wrapper for the API that changed in 4.0, to + * collect the junk in a location separate from the implementation */ +#if DB_AT_LEAST(4,0) +/* BerkeleyDB 4.0, 4.1, 4.2 */ +#define BF_LOG_FLUSH(e, i) ((e)->log_flush((e), (i))) +#define BF_MEMP_SYNC(e, l) ((e)->memp_sync((e), (l))) +#define BF_MEMP_TRICKLE(e, p, n) ((e)->memp_trickle((e), (p), (n))) +#define BF_TXN_BEGIN(e, f, g, h) ((e)->txn_begin((e), (f), (g), (h))) +#define BF_TXN_ID(t) ((t)->id(t)) +#define BF_TXN_ABORT(t) ((t)->abort((t))) +#define BF_TXN_COMMIT(t, f) ((t)->commit((t), (f))) +#define BF_TXN_CHECKPOINT(e, k, m, f) ((e)->txn_checkpoint((e), (k), (m), (f))) +#else +/* BerkeleyDB 3.0, 3.1, 3.2, 3.3 */ +#define BF_LOG_FLUSH(e, i) (log_flush((e), (i))) +#define BF_MEMP_SYNC(e, l) (memp_sync((e), (l))) +#define BF_MEMP_TRICKLE(e, p, n) (memp_trickle((e), (p), (n))) +#define BF_TXN_BEGIN(e, f, g, h) (txn_begin((e), (f), (g), (h))) +#define BF_TXN_ID(t) (txn_id(t)) +#define BF_TXN_ABORT(t) (txn_abort((t))) +#define BF_TXN_COMMIT(t, f) (txn_commit((t), (f))) +#if DB_AT_LEAST(3,1) +/* BerkeleyDB 3.1, 3.2, 3.3 */ +#define BF_TXN_CHECKPOINT(e, k, m, f) (txn_checkpoint((e), (k), (m), (f))) +#else +/* BerkeleyDB 3.0 */ +#define BF_TXN_CHECKPOINT(e, k, m, f) (txn_checkpoint((e), (k), (m))) +#endif +#endif + +/** begin transaction. Returns 0 for success. */ +int db_txn_begin(void *vhandle) +{ + DB_TXN *t; + int ret; + + dbh_t *handle = vhandle; + assert(dbe); + assert(handle); + + ret = BF_TXN_BEGIN(dbe, NULL, &t, 0); + if (ret) { + print_error(__FILE__, __LINE__, "DB_ENV->txn_begin(%p), err: %s", + (void *)dbe, db_strerror(ret)); + return ret; + } + handle->txn = t; + if (DEBUG_DATABASE(2)) + fprintf(dbgout, "DB_ENV->txn_begin(%p), tid: %lx\n", + (void *)dbe, (unsigned long)BF_TXN_ID(t)); + + return 0; +} + +int db_txn_abort(void *vhandle) +{ + int ret; + dbh_t *handle = vhandle; + DB_TXN *t = handle->txn; + assert(dbe); + assert(t); + + ret = BF_TXN_ABORT(t); + if (ret) + print_error(__FILE__, __LINE__, "DB_TXN->abort(%lx) error: %s", + (unsigned long)BF_TXN_ID(t), db_strerror(ret)); + else + if (DEBUG_DATABASE(2)) + fprintf(dbgout, "DB_TXN->abort(%lx)\n", + (unsigned long)BF_TXN_ID(t)); + handle->txn = NULL; + + switch (ret) { + case 0: + return DST_OK; + case DB_LOCK_DEADLOCK: + return DST_TEMPFAIL; + default: + return DST_FAILURE; + } +} + +int db_txn_commit(void *vhandle) +{ + int ret; + dbh_t *handle = vhandle; + DB_TXN *t = handle->txn; + u_int32_t id; + assert(dbe); + assert(t); + + id = BF_TXN_ID(t); + ret = BF_TXN_COMMIT(t, 0); + if (ret) + print_error(__FILE__, __LINE__, "DB_TXN->commit(%lx) error: %s", + (unsigned long)id, db_strerror(ret)); + else + if (DEBUG_DATABASE(2)) + fprintf(dbgout, "DB_TXN->commit(%lx, 0)\n", + (unsigned long)id); + handle->txn = NULL; + + switch (ret) { + case 0: + /* push out buffer pages so that >=15% are clean - we + * can ignore errors here, as the log has all the data */ + BF_MEMP_TRICKLE(dbe, 15, NULL); + + return DST_OK; + case DB_LOCK_DEADLOCK: + return DST_TEMPFAIL; + default: + return DST_FAILURE; + } +} int db_delete(void *vhandle, const dbv_t *token) { @@ -454,19 +548,24 @@ DBT db_key; DBT_init(db_key); + assert(handle->txn); + db_key.data = token->data; db_key.size = token->leng; - ret = dbp->del(dbp, NULL, &db_key, 0); + ret = dbp->del(dbp, handle->txn, &db_key, 0); if (ret != 0 && ret != DB_NOTFOUND) { - print_error(__FILE__, __LINE__, "(db) db_delete('%.*s'), err: %d, %s", + print_error(__FILE__, __LINE__, "(db) DB->del('%.*s'), err: %d, %s", CLAMP_INT_MAX(db_key.size), (const char *) db_key.data, ret, db_strerror(ret)); exit(EX_ERROR); } + if (DEBUG_DATABASE(3)) + fprintf(dbgout, "DB->del(%.*s)\n", CLAMP_INT_MAX(db_key.size), (const char *) db_key.data); + return ret; /* 0 if ok */ } @@ -479,6 +578,8 @@ dbh_t *handle = vhandle; DB *dbp = handle->dbp; + assert(handle); + assert(handle->txn); DBT_init(db_key); DBT_init(db_data); @@ -491,26 +592,32 @@ db_data.ulen = val->leng; /* max size */ db_data.flags = DB_DBT_USERMEM; /* saves the memcpy */ - ret = dbp->get(dbp, NULL, &db_key, &db_data, 0); + /* DB_RMW can avoid deadlocks */ + ret = dbp->get(dbp, handle->txn, &db_key, &db_data, handle->open_mode == DS_READ ? 0 : DB_RMW); - val->leng = db_data.size; /* read count */ + if (DEBUG_DATABASE(3)) + fprintf(dbgout, "DB->get(%.*s): %s\n", + CLAMP_INT_MAX(token->leng), (char *) token->data, db_strerror(ret)); switch (ret) { case 0: break; case DB_NOTFOUND: ret = DS_NOTFOUND; - if (DEBUG_DATABASE(3)) { - fprintf(dbgout, "db_get_dbvalue: [%.*s] not found\n", - CLAMP_INT_MAX(token->leng), (char *) token->data); - } + break; + case DB_LOCK_DEADLOCK: + db_txn_abort(handle); + ret = DS_ABORT_RETRY; break; default: - print_error(__FILE__, __LINE__, "(db) db_get_dbvalue( '%.*s' ), err: %d, %s", - CLAMP_INT_MAX(token->leng), (char *) token->data, ret, db_strerror(ret)); + print_error(__FILE__, __LINE__, "(db) DB->get(TXN=%lu, '%.*s' ), err: %d, %s", + (unsigned long)handle->txn, CLAMP_INT_MAX(token->leng), (char *) token->data, ret, db_strerror(ret)); + db_txn_abort(handle); exit(EX_ERROR); } + val->leng = db_data.size; /* read count */ + return ret; } @@ -524,6 +631,7 @@ dbh_t *handle = vhandle; DB *dbp = handle->dbp; + assert(handle->txn); DBT_init(db_key); DBT_init(db_data); @@ -534,7 +642,12 @@ db_data.data = val->data; db_data.size = val->leng; /* write count */ - ret = dbp->put(dbp, NULL, &db_key, &db_data, 0); + ret = dbp->put(dbp, handle->txn, &db_key, &db_data, 0); + + if (ret == DB_LOCK_DEADLOCK) { + db_txn_abort(handle); + return DS_ABORT_RETRY; + } if (ret != 0) { print_error(__FILE__, __LINE__, "(db) db_set_dbvalue( '%.*s' ), err: %d, %s", @@ -542,31 +655,61 @@ exit(EX_ERROR); } + if (DEBUG_DATABASE(3)) + fprintf(dbgout, "DB->put(%.*s): %s\n", + CLAMP_INT_MAX(token->leng), (char *) token->data, db_strerror(ret)); + return 0; } +static int db_flush_dirty(DB_ENV *env, int ret) { +#if DB_AT_LEAST(3,0) && DB_AT_MOST(4,0) + /* flush dirty pages in buffer pool */ + while (ret == DB_INCOMPLETE) { + rand_sleep(10000,1000000); + ret = BF_MEMP_SYNC(env, NULL); + } +#else + (void)env; +#endif + + return ret; +} /* Close files and clean up. */ -void db_close(void *vhandle, bool nosync) +void db_close(void *vhandle) { int ret; dbh_t *handle = vhandle; DB *dbp = handle->dbp; - uint32_t f = nosync ? DB_NOSYNC : 0; + uint32_t f = DB_NOSYNC; /* safe as long as we're logging TXNs*/ + +#if DB_AT_LEAST(4,2) + /* get_flags and DB_TXN_NOT_DURABLE are new in 4.2 */ + ret = dbe->get_flags(dbe, &f); + if (ret) { + print_error(__FILE__, __LINE__, "get_flags returned error: %s", + db_strerror(ret)); + f = 0; + } else { + f = (f & DB_TXN_NOT_DURABLE) ? 0 : DB_NOSYNC; + } +#endif if (DEBUG_DATABASE(1)) - fprintf(dbgout, "db_close (%s) %s\n", - handle->name, nosync ? "nosync" : "sync"); + fprintf(dbgout, "DB->close(%s, %s)\n", + handle->name, f & DB_NOSYNC ? "nosync" : "sync"); + + if (handle->txn) { + print_error(__FILE__, __LINE__, "db_close called with transaction still open, program fault!"); + } ret = dbp->close(dbp, f); -#if DB_AT_LEAST(3,2) && DB_AT_MOST(4,0) - /* ignore dirty pages in buffer pool */ - if (ret == DB_INCOMPLETE) - ret = 0; -#endif + ret = db_flush_dirty(dbe, ret); if (ret) print_error(__FILE__, __LINE__, "(db) db_close err: %d, %s", ret, db_strerror(ret)); + handle->dbp = NULL; dbh_free(handle); } @@ -580,23 +723,30 @@ dbh_t *handle = vhandle; DB *dbp = handle->dbp; + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "db_flush(%s)\n", handle->name); + ret = dbp->sync(dbp, 0); -#if DB_AT_LEAST(3,2) && DB_AT_MOST(4,0) - /* ignore dirty pages in buffer pool */ - if (ret == DB_INCOMPLETE) - ret = 0; -#endif + ret = db_flush_dirty(dbe, ret); + + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB->sync(%p): %s\n", (void *)dbp, db_strerror(ret)); + if (ret) print_error(__FILE__, __LINE__, "(db) db_sync: err: %d, %s", ret, db_strerror(ret)); -} + ret = BF_LOG_FLUSH(dbe, NULL); + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB_ENV->log_flush(%p): %s\n", (void *)dbe, + db_strerror(ret)); +} int db_foreach(void *vhandle, db_foreach_t hook, void *userdata) { dbh_t *handle = vhandle; DB *dbp = handle->dbp; - int ret = 0; + int ret = 0, eflag = 0; DBC dbc; DBC *dbcp = &dbc; @@ -606,9 +756,12 @@ memset(&key, 0, sizeof(key)); memset(&data, 0, sizeof(data)); - ret = dbp->cursor(dbp, NULL, &dbcp, 0); + assert(dbe); + assert(handle->txn); + + ret = dbp->cursor(dbp, handle->txn, &dbcp, 0); if (ret) { - dbp->err(dbp, ret, "(cursor): %s", handle->path); + print_error(__FILE__, __LINE__, "(cursor): %s", handle->path); return -1; } @@ -645,61 +798,248 @@ ret = 0; break; default: - dbp->err(dbp, ret, "(c_get)"); - ret = -1; + print_error(__FILE__, __LINE__, "(c_get): %s", db_strerror(ret)); + eflag = 1; + break; } - if (dbcp->c_close(dbcp)) { - dbp->err(dbp, ret, "(c_close)"); - ret = -1; + + if ((ret = dbcp->c_close(dbcp))) { + print_error(__FILE__, __LINE__, "(c_close): %s", db_strerror(ret)); + eflag = -1; } - return ret; /* 0 if ok */ + return eflag ? -1 : ret; /* 0 if ok */ } const char *db_str_err(int e) { return db_strerror(e); } +/** set an fcntl-style lock on \a path. + * \a locktype is F_RDLCK, F_WRLCK, F_UNLCK + * \a mode is F_SETLK or F_SETLKW + * \return file descriptor of locked file if successful + * negative value in case of error + */ +static int plock(const char *path, short locktype, int mode) { + struct flock fl; + int fd, r; + + fd = open(path, O_RDWR); + if (fd < 0) return fd; + + fl.l_type = locktype; + fl.l_whence = SEEK_SET; + fl.l_start = (off_t)0; + fl.l_len = (off_t)0; + r = fcntl(fd, mode, &fl); + if (r < 0) + return r; + return fd; +} + +static int db_try_glock(short locktype, int lockcmd) { + int ret; + char *t; + const char *const tackon = DIRSEP_S "lockfile-d"; + + assert(bogohome); + + /* lock */ + ret = mkdir(bogohome, (mode_t)0755); + if (ret && errno != EEXIST) { + print_error(__FILE__, __LINE__, "mkdir(%s): %s", + bogohome, strerror(errno)); + exit(EXIT_FAILURE); + } + + t = mxcat(bogohome, tackon, NULL); + + /* All we are interested in is that this file exists, we'll close it + * right away as plock down will open it again */ + ret = open(t, O_RDWR|O_CREAT|O_EXCL, (mode_t)0644); + if (ret < 0 && errno != EEXIST) { + print_error(__FILE__, __LINE__, "open(%s): %s", + t, strerror(errno)); + exit(EXIT_FAILURE); + } + + if (ret >= 0) + close(ret); + + lockfd = plock(t, locktype, lockcmd); + if (lockfd < 0 && errno != EAGAIN && errno != EACCES) { + print_error(__FILE__, __LINE__, "lock(%s): %s", + t, strerror(errno)); + exit(EXIT_FAILURE); + } + + free(t); + /* lock set up */ + return lockfd; +} + /* dummy infrastructure, to be expanded by environment * or transactional initialization/shutdown */ +static int db_xinit(u_int32_t numlocks, u_int32_t numobjs, u_int32_t flags) +{ + int ret; -int db_init(void) { - char *t; - int cdb_alldb = 1; + assert(bogohome); + assert(dbe == NULL); - if (bogohome && getenv("BOGOFILTER_CONCURRENT_DATA_STORE")) { - int ret = db_env_create(&dbe, 0); - if (ret != 0) { - print_error(__FILE__, __LINE__, "db_env_create, err: %d, %s", ret, db_strerror(ret)); - exit(EX_ERROR); - } - if (db_cachesize != 0 && - (ret = dbe->set_cachesize(dbe, db_cachesize/1024, (db_cachesize % 1024) * 1024*1024, 1)) != 0) { - print_error(__FILE__, __LINE__, "DBENV->set_cachesize(%u), err: %d, %s", - db_cachesize, ret, db_strerror(ret)); - exit(EXIT_FAILURE); - } + ret = db_env_create(&dbe, 0); + if (ret != 0) { + print_error(__FILE__, __LINE__, "db_env_create, err: %d, %s", ret, + db_strerror(ret)); + exit(EX_ERROR); + } + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "db_env_create: %p\n", (void *)dbe); - /* Allow user to override DB_CDB_ALLDB to 0 */ - if ((t = getenv("DB_CDB_ALLDB"))) - cdb_alldb = atoi(t); + dbe->set_errfile(dbe, stderr); - ret = dbe->open(dbe, bogohome, DB_INIT_MPOOL | DB_INIT_CDB | DB_CREATE, /* mode */ 0644); - if (ret != 0) { - dbe->close(dbe, 0); - print_error(__FILE__, __LINE__, "DBENV->open, err: %d, %s", ret, db_strerror(ret)); - exit(EXIT_FAILURE); + if (db_cachesize != 0 && + (ret = dbe->set_cachesize(dbe, db_cachesize/1024, (db_cachesize % 1024) * 1024*1024, 1)) != 0) { + print_error(__FILE__, __LINE__, "DB_ENV->set_cachesize(%u), err: %d, %s", + db_cachesize, ret, db_strerror(ret)); + exit(EXIT_FAILURE); + } + + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB_ENV->set_cachesize(%u)\n", db_cachesize); + + /* configure lock system size - locks */ +#if DB_AT_LEAST(3,2) + if ((ret = dbe->set_lk_max_locks(dbe, numlocks)) != 0) +#else + if ((ret = dbe->set_lk_max(dbe, numlocks)) != 0) +#endif + { + print_error(__FILE__, __LINE__, "DB_ENV->set_lk_max_locks(%p, %lu), err: %s", (void *)dbe, + (unsigned long)numlocks, db_strerror(ret)); + exit(EXIT_FAILURE); + } + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB_ENV->set_lk_max_locks(%p, %lu)\n", (void *)dbe, (unsigned long)numlocks); + +#if DB_AT_LEAST(3,2) + /* configure lock system size - objects */ + if ((ret = dbe->set_lk_max_objects(dbe, numobjs)) != 0) { + print_error(__FILE__, __LINE__, "DB_ENV->set_lk_max_objects(%p, %lu), err: %s", (void *)dbe, + (unsigned long)numobjs, db_strerror(ret)); + exit(EXIT_FAILURE); + } + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB_ENV->set_lk_max_objects(%p, %lu)\n", (void *)dbe, (unsigned long)numlocks); +#endif + + /* configure automatic deadlock detector */ + if ((ret = dbe->set_lk_detect(dbe, DB_LOCK_DEFAULT)) != 0) { + print_error(__FILE__, __LINE__, "DB_ENV->set_lk_detect(DB_LOCK_DEFAULT), err: %s", db_strerror(ret)); + exit(EXIT_FAILURE); + } + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB_ENV->set_lk_detect(DB_LOCK_DEFAULT)\n"); + + ret = dbe->open(dbe, bogohome, +#if DB_AT_MOST(3,0) + NULL, +#endif + DB_INIT_MPOOL | DB_INIT_LOCK + | DB_INIT_LOG | DB_INIT_TXN | DB_CREATE | flags, /* mode */ 0644); + if (ret != 0) { + dbe->close(dbe, 0); + print_error(__FILE__, __LINE__, "DB_ENV->open, err: %d, %s", ret, db_strerror(ret)); + if (ret == DB_RUNRECOVERY) { + fprintf(stderr, "To recover, run: bogoutil -v -f \"%s\"\n", + bogohome); } + exit(EXIT_FAILURE); } + + if (DEBUG_DATABASE(1)) + fprintf(dbgout, "DB_ENV->open(home=%s)\n", bogohome); + init = true; return 0; } +/* initialize data base, configure some lock table sizes + * (which can be overridden in the DB_CONFIG file) + * and lock the file to tell other parts we're initialized and + * do not want recovery to stomp over us + */ +int db_init(void) { + const u_int32_t numlocks = 16384; + const u_int32_t numobjs = 16384; + + if (needs_recovery(bogohome)) { + db_recover(0, 0); + } + + db_try_glock(F_RDLCK, F_SETLKW); + + if (set_lock(bogohome)) { + exit(EX_ERROR); + } + + return db_xinit(numlocks, numobjs, /* flags */ 0); +} + +int db_recover(int catastrophic, int force) { + int ret; + + while((force || needs_recovery(bogohome)) + && (db_try_glock(F_WRLCK, F_SETLKW) <= 0)) + rand_sleep(10000,1000000); + + if (!(force || needs_recovery(bogohome))) + return 0; + +retry: + if (DEBUG_DATABASE(0)) + fprintf(dbgout, "running %s data base recovery\n", + catastrophic ? "catastrophic" : "regular"); + ret = db_xinit(1024, 1024, catastrophic ? DB_RECOVER_FATAL : DB_RECOVER); + if (ret) { + if(!catastrophic) { + catastrophic = 1; + goto retry; + } + goto rec_fail; + } + + clear_lockfile(bogohome); + ds_cleanup(); + + return 0; + +rec_fail: + exit(EX_ERROR); +} + void db_cleanup(void) { if (!init) return; - if (dbe) - dbe->close(dbe, 0); + if (dbe) { + int ret; + + /* checkpoint if more than 64 kB of logs have been written + * or 120 min have passed since the previous checkpoint */ + /* kB min flags */ + ret = BF_TXN_CHECKPOINT(dbe, 64, 120, 0); + ret = db_flush_dirty(dbe, ret); + if (ret) + print_error(__FILE__, __LINE__, "(db) DBE->txn_checkpoint returned %s", db_strerror(ret)); + + ret = dbe->close(dbe, 0); + if (DEBUG_DATABASE(1) || ret) + fprintf(dbgout, "DB_ENV->close(%p): %s\n", (void *)dbe, db_strerror(ret)); + } + clear_lock(); + if (lockfd >= 0) + close(lockfd); /* release locks */ dbe = NULL; init = false; } Index: datastore_db.h =================================================================== RCS file: /cvsroot/bogofilter/bogofilter/src/datastore_db.h,v retrieving revision 1.14 retrieving revision 1.15 diff -u -d -r1.14 -r1.15 --- datastore_db.h 29 Jun 2004 23:52:29 -0000 1.14 +++ datastore_db.h 29 Oct 2004 01:11:53 -0000 1.15 @@ -33,7 +33,7 @@ dbmode_t mode /** open mode, DS_READ or DS_WRITE */); /** Close file and clean up. */ -void db_close(/*@only@*/ void *vhandle, bool nosync /** Normally false, if true, do not synchronize data. This should not be used in regular operation but only to ease the disk I/O load when the lock operation failed. */); +void db_close(/*@only@*/ void *vhandle); /** Flush pending writes to disk */ void db_flush(void *handle); @@ -82,18 +82,17 @@ /* Returns version string */ const char *db_version_str(void); +/* Transactional interfaces */ +int db_txn_begin(void *handle); +int db_txn_abort(void *handle); +int db_txn_commit(void *handle); + +int db_recover(int catastrophic, int force); + /* Returns is_swapped flag */ bool db_is_swapped(void *vhandle); /* Returns created flag */ bool db_created(void *vhandle); -/* This is not currently used ... - * -#define db_write_lock(fd) db_lock(fd, F_SETLKW, F_WRLCK) -#define db_read_lock(fd) db_lock(fd, F_SETLKW, F_RDLCK) -#define db_unlock(fd) db_lock(fd, F_SETLK, F_UNLCK) - -*/ - #endif Index: datastore_qdbm.c =================================================================== RCS file: /cvsroot/bogofilter/bogofilter/src/datastore_qdbm.c,v retrieving revision 1.28 retrieving revision 1.29 diff -u -d -r1.28 -r1.29 --- datastore_qdbm.c 28 Jun 2004 01:43:30 -0000 1.28 +++ datastore_qdbm.c 29 Oct 2004 01:11:53 -0000 1.29 @@ -251,7 +251,7 @@ /* Close files and clean up. */ -void db_close(void *vhandle, bool nosync) +void db_close(void *vhandle) { dbh_t *handle = vhandle; DEPOT *dbp; @@ -259,7 +259,7 @@ if (handle == NULL) return; if (DEBUG_DATABASE(1)) - fprintf(dbgout, "(qdbm) dpclose(%s, %s)\n", handle->name, nosync ? "nosync" : "sync"); + fprintf(dbgout, "(qdbm) dpclose(%s)\n", handle->name); dbp = handle->dbp; @@ -349,3 +349,10 @@ { init = false; } + +/* dummy infrastructure, to be expanded by environment + * or transactional initialization/shutdown */ +int db_txn_begin(void *d) { (void)d; return 0; } +int db_txn_abort(void *d) { (void)d; return 0; } +int db_txn_commit(void *d) { (void)d; return 0; } +int db_recover(int a, int b) { (void)a; (void)b; return 0; } Index: datastore_tdb.c =================================================================== RCS file: /cvsroot/bogofilter/bogofilter/src/datastore_tdb.c,v retrieving revision 1.30 retrieving revision 1.31 diff -u -d -r1.30 -r1.31 --- datastore_tdb.c 28 Jun 2004 01:43:30 -0000 1.30 +++ datastore_tdb.c 29 Oct 2004 01:11:53 -0000 1.31 @@ -234,7 +234,7 @@ /* Close files and clean up. */ -void db_close(void *vhandle, bool nosync) +void db_close(void *vhandle) { int ret; dbh_t *handle = vhandle; @@ -242,7 +242,7 @@ if (handle == NULL) return; if (DEBUG_DATABASE(1)) - fprintf(dbgout, "db_close (%s) %s\n", handle->name, nosync ? "nosync" : "sync"); + fprintf(dbgout, "db_close (%s)\n", handle->name); if (handle->locked) { tdb_unlockall(handle->dbp); @@ -355,7 +355,7 @@ int db_init(void) { - init = true; + init = true; return 0; } @@ -363,3 +363,10 @@ { init = false; } + +/* dummy infrastructure, to be expanded by environment + * or transactional initialization/shutdown */ +int db_txn_begin(void *d) { (void)d; return 0; } +int db_txn_abort(void *d) { (void)d; return 0; } +int db_txn_commit(void *d) { (void)d; return 0; } +int db_recover(int a, int b) { (void)a; (void)b; return 0; } Index: maint.c =================================================================== RCS file: /cvsroot/bogofilter/bogofilter/src/maint.c,v retrieving revision 1.47 retrieving revision 1.48 diff -u -d -r1.47 -r1.48 --- maint.c 29 Oct 2004 00:15:29 -0000 1.47 +++ maint.c 29 Oct 2004 01:11:53 -0000 1.48 @@ -168,28 +168,15 @@ { int rc = 0; dsh_t *dsh; - bool done = false; dsh = ds_open(CURDIR_S, db_file, DS_WRITE); if (dsh == NULL) return EX_ERROR; - - if (upgrade_wordlist_version) { - done = check_wordlist_version(dsh); - if (!done) - fprintf(dbgout, "Upgrading wordlist.\n"); - else - fprintf(dbgout, "Wordlist has already been upgraded.\n"); - } - - if (!done) + else rc = maintain_wordlist(dsh); - if (!done && upgrade_wordlist_version) - ds_set_wordlist_version(dsh, NULL); - - ds_close(dsh, false); + ds_close(dsh); ds_cleanup(); return rc; @@ -272,16 +259,39 @@ return EX_OK; } -int maintain_wordlist(void *vhandle) +static int maintain_wordlist(void *vhandle) { ta_t *transaction = ta_init(); struct userdata_t userdata; int ret; + bool done = false; userdata.vhandle = vhandle; userdata.transaction = transaction; - ret = ds_foreach(vhandle, maintain_hook, &userdata); + if (DST_OK == ds_txn_begin(vhandle)) { + ret = ds_foreach(vhandle, maintain_hook, &userdata); + } else + ret = -1; - return ret | ta_commit(transaction); + if (upgrade_wordlist_version) { + done = check_wordlist_version(vhandle); + if (!done) + fprintf(dbgout, "Upgrading wordlist.\n"); + else + fprintf(dbgout, "Wordlist has already been upgraded.\n"); + } + + if (!done && upgrade_wordlist_version) + { + dsv_t val; + val.count[0] = CURRENT_VERSION; + val.count[1] = 0; + ds_set_wordlist_version(vhandle, &val); + } + + ret |= ta_commit(transaction); + if (DST_OK != ds_txn_commit(vhandle)) + ret = -1; + return ret; } Index: register.c =================================================================== RCS file: /cvsroot/bogofilter/bogofilter/src/register.c,v retrieving revision 1.36 retrieving revision 1.37 diff -u -d -r1.36 -r1.37 --- register.c 5 Jun 2004 01:40:26 -0000 1.36 +++ register.c 29 Oct 2004 01:11:53 -0000 1.37 @@ -11,6 +11,7 @@ #include "collect.h" #include "format.h" #include "msgcounts.h" +#include "rand_sleep.h" #include "register.h" #include "wordhash.h" #include "wordlists.h" @@ -28,6 +29,9 @@ hashnode_t *node; wordprop_t *wordprop; run_t save_run_type = run_type; + int retrycount = 5; /* we'll retry an aborted + registration five times + before giving up. */ u_int32_t wordcount = h->count; /* use number of unique tokens */ @@ -60,10 +64,31 @@ run_type |= _run_type; +retry: + if (retrycount-- == 0) { + fprintf(stderr, "retry count exceeded, giving up.\n"); + exit(EX_ERROR); + } + + if (ds_txn_begin(list->dsh)) { + fprintf(stderr, "ds_txn_begin error.\n"); + exit(EX_ERROR); + } + for (node = wordhash_first(h); node != NULL; node = wordhash_next(h)) { wordprop = node->buf; - ds_read(list->dsh, node->key, &val); + switch (ds_read(list->dsh, node->key, &val)) { + case DS_ABORT_RETRY: + rand_sleep(4*1000,1000*1000); + goto retry; + case 0: + case 1: + break; + default: + fprintf(stderr, "cannot read from data base.\n"); + exit(EX_ERROR); + } if (incr != IX_UNDF) { u_int32_t *counts = val.count; counts[incr] += wordprop->freq; @@ -72,10 +97,29 @@ u_int32_t *counts = val.count; counts[decr] = ((long)counts[decr] < wordprop->freq) ? 0 : counts[decr] - wordprop->freq; } - ds_write(list->dsh, node->key, &val); + switch (ds_write(list->dsh, node->key, &val)) { + case DS_ABORT_RETRY: + rand_sleep(4*1000,1000*1000); + goto retry; + case 0: + break; + default: + ... [truncated message content] |