From: <ag...@us...> - 2013-03-21 09:25:19
|
Revision: 2689 http://nagios.svn.sourceforge.net/nagios/?rev=2689&view=rev Author: ageric Date: 2013-03-21 09:25:11 +0000 (Thu, 21 Mar 2013) Log Message: ----------- Make the worker_job and worker_process structures opaque worker_job is really destined for transparency, but for now there's no real need so we keep it private for the time being. This means we can modify the worker manager structures at will and thus paves the way for extending the job-running api as much as we like. Signed-off-by: Andreas Ericsson <ae...@op...> Modified Paths: -------------- nagioscore/trunk/base/workers.c nagioscore/trunk/lib/worker.h nagioscore/trunk/lib/wproc.c Modified: nagioscore/trunk/base/workers.c =================================================================== --- nagioscore/trunk/base/workers.c 2013-03-21 09:24:43 UTC (rev 2688) +++ nagioscore/trunk/base/workers.c 2013-03-21 09:25:11 UTC (rev 2689) @@ -15,10 +15,34 @@ /* perfect hash function for wproc response codes */ #include "wp-phash.c" +struct wproc_worker; + +struct wproc_job { + unsigned int id; + unsigned int type; + unsigned int timeout; + char *command; + void *arg; + struct wproc_worker *wp; +}; + + +struct wproc_worker { + char *name; /**< check-source name of this worker */ + int sd; /**< communication socket */ + pid_t pid; /**< pid */ + int max_jobs; /**< Max number of jobs the worker can handle */ + int jobs_running; /**< jobs running */ + int jobs_started; /**< jobs started */ + int job_index; /**< round-robin slot allocator (this wraps) */ + iocache *ioc; /**< iocache for reading from worker */ + struct wproc_job **jobs; /**< array of jobs */ +}; + struct wproc_list { int len; unsigned int idx; - worker_process **wps; + struct wproc_worker **wps; }; static struct wproc_list workers = {0, 0, NULL}; @@ -119,9 +143,9 @@ return lc->jobs_limit > lc->jobs_running; } -static worker_job *create_job(int type, void *arg, time_t timeout, const char *command) +static struct wproc_job *create_job(int type, void *arg, time_t timeout, const char *command) { - worker_job *job; + struct wproc_job *job; job = calloc(1, sizeof(*job)); if (!job) { @@ -137,14 +161,14 @@ return job; } -static int get_job_id(worker_process *wp) +static int get_job_id(struct wproc_worker *wp) { int i; /* if there can't be any jobs, we break out early */ if (wp->jobs_running >= wp->max_jobs) { logit(NSLOG_RUNTIME_ERROR, TRUE, "Error: Worker '%s' already runs too many jobs (%d >= %d)\n", - wp->source_name, wp->jobs_running, wp->max_jobs); + wp->name, wp->jobs_running, wp->max_jobs); return -1; } @@ -160,11 +184,11 @@ } logit(NSLOG_RUNTIME_ERROR, TRUE, "Error: Failed to find free job-slot for worker '%s' (running: %d; max: %d)\n", - wp->source_name, wp->jobs_running, wp->max_jobs); + wp->name, wp->jobs_running, wp->max_jobs); return -1; } -static worker_job *get_job(worker_process *wp, int job_id) +static struct wproc_job *get_job(struct wproc_worker *wp, int job_id) { /* * XXX FIXME check job->id against job_id and do something if @@ -173,7 +197,7 @@ return wp->jobs[job_id % wp->max_jobs]; } -static void destroy_job(worker_process *wp, worker_job *job) +static void destroy_job(struct wproc_worker *wp, struct wproc_job *job) { if (!job) return; @@ -209,7 +233,7 @@ free(job); } -static int wproc_is_alive(worker_process *wp) +static int wproc_is_alive(struct wproc_worker *wp) { if (!wp || !wp->pid) return 0; @@ -218,7 +242,7 @@ return 0; } -static int wproc_destroy(worker_process *wp, int flags) +static int wproc_destroy(struct wproc_worker *wp, int flags) { int i = 0, force = 0, self; @@ -236,7 +260,7 @@ /* free all memory when either forcing or a worker called us */ iocache_destroy(wp->ioc); wp->ioc = NULL; - my_free(wp->source_name); + my_free(wp->name); if (wp->jobs) { for (i = 0; i < wp->max_jobs && wp->jobs_running; i++) { if (!wp->jobs[i]) @@ -269,7 +293,7 @@ return 0; } -static worker_process *to_remove = NULL; +static struct wproc_worker *to_remove = NULL; /* remove the worker pointed to by to_remove * if to_remove is null, remove everything */ static int remove_specialized(void *data) @@ -339,7 +363,7 @@ return 0; } -static int handle_worker_check(wproc_result *wpres, worker_process *wp, worker_job *job) +static int handle_worker_check(wproc_result *wpres, struct wproc_worker *wp, struct wproc_job *job) { int result = ERROR; check_result *cr = (check_result *)job->arg; @@ -366,7 +390,7 @@ cr->early_timeout = wpres->early_timeout; cr->exited_ok = wpres->exited_ok; cr->engine = &nagios_check_engine; - cr->source = wp->source_name; + cr->source = wp->name; process_check_result(cr); free_check_result(cr); @@ -473,20 +497,20 @@ unsigned long size; int ret; static struct kvvec kvv = KVVEC_INITIALIZER; - worker_process *wp = (worker_process *)arg; + struct wproc_worker *wp = (struct wproc_worker *)arg; if(iocache_capacity(wp->ioc) == 0) { - logit(NSLOG_RUNTIME_WARNING, TRUE, "wproc: iocache_capacity() is 0 for worker %s.\n", wp->source_name); + logit(NSLOG_RUNTIME_WARNING, TRUE, "wproc: iocache_capacity() is 0 for worker %s.\n", wp->name); } ret = iocache_read(wp->ioc, wp->sd); if (ret < 0) { logit(NSLOG_RUNTIME_WARNING, TRUE, "wproc: iocache_read() from %s returned %d: %s\n", - wp->source_name, ret, strerror(errno)); + wp->name, ret, strerror(errno)); return 0; } else if (ret == 0) { - logit(NSLOG_INFO_MESSAGE, TRUE, "wproc: Socket to worker %s broken, removing", wp->source_name); + logit(NSLOG_INFO_MESSAGE, TRUE, "wproc: Socket to worker %s broken, removing", wp->name); wproc_num_workers_online--; iobroker_unregister(nagios_iobs, sd); to_remove = wp; @@ -515,12 +539,12 @@ } while ((buf = worker_ioc2msg(wp->ioc, &size, 0))) { int job_id = -1; - worker_job *job; + struct wproc_job *job; wproc_result wpres; /* log messages are handled first */ if (size > 5 && !memcmp(buf, "log=", 4)) { - logit(NSLOG_INFO_MESSAGE, TRUE, "wproc: %s: %s\n", wp->source_name, buf + 4); + logit(NSLOG_INFO_MESSAGE, TRUE, "wproc: %s: %s\n", wp->name, buf + 4); continue; } @@ -539,12 +563,12 @@ job = get_job(wp, wpres.job_id); if (!job) { logit(NSLOG_RUNTIME_WARNING, TRUE, "wproc: Job with id '%d' doesn't exist on %s.\n", - job_id, wp->source_name); + job_id, wp->name); continue; } if (wpres.type != job->type) { logit(NSLOG_RUNTIME_WARNING, TRUE, "wproc: %s claims job %d is type %d, but we think it's type %d\n", - wp->source_name, job->id, wpres.type, job->type); + wp->name, job->id, wpres.type, job->type); break; } oj = (wproc_object_job *)job->arg; @@ -644,7 +668,7 @@ { int i, is_global = 1; struct kvvec *info; - worker_process *worker; + struct wproc_worker *worker; logit(NSLOG_INFO_MESSAGE, TRUE, "wproc: Registry request: %s\n", buf); if (!(worker = calloc(1, sizeof(*worker)))) { @@ -668,7 +692,7 @@ for(i = 0; i < info->kv_pairs; i++) { struct key_value *kv = &info->kv[i]; if (!strcmp(kv->key, "name")) { - worker->source_name = strdup(kv->value); + worker->name = strdup(kv->value); } else if (!strcmp(kv->key, "pid")) { worker->pid = atoi(kv->value); @@ -681,14 +705,14 @@ is_global = 0; if (!(command_handlers = dkhash_get(specialized_workers, kv->value, NULL))) { command_handlers = calloc(1, sizeof(struct wproc_list)); - command_handlers->wps = calloc(1, sizeof(worker_process**)); + command_handlers->wps = calloc(1, sizeof(struct wproc_worker**)); command_handlers->len = 1; command_handlers->wps[0] = worker; dkhash_insert(specialized_workers, strdup(kv->value), NULL, command_handlers); } else { command_handlers->len++; - command_handlers->wps = realloc(command_handlers->wps, command_handlers->len * sizeof(worker_process**)); + command_handlers->wps = realloc(command_handlers->wps, command_handlers->len * sizeof(struct wproc_worker**)); command_handlers->wps[command_handlers->len - 1] = worker; } } @@ -703,11 +727,11 @@ */ worker->max_jobs = (iobroker_max_usable_fds() / 2) - 50; } - worker->jobs = calloc(worker->max_jobs, sizeof(worker_job *)); + worker->jobs = calloc(worker->max_jobs, sizeof(struct wproc_job *)); if (is_global) { workers.len++; - workers.wps = realloc(workers.wps, workers.len * sizeof(worker_process *)); + workers.wps = realloc(workers.wps, workers.len * sizeof(struct wproc_worker *)); workers.wps[workers.len - 1] = worker; } wproc_num_workers_online++; @@ -744,9 +768,9 @@ int i; for (i = 0; i < workers.len; i++) { - worker_process *wp = workers.wps[i]; + struct wproc_worker *wp = workers.wps[i]; nsock_printf(sd, "name=%s;pid=%d;jobs_running=%u;jobs_started=%u\n", - wp->source_name, wp->pid, + wp->name, wp->pid, wp->jobs_running, wp->jobs_started); } return 0; @@ -812,9 +836,9 @@ return 0; } -static worker_process *get_worker(worker_job *job) +static struct wproc_worker *get_worker(struct wproc_job *job) { - worker_process *wp = NULL; + struct wproc_worker *wp = NULL; struct wproc_list *wp_list; int i; char *cmd_name, *space, *slash = NULL; @@ -881,11 +905,11 @@ * as well as shipping the command off to a designated * worker */ -static int wproc_run_job(worker_job *job, nagios_macros *mac) +static int wproc_run_job(struct wproc_job *job, nagios_macros *mac) { static struct kvvec kvv = KVVEC_INITIALIZER; struct kvvec_buf *kvvb; - worker_process *wp; + struct wproc_worker *wp; int ret; if (!job) @@ -922,7 +946,7 @@ loadctl.jobs_running++; if (ret != kvvb->bufsize) { logit(NSLOG_RUNTIME_ERROR, TRUE, "wproc: '%s' seems to be choked. ret = %d; bufsize = %lu: errno = %d (%s)\n", - wp->source_name, ret, kvvb->bufsize, errno, strerror(errno)); + wp->name, ret, kvvb->bufsize, errno, strerror(errno)); destroy_job(wp, job); } free(kvvb->buf); @@ -949,7 +973,7 @@ int wproc_notify(char *cname, char *hname, char *sdesc, char *cmd, nagios_macros *mac) { - worker_job *job; + struct wproc_job *job; wproc_object_job *oj; if (!(oj = create_object_job(cname, hname, sdesc))) @@ -962,7 +986,7 @@ int wproc_run_service_job(int jtype, int timeout, service *svc, char *cmd, nagios_macros *mac) { - worker_job *job; + struct wproc_job *job; wproc_object_job *oj; if (!(oj = create_object_job(NULL, svc->host_name, svc->description))) @@ -975,7 +999,7 @@ int wproc_run_host_job(int jtype, int timeout, host *hst, char *cmd, nagios_macros *mac) { - worker_job *job; + struct wproc_job *job; wproc_object_job *oj; if (!(oj = create_object_job(NULL, hst->name, NULL))) @@ -988,7 +1012,7 @@ int wproc_run_check(check_result *cr, char *cmd, nagios_macros *mac) { - worker_job *job; + struct wproc_job *job; int timeout; if (cr->service_description) @@ -1002,7 +1026,7 @@ int wproc_run(int jtype, char *cmd, int timeout, nagios_macros *mac) { - worker_job *job; + struct wproc_job *job; job = create_job(jtype, NULL, timeout, cmd); return wproc_run_job(job, mac); Modified: nagioscore/trunk/lib/worker.h =================================================================== --- nagioscore/trunk/lib/worker.h 2013-03-21 09:24:43 UTC (rev 2688) +++ nagioscore/trunk/lib/worker.h 2013-03-21 09:25:11 UTC (rev 2689) @@ -22,35 +22,6 @@ #define ETIME ETIMEDOUT #endif -struct worker_process; - -/** Worker job data */ -typedef struct worker_job { - int id; /**< job id */ - int type; /**< internal only */ - time_t timeout; /**< timeout, in absolute time */ - char *command; /**< command string for this job */ - struct worker_process *wp; /**< worker process running this job */ - void *arg; /**< any random argument */ -} worker_job; - -/** A worker process as seen from its controller */ -typedef struct worker_process { - const char *type; /**< identifying typename of this worker */ - char *source_name; /**< check-source name of this worker */ - int sd; /**< communication socket */ - pid_t pid; /**< pid */ - int max_jobs; /**< Max number of jobs we can handle */ - int jobs_running; /**< jobs running */ - int jobs_started; /**< jobs started */ - struct timeval start; /**< worker start time */ - iocache *ioc; /**< iocache for reading from worker */ - worker_job **jobs; /**< array of jobs */ - int job_index; /**< round-robin slot allocator (this wraps) */ - struct worker_process *prev_wp; /**< previous worker in list */ - struct worker_process *next_wp; /**< next worker in list */ -} worker_process; - typedef struct iobuf { int fd; unsigned int len; Modified: nagioscore/trunk/lib/wproc.c =================================================================== --- nagioscore/trunk/lib/wproc.c 2013-03-21 09:24:43 UTC (rev 2688) +++ nagioscore/trunk/lib/wproc.c 2013-03-21 09:25:11 UTC (rev 2689) @@ -16,12 +16,19 @@ #include <netinet/in.h> #include "worker.h" + +typedef struct simple_worker { + int pid, sd; + unsigned int job_index; + iocache *ioc; +} simple_worker; + /* we can't handle packets larger than 64MiB */ #define MAX_IOCACHE_SIZE (64 * 1024 * 1024) static int sigreceived; static iobroker_set *iobs; -static struct worker_process *spawn_worker(void (*init_func)(void *), void *init_arg) +static simple_worker *spawn_worker(void (*init_func)(void *), void *init_arg) { int sv[2]; int pid; @@ -38,7 +45,7 @@ /* parent leaves the child */ if (pid) { - worker_process *worker = calloc(1, sizeof(worker_process)); + simple_worker *worker = calloc(1, sizeof(simple_worker)); close(sv[1]); if (!worker) { kill(SIGKILL, pid); @@ -48,11 +55,6 @@ worker->sd = sv[0]; worker->pid = pid; worker->ioc = iocache_create(1 * 1024 * 1024); - - /* 1 socket for master, 2 fd's for each child */ - worker->max_jobs = (iobroker_max_usable_fds() - 1) / 2; - worker->jobs = calloc(worker->max_jobs, sizeof(worker_job *)); - return worker; } @@ -101,7 +103,7 @@ static int print_input(int sd, int events, void *wp_) { int ret, pkt = 0; - worker_process *wp = (worker_process *)wp_; + simple_worker *wp = (simple_worker *)wp_; struct kvvec kvv = KVVEC_INITIALIZER; char *buf; unsigned long tot_bytes = 0, size; @@ -159,14 +161,14 @@ } #define NWPS 3 -static worker_process *wps[NWPS]; +static simple_worker *wps[NWPS]; static int wp_index; static int send_command(int sd, int events, void *discard) { char buf[8192]; int ret; - worker_process *wp; + simple_worker *wp; struct kvvec *kvv; ret = read(sd, buf, sizeof(buf)); @@ -202,7 +204,7 @@ int main(int argc, char **argv) { - struct worker_process *wp; + simple_worker *wp; int i; signal(SIGINT, sighandler); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |