[Nagios-db-checkins] nagios-db/neb/postgres pgwrapper.c,NONE,1.1 pgwrapper.h,NONE,1.1 inserter.c,1.9
Status: Beta
Brought to you by:
bench23
From: Bench <be...@us...> - 2005-04-07 21:32:00
|
Update of /cvsroot/nagios-db/nagios-db/neb/postgres In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv15111 Modified Files: inserter.c makefile Added Files: pgwrapper.c pgwrapper.h Log Message: Moved to using a pgwrapper in order to improve startup times and, more importantly, to allow the single-threaded nagios to kick off multiple update threads at once, in order to reduce the time it takes to process a check result. Index: inserter.c =================================================================== RCS file: /cvsroot/nagios-db/nagios-db/neb/postgres/inserter.c,v retrieving revision 1.9 retrieving revision 1.10 diff -u -d -r1.9 -r1.10 --- inserter.c 11 Mar 2005 06:55:04 -0000 1.9 +++ inserter.c 7 Apr 2005 21:31:40 -0000 1.10 @@ -3,7 +3,7 @@ #endif /* we need this for postgres support */ -#include "libpq-fe.h" +#include "pgwrapper.h" /* include the needed event broker header files */ #include "../include/nebmodules.h" @@ -52,31 +52,203 @@ static int processStatus(int, void *); static int processCheck(int, void *); +char * +pullValue(char* in) +{ + char * out = 0; + int i; -PGconn * pgconn = 0; + for(i=0;in[i];i++) + { + if(isspace(in[i])) break; + } + + if(!i) return 0; + + if(!(out = calloc(1,i+1))) return 0; + + memcpy(out,in,i); + + return out; +} /* this function gets called when the module is loaded by the event broker */ -int nebmodule_init(int flags, char *args, nebmodule *handle) +int +nebmodule_init(int flags, char *args, nebmodule *handle) { + char *host = 0; + char *db = 0; + char *user = 0; + char *password = 0; + guint32 timeout = 0; + guint32 writes = 0; + guint32 vacation = 0; + guint32 pooldelay = 0; + char *tok = 0; + char *val = 0; + + g_thread_init(0); + write_to_all_logs("initializing nagios-db postgres inserter...",NSLOG_INFO_MESSAGE); - if(!(pgconn = PQconnectdb("host=meatshake port=5432 dbname=nag user=postgres"))) + /* Parse our arg string. + First, the required args. */ + if(!(tok = strstr(args,"host="))) { - /* couldn't connect; this will be useless. */ - write_to_all_logs("nagios-db: inserter failed to allocate postgres connection",NSLOG_INFO_MESSAGE); + /* well, this isn't going to fly. */ + write_to_all_logs("nagios-db postgres inserter failed to find postgres host in arguement string",NSLOG_INFO_MESSAGE); + return 0; } - else + if(!(host = pullValue(tok+5))) { - if(CONNECTION_OK != PQstatus(pgconn)) + /* well, this isn't going to fly. */ + write_to_all_logs("nagios-db postgres inserter failed to find valid postgres host in arguement string",NSLOG_INFO_MESSAGE); + return 0; + } + + if(!(tok = strstr(args,"db="))) + { + /* well, this isn't going to fly. */ + write_to_all_logs("nagios-db postgres inserter failed to find postgres database in arguement string",NSLOG_INFO_MESSAGE); + free(host); + return 0; + } + if(!(db = pullValue(tok+3))) + { + /* well, this isn't going to fly. */ + write_to_all_logs("nagios-db postgres inserter failed to find valid postgres database in arguement string",NSLOG_INFO_MESSAGE); + free(host); + return 0; + } + + if(!(tok = strstr(args,"timeout="))) + { + /* well, this isn't going to fly. */ + write_to_all_logs("nagios-db postgres inserter failed to find wrapper timeout value in arguement string",NSLOG_INFO_MESSAGE); + free(host); + free(db); + return 0; + } + if(!(val = pullValue(tok+8))) + { + /* well, this isn't going to fly. */ + write_to_all_logs("nagios-db postgres inserter failed to find valid wrapper timeout value in arguement string",NSLOG_INFO_MESSAGE); + free(host); + free(db); + return 0; + } + timeout = atol(val); + free(val); + val=0; + + if(!(tok = strstr(args,"vacation="))) + { + /* well, this isn't going to fly. */ + write_to_all_logs("nagios-db postgres inserter failed to find wrapper vacation value in arguement string",NSLOG_INFO_MESSAGE); + free(host); + free(db); + return 0; + } + if(!(val = pullValue(tok+8))) + { + /* well, this isn't going to fly. */ + write_to_all_logs("nagios-db postgres inserter failed to find valid wrapper vacation value in arguement string",NSLOG_INFO_MESSAGE); + free(host); + free(db); + return 0; + } + vacation = atol(val); + free(val); + val=0; + + if(!(tok = strstr(args,"writes="))) + { + /* well, this isn't going to fly. */ + write_to_all_logs("nagios-db postgres inserter failed to find wrapper write limit in arguement string",NSLOG_INFO_MESSAGE); + free(host); + free(db); + return 0; + } + if(!(val = pullValue(tok+8))) + { + /* well, this isn't going to fly. */ + write_to_all_logs("nagios-db postgres inserter failed to find valid wrapper write limit in arguement string",NSLOG_INFO_MESSAGE); + free(host); + free(db); + return 0; + } + writes = atol(val); + free(val); + val=0; + + + /* now, try to get the optional args */ + if((tok = strstr(args,"user="))) + { + if(!(user = pullValue(tok+5))) { - char str[2048]; - snprintf(str,sizeof(str),"nagios-db: inserter failed to establish postgres connection (%s)",PQerrorMessage(pgconn)); - write_to_all_logs(str,NSLOG_INFO_MESSAGE); + /* well, this isn't going to fly. */ + write_to_all_logs("nagios-db postgres inserter failed to find valid postgres user in arguement string",NSLOG_INFO_MESSAGE); + free(host); + free(db); + if(user) free(user); + if(password) free(password); + return 0; } - else - write_to_all_logs("nagios-db: inserter locked into db",NSLOG_INFO_MESSAGE); } + if((tok = strstr(args,"password="))) + { + if(!(password = pullValue(tok+9))) + { + /* well, this isn't going to fly. */ + write_to_all_logs("nagios-db postgres inserter failed to find valid postgres password in arguement string",NSLOG_INFO_MESSAGE); + free(host); + free(db); + if(user) free(user); + if(password) free(password); + return 0; + } + } + + if((tok = strstr(args,"pooldelay="))) + { + if(!(val = pullValue(tok+10))) + { + /* well, this isn't going to fly. */ + write_to_all_logs("nagios-db postgres inserter failed to find valid pooldelay in arguement string",NSLOG_INFO_MESSAGE); + free(host); + free(db); + if(user) free(user); + if(password) free(password); + return 0; + } + + pooldelay = atol(val); + free(val); + val = 0; + } + + + /* kick off the database wrapper */ + + if(InitDatabaseWrapper(host, db, user, password, writes, timeout, vacation, pooldelay)) + { + /* couldn't initialize database wrapper; this will be useless. */ + write_to_all_logs("nagios-db inserter postgres failed to initialize db wrapper",NSLOG_INFO_MESSAGE); + free(host); + free(db); + if(user) free(user); + if(password) free(password); + return 0; + } + + free(host); + free(db); + if(user) free(user); + if(password) free(password); + + write_to_all_logs("nagios-db postgres inserter locked into db",NSLOG_INFO_MESSAGE); neb_register_callback(NEBCALLBACK_TIMED_EVENT_DATA, 0, processStart); @@ -84,28 +256,27 @@ } -/* this function gets called when the module is unoaded by the event broker */ -int nebmodule_deinit(int flags, int reason) +/* this function gets called when the module is unoaded by the event broker */ +int +nebmodule_deinit(int flags, int reason) { /* log a message to the Nagios log file */ write_to_all_logs("nagios-db: inserter unloading...",NSLOG_INFO_MESSAGE); - PQfinish(pgconn); - neb_deregister_callback(NEBCALLBACK_HOST_STATUS_DATA, processStatus); neb_deregister_callback(NEBCALLBACK_SERVICE_STATUS_DATA, processStatus); neb_deregister_callback(NEBCALLBACK_HOST_CHECK_DATA, processCheck); neb_deregister_callback(NEBCALLBACK_SERVICE_CHECK_DATA, processCheck); - pgconn = 0; + ShutDownDatabaseWrapper(); return 0; } -/* helper function to make a string ready for inclusion into a sql query */ -char * -querify(char * src) +/* helper function to make a string ready for inclusion into a sql query */ +char * +querify(char * src) { char * ret = 0; if(src) @@ -125,15 +296,17 @@ return ret; } -/* when Nagios starts up, we'll want to replicate configuration data into the database */ -static int processStart(int cmd, void * data) +/* when Nagios starts up, we'll want to replicate configuration data into the database */ +static int +processStart(int cmd, void * data) { host *hl = 0; service *sl = 0; hostgroup *hg = 0; PGresult *res = 0; - int groupcount = 0; + int groupcount = 0; char q[2048]; + gint8 status = 0; /* verify that we're dealing with the right kind of message. */ if(cmd != NEBCALLBACK_TIMED_EVENT_DATA) return 0; @@ -142,12 +315,11 @@ if(((nebstruct_timed_event_data*)data)->type != NEBTYPE_TIMEDEVENT_ADD) return 0; /* Clear out the previous config info.... */ - res = PQexec(pgconn,"select empty_config()"); - if(!(PQresultStatus(res) == PGRES_COMMAND_OK || PQresultStatus(res) == PGRES_TUPLES_OK)) + status = PGquery("select empty_config()",&res,3); + if(status != 1) { - char str[2048]; - snprintf(str,sizeof(str),"nagios-db: couldn't clear config info (%s)", PQresultErrorMessage(res)); - write_to_all_logs(str,NSLOG_INFO_MESSAGE); + snprintf(q,sizeof(q),"nagios-db: couldn't clear config info (wrapper returned %d: %s)", status, PQresultErrorMessage(res)); + write_to_all_logs(q,NSLOG_INFO_MESSAGE); } PQclear(res); @@ -170,11 +342,12 @@ BOOLIFY_STRING(hl->should_be_scheduled)); if(hostName) free(hostName); - res = PQexec(pgconn,q); - if(!(PQresultStatus(res) == PGRES_COMMAND_OK || PQresultStatus(res) == PGRES_TUPLES_OK)) + + status = PGwrite(q,&res,3); + if(status != 1) { char str[2048]; - snprintf(str,sizeof(str),"nagios-db: couldn't configure host using '%s' (%s)", q, PQresultErrorMessage(res)); + snprintf(str,sizeof(str),"nagios-db: couldn't configure host using '%s' (wrapper returned %d: %s)", q, status, PQresultErrorMessage(res)); write_to_all_logs(str,NSLOG_INFO_MESSAGE); } PQclear(res); @@ -199,11 +372,11 @@ if(groupName) free(groupName); if(groupAlias) free(groupAlias); - res = PQexec(pgconn,q); - if(!(PQresultStatus(res) == PGRES_COMMAND_OK || PQresultStatus(res) == PGRES_TUPLES_OK)) + status = PGquery(q,&res,3); + if(status != 1) { char str[2048]; - snprintf(str,sizeof(str),"nagios-db: couldn't configure hostgroup using '%s' (%s)", q, PQresultErrorMessage(res)); + snprintf(str,sizeof(str),"nagios-db: couldn't configure hostgroup using '%s' (wrapper returned %d: %s)", q, status, PQresultErrorMessage(res)); write_to_all_logs(str,NSLOG_INFO_MESSAGE); } groupid = atol(PQgetvalue(res,0,0)); @@ -216,11 +389,11 @@ snprintf(q,sizeof(q),"select append_to_hostgroup(%lu,%s)", groupid, NULL_STRING(hostName)); if(hostName) free(hostName); - res = PQexec(pgconn,q); - if(!(PQresultStatus(res) == PGRES_COMMAND_OK || PQresultStatus(res) == PGRES_TUPLES_OK)) + status = PGwrite(q,&res,3); + if(status != 1) { char str[2048]; - snprintf(str,sizeof(str),"nagios-db: couldn't configure hostgroup membership using '%s' (%s)", q, PQresultErrorMessage(res)); + snprintf(str,sizeof(str),"nagios-db: couldn't configure hostgroup membership using '%s' (wrapper returned %d: %s)", q, status, PQresultErrorMessage(res)); write_to_all_logs(str,NSLOG_INFO_MESSAGE); } PQclear(res); @@ -254,11 +427,11 @@ if(hostName) free(hostName); if(serviceDesc) free(serviceDesc); - res = PQexec(pgconn,q); - if(!(PQresultStatus(res) == PGRES_COMMAND_OK || PQresultStatus(res) == PGRES_TUPLES_OK)) + status = PGwrite(q,&res,3); + if(status != 1) { char str[2048]; - snprintf(str,sizeof(str),"nagios-db: couldn't configure service using '%s' (%s)", q, PQresultErrorMessage(res)); + snprintf(str,sizeof(str),"nagios-db: couldn't configure service using '%s' (wrapper returned %d: %s)", q, status, PQresultErrorMessage(res)); write_to_all_logs(str,NSLOG_INFO_MESSAGE); } PQclear(res); @@ -271,11 +444,11 @@ /* also insert various config parameters that we might want */ snprintf(q,sizeof(q),"select replace_config_param('accept_passive_host_checks',%d)",accept_passive_host_checks); - res = PQexec(pgconn,q); - if(!(PQresultStatus(res) == PGRES_COMMAND_OK || PQresultStatus(res) == PGRES_TUPLES_OK)) + status = PGquery(q,&res,3); + if(status != 1) { char str[2048]; - snprintf(str,sizeof(str),"nagios-db: couldn't execute '%s' (%s)", q, PQresultErrorMessage(res)); + snprintf(str,sizeof(str),"nagios-db: couldn't execute '%s' (wrapper returned %d: %s)", q, status, PQresultErrorMessage(res)); write_to_all_logs(str,NSLOG_INFO_MESSAGE); } else @@ -290,11 +463,11 @@ PQclear(res); snprintf(q,sizeof(q),"select replace_config_param('execute_host_checks',%d)",execute_host_checks); - res = PQexec(pgconn,q); - if(!(PQresultStatus(res) == PGRES_COMMAND_OK || PQresultStatus(res) == PGRES_TUPLES_OK)) + status = PGquery(q,&res,3); + if(status != 1) { char str[2048]; - snprintf(str,sizeof(str),"nagios-db: couldn't execute '%s' (%s)", q, PQresultErrorMessage(res)); + snprintf(str,sizeof(str),"nagios-db: couldn't execute '%s' (wrapper returned %d: %s)", q, status, PQresultErrorMessage(res)); write_to_all_logs(str,NSLOG_INFO_MESSAGE); } else @@ -309,11 +482,11 @@ PQclear(res); snprintf(q,sizeof(q),"select replace_config_param('accept_passive_service_checks',%d)",accept_passive_service_checks); - res = PQexec(pgconn,q); - if(!(PQresultStatus(res) == PGRES_COMMAND_OK || PQresultStatus(res) == PGRES_TUPLES_OK)) + status = PGquery(q,&res,3); + if(status != 1) { char str[2048]; - snprintf(str,sizeof(str),"nagios-db: couldn't execute '%s' (%s)", q, PQresultErrorMessage(res)); + snprintf(str,sizeof(str),"nagios-db: couldn't execute '%s' (wrapper returned %d: %s)", q, status, PQresultErrorMessage(res)); write_to_all_logs(str,NSLOG_INFO_MESSAGE); } else @@ -328,11 +501,11 @@ PQclear(res); snprintf(q,sizeof(q),"select replace_config_param('execute_service_checks',%d)",execute_service_checks); - res = PQexec(pgconn,q); - if(!(PQresultStatus(res) == PGRES_COMMAND_OK || PQresultStatus(res) == PGRES_TUPLES_OK)) + status = PGquery(q,&res,3); + if(status != 1) { char str[2048]; - snprintf(str,sizeof(str),"nagios-db: couldn't execute '%s' (%s)", q, PQresultErrorMessage(res)); + snprintf(str,sizeof(str),"nagios-db: couldn't execute '%s' (wrapper returned %d: %s)", q, status, PQresultErrorMessage(res)); write_to_all_logs(str,NSLOG_INFO_MESSAGE); } else @@ -347,11 +520,11 @@ PQclear(res); snprintf(q,sizeof(q),"select replace_config_param('enable_flap_detection',%d)",enable_flap_detection); - res = PQexec(pgconn,q); - if(!(PQresultStatus(res) == PGRES_COMMAND_OK || PQresultStatus(res) == PGRES_TUPLES_OK)) + status = PGquery(q,&res,3); + if(status != 1) { char str[2048]; - snprintf(str,sizeof(str),"nagios-db: couldn't execute '%s' (%s)", q, PQresultErrorMessage(res)); + snprintf(str,sizeof(str),"nagios-db: couldn't execute '%s' (wrapper returned %d: %s)", q, status, PQresultErrorMessage(res)); write_to_all_logs(str,NSLOG_INFO_MESSAGE); } else @@ -366,11 +539,11 @@ PQclear(res); snprintf(q,sizeof(q),"select replace_config_param('enable_notifications',%d)",enable_notifications); - res = PQexec(pgconn,q); - if(!(PQresultStatus(res) == PGRES_COMMAND_OK || PQresultStatus(res) == PGRES_TUPLES_OK)) + status = PGquery(q,&res,3); + if(status != 1) { char str[2048]; - snprintf(str,sizeof(str),"nagios-db: couldn't execute '%s' (%s)", q, PQresultErrorMessage(res)); + snprintf(str,sizeof(str),"nagios-db: couldn't execute '%s' (wrapper returned %d: %s)", q, status, PQresultErrorMessage(res)); write_to_all_logs(str,NSLOG_INFO_MESSAGE); } else @@ -385,11 +558,11 @@ PQclear(res); snprintf(q,sizeof(q),"select replace_config_param('enable_event_handlers',%d)",enable_event_handlers); - res = PQexec(pgconn,q); - if(!(PQresultStatus(res) == PGRES_COMMAND_OK || PQresultStatus(res) == PGRES_TUPLES_OK)) + status = PGquery(q,&res,3); + if(status != 1) { char str[2048]; - snprintf(str,sizeof(str),"nagios-db: couldn't execute '%s' (%s)", q, PQresultErrorMessage(res)); + snprintf(str,sizeof(str),"nagios-db: couldn't execute '%s' (wrapper returned %d: %s)", q, status, PQresultErrorMessage(res)); write_to_all_logs(str,NSLOG_INFO_MESSAGE); } else @@ -420,6 +593,7 @@ { PGresult *res; char temp_buffer[1024]; + gint8 status = 0; bzero(temp_buffer, sizeof(temp_buffer)); @@ -462,11 +636,11 @@ break; } - res = PQexec(pgconn,temp_buffer); - if(!(PQresultStatus(res) == PGRES_COMMAND_OK || PQresultStatus(res) == PGRES_TUPLES_OK)) + status = PGquery(temp_buffer,&res,3); + if(status != 1) { char str[2048]; - snprintf(str,sizeof(str),"nagios-db: couldn't execute '%s' (%s)", temp_buffer, PQresultErrorMessage(res)); + snprintf(str,sizeof(str),"nagios-db: couldn't execute '%s' (wrapper returned %d: %s)", temp_buffer, status, PQresultErrorMessage(res)); write_to_all_logs(str,NSLOG_INFO_MESSAGE); } else @@ -488,7 +662,6 @@ static int processStatus(int cmd, void * data) { - PGresult *res; char temp_buffer[5120]; service *tmp_service; host *tmp_host; @@ -607,24 +780,11 @@ break; } - res = PQexec(pgconn,temp_buffer); - if(!(PQresultStatus(res) == PGRES_COMMAND_OK || PQresultStatus(res) == PGRES_TUPLES_OK)) - { - char str[2048]; - snprintf(str,sizeof(str),"nagios-db: couldn't execute '%s' (%s)", temp_buffer, PQresultErrorMessage(res)); - write_to_all_logs(str,NSLOG_INFO_MESSAGE); - } - else - { - if(atoi(PQgetvalue(res,0,0)) < 0) - { - char str[2048]; - snprintf(str,sizeof(str),"nagios-db: executed '%s' and got back %s", temp_buffer, PQgetvalue(res,0,0)); - write_to_all_logs(str,NSLOG_INFO_MESSAGE); - } - } - - PQclear(res); + /* We use PGexec here to make sure that inserts happen quickly, + but, due to the nature of fire-n-forget, we can't get the status back. + That sucks, but not *too* badly, because by this point things are either working or they're not, + and if they're not, there's not a whole lot we can do about it. */ + PGexec(temp_buffer,3); return 0; } Index: makefile =================================================================== RCS file: /cvsroot/nagios-db/nagios-db/neb/postgres/makefile,v retrieving revision 1.3 retrieving revision 1.4 diff -u -d -r1.3 -r1.4 --- makefile 20 Jan 2005 18:59:52 -0000 1.3 +++ makefile 7 Apr 2005 21:31:40 -0000 1.4 @@ -1,6 +1,2 @@ all: - gcc -Wall -g -O2 -o inserter.o inserter.c -shared -I/usr/local/src/nagios/nagios-cvs/include -I/usr/local/pgsql/include /usr/pgsql/lib/libpq.a - -# debian unstable -#all: -# gcc -Wall -g -O2 -o inserter.o inserter.c -shared -I/home/mkent/nagios/include -I/usr/include/postgresql /usr/lib/libpq.a -lssl -lkrb5 -lcrypt + gcc -Wall -g -o inserter.o pgwrapper.c inserter.c -shared -I/usr/local/src/nagios/nagios/include `pkg-config --cflags glib-2.0` `pkg-config --libs gthread-2.0 glib-2.0` -lcrypt /usr/lib/libpq.a --- NEW FILE: pgwrapper.h --- #ifndef __PGWrapper_H__ #define __PGWrapper_H__ #include <glib.h> #include <libpq-fe.h> gint8 PGquery(char* q, PGresult** res, guint8 tries); /////////////////////// // // Attempt to run query <q> and store any potential records in <*res>. // In case of errors, will retry <tries> times. If retrying doesn't help, only the last error code is returned. // // IT IS THE CALLER'S RESPONSIBILITY TO CALL PQclear() ON THE PGresult* FILLED OUT, REGARDLESS OF QUERY SUCCESS. // NOT DOING SO WILL LEAK MEMORY. // // returns: // 3: query status is PGRES_COPY_IN // 2: query status is PGRES_COPY_OUT // 1: query returned successfully, and *res will hold 1 or more records // 0: query returned successfully without records - a PGresult* structure will still be allocated, so clear it when done! // -1: query failed because connection threads could not initialize // -2: query failed because the connection failed during the query // -3: query failed because of a fatal error during the query (is your query correct?) // -4: query failed because the query string was empty // -5: query failed because of a non-fatal error // -6: query failed because of a bad server response // -7: query failed because the printer is on fire // /////////////////////// gint8 PGwrite(char* q, PGresult** res, guint8 tries); /////////////////////// // // Similar to PGquery, except that the write is queued up for batch writing. // /////////////////////// void PGexec(char* q, guint8 tries); /////////////////////// // // Similar to PGquery, except that no results are returned and the function will return before the query completes. // /////////////////////// gint8 PGflush(); /////////////////////// // // Flushes the batch write queue // /////////////////////// PGconn* FetchRawDatabaseConnection(void); void CloseRawDatabaseConnection(PGconn* con); /////////////////////// // // Used to open and close raw database connections, that will not be part of the database handling pool // /////////////////////// gint8 InitDatabaseWrapper(char* host, char* database, char* username, char* password, guint32 limit, guint32 timeout, guint16 vacation, guint32 poolstats); /////////////////////// // // Initializes the database wrapper. // A connection is not made when this function is called, but variables are set up such that // a connection can be made. (And re-made, if need be.) // host and database are required to be non-null. // username and password may be null, in which case they will not be specified when attempting to connect to the database. // limit is how many writes to allow to queue up in the batch writer before forcing a write // timout is how many seconds to allow to pass before forcing the batch writer to commit // vacation is how many seconds to allow to pass before the batch writer begins a new transaction // poolstats is how often (in seconds) the wrapper should report statistics for the executor pool. 0 means to not report. // /////////////////////// void ShutDownDatabaseWrapper(); /////////////////////// // // Shuts down the database wrapper and cleans up. // Obviously, don't try to use database calls after this. // /////////////////////// #endif // __PGWrapper_H__ --- NEW FILE: pgwrapper.c --- #include <stdlib.h> #include <string.h> #include <unistd.h> #include "pgwrapper.h" #include "../include/common.h" #include "../include/nagios.h" //#define WRAPPER_VERBOSE //#define PRINT_STATEMENTS #define POOL_SIZE 20 typedef struct pgwrapperresult_type { gint8 status; char* input; char* output; PGresult* res; GCond* cond; GMutex* mutex; } pgwrapperresult; typedef struct pgwrapper_exec_packet_type { char *q; gint8 tries; } pgwrapper_exec_packet; gint8 pgwrapper_initialize(void); void pgwrapper_executor_thread_func(gpointer data, gpointer dataform); void pgwrapper_connection_thread_func(gpointer data, gpointer userdata); void noticeProcessorFunc(void*, const char*); gint8 PGquery_internal(char* q,PGresult** res,guint8 tries,GThreadPool* pool); gpointer ForceWriterFlush(gpointer dummy); gpointer ReportPoolStats(gpointer delay); GThreadPool* pgwrapper_connections = 0; GThreadPool* pgwrapper_writer = 0; GThreadPool* pgwrapper_executor = 0; GPrivate* connection_thread_conn = 0; char* db_connect_string = 0; guint32 writeCount; guint32 writeLimit; guint16 writeVacation = 0; guint32 writeTimeout; GStaticMutex writeCountLock = G_STATIC_MUTEX_INIT; gint8 InitDatabaseWrapper(char* host, char* database, char* username, char* password, guint32 write_limit, guint32 write_timeout, guint16 write_vacation, guint32 poolstats) { char tempstr[1000]; gint8 init_result = 0; if(!host || !database) return -1; if(db_connect_string) free(db_connect_string); snprintf(tempstr,sizeof(tempstr),"host=%s dbname=%s", host, database); if(username) { strcat(tempstr, " user="); strcat(tempstr, username); } if(password) { strcat(tempstr, " password="); strcat(tempstr, password); } db_connect_string = strdup(tempstr); writeLimit = write_limit; writeTimeout = write_timeout; writeVacation = write_vacation; writeCount = 0; if((init_result = pgwrapper_initialize()) < 0) { // damn, problem initializing. That sucks. return -1; } if(poolstats) g_thread_create(ReportPoolStats,(gpointer)poolstats,0,0); return 0; } gpointer ReportPoolStats(gpointer delay) { guint32 seconds = (guint32)delay; char str[1000]; while(1) { snprintf(str,sizeof(str),"Executor has %u threads active out of %d max, with %u threads idle and %u tasks outstanding", g_thread_pool_get_num_threads(pgwrapper_executor), g_thread_pool_get_max_threads(pgwrapper_executor), g_thread_pool_get_num_unused_threads(), g_thread_pool_unprocessed (pgwrapper_executor)); write_to_all_logs(str,NSLOG_INFO_MESSAGE); sleep(seconds); } // make the compiler happy return 0; } gint8 PGflush(void) { PGresult *foo = 0; gint8 status = 0; // finish off anything that was left in the writer pool. if((status = PGquery_internal("commit",&foo, 23, pgwrapper_writer))) { PQclear(foo); return status; } PQclear(foo); if(writeVacation) sleep(writeVacation); if((status = PGquery_internal("begin",&foo, 23, pgwrapper_writer))) { PQclear(foo); return status; } PQclear(foo); return status; } void ShutDownDatabaseWrapper(void) { // finish off anything that was left in the writer pool. PGflush(); // get rid of our local variables. free(db_connect_string); // Process all remaining requests in the thread pool and wait for them to finish before // stopping the threads. // Will this free the GPrivate* members too? I hope so.... g_thread_pool_free(pgwrapper_connections,0,1); g_thread_pool_free(pgwrapper_writer,0,1); g_thread_pool_free(pgwrapper_executor,0,1); } gint8 PGquery(char* q,PGresult** res,guint8 tries) { return PGquery_internal(q,res,tries,pgwrapper_connections); } void PGexec(char* q, guint8 tries) { // take this query and push it into our dynamic thread pool, which will handle launching the query and cleaning up any data about it. // make a copy of our args, for when we walk away from this data back up the stack pgwrapper_exec_packet *dataform = (pgwrapper_exec_packet*)calloc(1,sizeof(pgwrapper_exec_packet)); dataform->q = strdup(q); dataform->tries = tries; g_thread_pool_push(pgwrapper_executor,dataform,0); return; } gint8 PGwrite(char* q,PGresult** res,guint8 tries) { g_static_mutex_lock(&writeCountLock); if(writeCount < writeLimit) { writeCount++; g_static_mutex_unlock(&writeCountLock); } else { PGresult *foo = 0; gint8 status = 0; if((status = PGquery_internal("commit", &foo, 23, pgwrapper_writer))) { PQclear(foo); g_static_mutex_unlock(&writeCountLock); return status; } PQclear(foo); writeCount = 0; if(writeVacation) sleep(writeVacation); if((status = PGquery_internal("begin",&foo, 23, pgwrapper_writer))) { PQclear(foo); g_static_mutex_unlock(&writeCountLock); return status; } g_static_mutex_unlock(&writeCountLock); PQclear(foo); } return PGquery_internal(q,res,tries,pgwrapper_writer); } gint8 PGquery_internal(char* q,PGresult** res,guint8 tries,GThreadPool* pool) { pgwrapperresult dataform; #ifdef PRINT_STATEMENTS printf("SQL: '%s'\n", q); #endif // set up the data form so the thread that runs our query knows how to get us the results. dataform.status = 0; dataform.input = q; dataform.output = 0; dataform.res = 0; dataform.cond = g_cond_new(); dataform.mutex = g_mutex_new(); g_mutex_lock(dataform.mutex); g_thread_pool_push(pool,(gpointer)(&dataform),0); // no error reporting while(!dataform.status) { #ifdef WRAPPER_VERBOSE // printf("about to call g_cond_wait(%p,%p); pool has %d threads\n",dataform.cond,dataform.mutex,g_thread_pool_get_num_threads(pool)); #endif g_cond_wait(dataform.cond,dataform.mutex); } g_mutex_unlock(dataform.mutex); #ifdef WRAPPER_VERBOSE // printf("wrapper found query results\n"); #endif // okay, that was fun. Now free up what we can from the dataform to stop evil evil memory leaks. *res = dataform.res; g_cond_free(dataform.cond); g_mutex_free(dataform.mutex); if(dataform.output) { //printf("output text was: '%s'\n",dataform.output); free(dataform.output); } if(dataform.status == -1) { // i.e. the connection failed during query if(!tries) return -2; else { PQclear(*res); return PGquery(q,res,tries-1); } } if((dataform.res == 0) || (PQresultStatus(dataform.res) == PGRES_FATAL_ERROR)) { if(!tries) return -3; else { PQclear(*res); return PGquery(q,res,tries-1); } } if((PQresultStatus(dataform.res) == PGRES_TUPLES_OK)) { if(PQntuples(dataform.res)) return 1; else return 0; } else if((PQresultStatus(dataform.res) == PGRES_COMMAND_OK)) { return 0; } else if((PQresultStatus(dataform.res) == PGRES_EMPTY_QUERY)) { if(!tries) return -4; else { PQclear(*res); return PGquery(q,res,tries-1); } } else if((PQresultStatus(dataform.res) == PGRES_NONFATAL_ERROR)) { if(!tries) return -5; else { PQclear(*res); return PGquery(q,res,tries-1); } } else if((PQresultStatus(dataform.res) == PGRES_COPY_OUT)) { return 2; } else if((PQresultStatus(dataform.res) == PGRES_COPY_IN)) { return 3; } else if((PQresultStatus(dataform.res) == PGRES_BAD_RESPONSE)) { if(!tries) return -6; else { PQclear(*res); return PGquery(q,res,tries-1); } } else if(!tries) return -7; else { PQclear(*res); return PGquery(q,res,tries-1); } } gint8 pgwrapper_initialize(void) { PGresult *foo = 0; // init our thread pools #ifdef WRAPPER_VERBOSE printf("wrapper creating new writer thread pool\n"); #endif pgwrapper_writer = g_thread_pool_new(pgwrapper_connection_thread_func,noticeProcessorFunc,1,TRUE,0); #ifdef WRAPPER_VERBOSE printf("writer pool created\n"); #endif #ifdef WRAPPER_VERBOSE printf("wrapper creating new executor thread pool\n"); #endif pgwrapper_executor = g_thread_pool_new(pgwrapper_executor_thread_func,noticeProcessorFunc,POOL_SIZE,0,0); #ifdef WRAPPER_VERBOSE printf("executor pool created\n"); #endif #ifdef WRAPPER_VERBOSE printf("wrapper creating new thread pool\n"); #endif pgwrapper_connections = g_thread_pool_new(pgwrapper_connection_thread_func, noticeProcessorFunc,POOL_SIZE,TRUE,0); #ifdef WRAPPER_VERBOSE printf("pool created\n"); #endif // We won't bother creating a thread for the writer queue. // We do, however, start the writer queue in a transaction. PGquery_internal("begin",&foo, 23, pgwrapper_writer); PQclear(foo); // give us a thread to flush the write queue periodically g_thread_create(ForceWriterFlush,0,0,0); // return no error return 0; } gpointer ForceWriterFlush(gpointer dummy) { while(1) { sleep(writeTimeout); PGflush(); } return 0; } void pgwrapper_executor_thread_func(gpointer dataform, gpointer dummy) { PGresult *res; #ifdef WRAPPER_VERBOSE printf("thread %p about to kick off query %s\n", g_thread_self(), ((pgwrapper_exec_packet*)dataform)->q); #endif PGquery(((pgwrapper_exec_packet*)dataform)->q,&res,((pgwrapper_exec_packet*)dataform)->tries); #ifdef WRAPPER_VERBOSE printf("thread %p about to free packet\n", g_thread_self()); #endif free(((pgwrapper_exec_packet*)dataform)->q); free(dataform); PQclear(res); } void pgwrapper_connection_thread_func(gpointer data, gpointer noticeprocessor) { PGconn* conn; // make sure we can write into our dataform g_mutex_lock(((pgwrapperresult*)data)->mutex); // make sure this thread has a db connection if(!connection_thread_conn) { // this has never been used before. connection_thread_conn = g_private_new(g_free); conn = PQconnectdb(db_connect_string); PQsetNoticeProcessor(conn,noticeprocessor,0); g_private_set(connection_thread_conn,conn); #ifdef WRAPPER_VERBOSE printf("thread %p using new conn %p\n", g_thread_self(), conn); #endif } else { // we used to have data in here... is it valid? if(!(conn = g_private_get(connection_thread_conn))) { // nope, looks like we had a problem in the past. conn = PQconnectdb(db_connect_string); PQsetNoticeProcessor(conn,noticeprocessor,0); g_private_set(connection_thread_conn,conn); #ifdef WRAPPER_VERBOSE printf("thread %p resetting conn to %p\n", g_thread_self(), conn); #endif } } // okay, we have a connection.... is it any good? if (PQstatus(conn) == CONNECTION_BAD) { // guess not #ifdef WRAPPER_VERBOSE printf("thread %p found bad conn: %p\n", g_thread_self(), conn); #endif ((pgwrapperresult*)data)->output = calloc(sizeof(char),1000); snprintf(((pgwrapperresult*)data)->output, 1000, "couldn't connect to database (%s)", PQerrorMessage(conn)); PQfinish(conn); g_private_set(connection_thread_conn,0); ((pgwrapperresult*)data)->status = -1; } else { #ifdef WRAPPER_VERBOSE printf("thread %p sees a good connection at %p for '%s'.\n", g_thread_self(), conn, ((pgwrapperresult*)data)->input); #endif // proceed with the query ((pgwrapperresult*)data)->res = PQexec(conn,((pgwrapperresult*)data)->input); ((pgwrapperresult*)data)->status = 1; } g_cond_signal(((pgwrapperresult*)data)->cond); g_mutex_unlock(((pgwrapperresult*)data)->mutex); return; } void noticeProcessorFunc(void* foo, const char* msg) { // stop printing to stderr! #ifdef WRAPPER_VERBOSE printf("trapped msg: '%s'\n", msg); #endif } void CloseRawDatabaseConnection(PGconn* con) { // fixme - this is currently a noop function. } PGconn* FetchRawDatabaseConnection(void) { return PQconnectdb(db_connect_string); } |