|
From: <cs...@us...> - 2008-09-23 01:13:30
|
Revision: 54
http://bespp.svn.sourceforge.net/bespp/?rev=54&view=rev
Author: csmith
Date: 2008-09-23 01:13:18 +0000 (Tue, 23 Sep 2008)
Log Message:
-----------
- changed job IDs to strings in preparation for the RM-independent interface
- implemented the new RM-independent interface (rm.h)
- moved LSF-specific code to LSF module (rm_lsf.c)
Modified Paths:
--------------
trunk/Make.config
trunk/besserver/Makefile
trunk/besserver/besserver.c
trunk/besserver/faults.h
trunk/besserver/job.c
trunk/besserver/job.h
Added Paths:
-----------
trunk/besserver/rm.h
trunk/besserver/rm_lsf.c
Modified: trunk/Make.config
===================================================================
--- trunk/Make.config 2008-09-11 23:22:27 UTC (rev 53)
+++ trunk/Make.config 2008-09-23 01:13:18 UTC (rev 54)
@@ -7,6 +7,17 @@
GSOAP_TOP = /home/csmith/src/gsoap-2.7/gsoap
#
+# Which resource manager back end to use. You must also set the
+# locations of the resource-manager-specific includes and libraries.
+#
+RM = rm_lsf
+#RM = rm_pbs
+#RM = rm_sge
+
+RM_INC = $(LSF_INC)
+RM_LIBS = $(LSF_LIBS)
+
+#
# LSF related variables
#
# LSF_LOC is where the 'include' and architecture specific
Modified: trunk/besserver/Makefile
===================================================================
--- trunk/besserver/Makefile 2008-09-11 23:22:27 UTC (rev 53)
+++ trunk/besserver/Makefile 2008-09-23 01:13:18 UTC (rev 54)
@@ -11,14 +11,14 @@
# http://schemas.ggf.org/jsdl/2005/11/jsdl-posix
CC = gcc
-CFLAGS = -g $(GSOAP_DEF) -I. $(GSOAP_INC) $(LSF_INC) $(EXTRA_INC)
+CFLAGS = -g $(GSOAP_DEF) -I. $(GSOAP_INC) $(RM_INC) $(EXTRA_INC)
LDFLAGS = -g $(EXTRA_LIB)
-LIBS = -lssl -lcrypto -lpam $(LSF_LIBS) $(EXTRA_LIBS)
+LIBS = -lssl -lcrypto -lpam $(RM_LIBS) $(EXTRA_LIBS)
BES_WSDL = bes-factory.wsdl
BES_HEADER = bes-factory.h
-BES_H_FILES = job.h namespaces.h faults.h auth.h
-BES_OBJ = besserver.o job.o faults.o auth.o
+BES_H_FILES = job.h namespaces.h faults.h auth.h rm.h
+BES_OBJ = besserver.o job.o faults.o auth.o $(RM).o
SOAP_NS = BESFactoryBinding.nsmap
SOAP_H_FILES = soapH.h soapStub.h
@@ -37,6 +37,7 @@
job.o: job.c $(BES_H_FILES) $(SOAP_NS)
faults.o: faults.c $(BES_H_FILES) $(SOAP_NS)
auth.o: auth.c $(BES_H_FILES) $(SOAP_NS)
+$(RM).o: $(RM).c $(BES_H_FILES) $(SOAP_NS)
soapServer.o: soapServer.c
soapClient.o: soapClient.c
Modified: trunk/besserver/besserver.c
===================================================================
--- trunk/besserver/besserver.c 2008-09-11 23:22:27 UTC (rev 53)
+++ trunk/besserver/besserver.c 2008-09-23 01:13:18 UTC (rev 54)
@@ -35,9 +35,6 @@
#include <paths.h>
#include <time.h>
-#include <lsf/lsf.h>
-#include <lsf/lsbatch.h>
-
#include "wsseapi.h"
#include "BESFactoryBinding.nsmap"
@@ -45,6 +42,7 @@
#include "job.h"
#include "namespaces.h"
#include "faults.h"
+#include "rm.h"
#define USERNAMELEN 256
@@ -52,11 +50,10 @@
#define clear_header(x) if (x->header) memset(x->header, 0, sizeof(struct SOAP_ENV__Header));
-int isElement(struct soap_dom_element *, char *, char *);
-int makeActivityEPR(struct soap*, char *, int, struct wsa__EndpointReferenceType*);
-int makeActivityDomEPR(struct soap*, char *, int, struct soap_dom_element**);
-int getJobIdFromEPR(struct soap*, struct wsa__EndpointReferenceType*, int*);
-int getContainedResources(struct soap*, struct bes__FactoryResourceAttributesDocumentType*, int);
+int isElement(struct soap_dom_element*, char*, char*);
+int makeActivityEPR(struct soap*, char*, char*, struct wsa__EndpointReferenceType*);
+int makeActivityDomEPR(struct soap*, char*, char*, struct soap_dom_element**);
+int getJobIdFromEPR(struct soap*, struct wsa__EndpointReferenceType*, char**);
int processHeaders(struct soap*, char*, int);
@@ -174,15 +171,8 @@
exit(1);
}
- if (lsb_init("besserver")) {
- lsb_perror("main: lsb_init");
- exit(1);
- }
-
soap_ssl_init();
-
soap_init(&soap);
-
soap_set_mode(&soap, SOAP_C_UTFSTRING|SOAP_IO_STORE);
soap_register_plugin(&soap, soap_wsse);
@@ -202,6 +192,11 @@
sprintf(service_endpoint, "https://%s%s%s", host, strlen(portstr)?":":"", portstr);
}
+ if (rm_initialize(&soap, NULL)) {
+ fprintf(stderr, "Couldn't initialize the resource manager\n");
+ exit(1);
+ }
+
m = soap_bind(&soap, host, port, 100);
if (m < 0) {
soap_print_fault(&soap, stderr);
@@ -259,9 +254,9 @@
static char fname[] = "__bes__CreateActivity";
struct jobcard *jc;
struct soap_dom_element *dom;
- int rc, jobid;
+ int rc;
struct SOAP_ENV__Fault *fault;
- char username[USERNAMELEN];
+ char username[USERNAMELEN], *jobid;
fprintf(stderr, "In %s...\n", fname);
@@ -287,13 +282,13 @@
return bes_send_fault(s, fault);
}
- rc = submitLSFJob(jc, &jobid, username);
+ rc = rm_submitJob(s, jc, username, &jobid);
if (rc != BESE_OK) {
if (rc == BESE_PERMISSION) {
fault = bes_NotAuthorizedFault(s, lsb_sysmsg());
return bes_send_fault(s, fault);
} else {
- return soap_receiver_fault(s, LSF_LIB_ERROR, NULL);
+ return soap_receiver_fault(s, BACKEND_ERROR, NULL);
}
}
@@ -317,8 +312,8 @@
struct wsa__EndpointReferenceType *cur;
struct bes__TerminateActivityResponseType *resp_array;
struct soap_dom_element *fault;
- int i, rc, jobid;
- char username[USERNAMELEN];
+ int i, rc;
+ char username[USERNAMELEN], *jobid;
fprintf(stderr, "In %s...\n", fname);
@@ -345,7 +340,8 @@
cur = &(req->ActivityIdentifier[0]);
resp_array[i].ActivityIdentifier = cur;
- if (rc = getJobIdFromEPR(s, cur, &jobid)) {
+ jobid = NULL;
+ if (getJobIdFromEPR(s, cur, &jobid) || !jobid) {
resp_array[i].__any = bes_InvalidActivityFaultDOM(s, "Malformed EPR", "Malformed EPR");
if (resp_array[i].__any == NULL) {
return soap_receiver_fault(s, MEM_ALLOC, NULL);
@@ -355,9 +351,9 @@
continue;
}
- fprintf(stderr, "Terminating job %d\n", jobid);
+ fprintf(stderr, "Terminating job %s\n", jobid);
- rc = terminateLSFJob(jobid, username);
+ rc = rm_terminateJob(s, jobid, username);
if (rc != BESE_OK) {
if (rc == BESE_NO_ACTIVITY) {
fault = bes_InvalidActivityFaultDOM(s, "Unknown Activity", "Unknown Activity");
@@ -392,11 +388,9 @@
struct wsa__EndpointReferenceType *cur;
struct bes__GetActivityStatusResponseType *resp_array;
struct bes__ActivityStatusType *status;
- struct bes__ActivityStateType *state;
- struct jobInfoEnt *job;
struct soap_dom_element *fault;
- int i, rc, jobid;
- char username[USERNAMELEN];
+ int i, rc;
+ char username[USERNAMELEN], *jobid;
fprintf(stderr, "In %s...\n", fname);
@@ -423,7 +417,8 @@
cur = &(req->ActivityIdentifier[0]);
resp_array[i].ActivityIdentifier = cur;
- if (rc = getJobIdFromEPR(s, cur, &jobid)) {
+ jobid = NULL;
+ if (getJobIdFromEPR(s, cur, &jobid) || !jobid) {
resp_array[i].__any = bes_InvalidActivityFaultDOM(s, "Malformed EPR", "Malformed EPR");
if (resp_array[i].__any == NULL) {
return soap_receiver_fault(s, MEM_ALLOC, NULL);
@@ -432,14 +427,14 @@
continue;
}
- fprintf(stderr, "Getting status for job %d\n", jobid);
+ fprintf(stderr, "Getting status for job %s\n", jobid);
- rc = lsb_openjobinfo(jobid, NULL, "all", NULL, NULL, ALL_JOB);
- if (rc == -1) {
- lsb_perror("GetActivityStatuses: lsb_openjobinfo");
- if (lsberrno == LSBE_NO_JOB) {
+ rc = rm_getJobStatus(s, jobid, username, &status);
+ if (rc != BESE_OK) {
+ if (rc == BESE_NO_ACTIVITY) {
fault = bes_InvalidActivityFaultDOM(s, "Unknown Activity", "Unknown Activity");
- } else {
+ }
+ else {
fault = bes_backend_errorDOM(s);
}
if (fault == NULL) {
@@ -450,35 +445,8 @@
continue;
}
- job = lsb_readjobinfo(NULL);
- if (job == NULL) {
- lsb_perror("GetActivityStatuses: lsb_readjobinfo");
- resp_array[i].__any = bes_backend_errorDOM(s);
- if (resp_array[i].__any == NULL) {
- return soap_receiver_fault(s, MEM_ALLOC, NULL);
- }
- resp_array[i].__size = 1;
- continue;
- }
-
- status = (struct bes__ActivityStatusType*)soap_malloc(s,
- sizeof(struct bes__ActivityStatusType));
- if (status == NULL) {
- return soap_receiver_fault(s, MEM_ALLOC, NULL);
- }
- memset(status, 0, sizeof(struct bes__ActivityStatusType));
-
- if (IS_PEND(job->status)) {
- status->state = Pending;
- } else if (IS_START(job->status)) {
- status->state = Running;
- } else if (IS_FINISH(job->status)) {
- status->state = Finished;
- }
-
resp_array[i].bes__ActivityStatus = status;
- lsb_closejobinfo();
}
resp->__sizeResponse = req->__sizeActivityIdentifier;
@@ -495,10 +463,10 @@
static char fname[] = "__bes__GetActivityDocuments";
struct wsa__EndpointReferenceType *cur;
struct bes__GetActivityDocumentResponseType *resp_array;
- struct jobInfoEnt *job;
+ struct jobcard *job_info;
struct soap_dom_element *fault;
- int i, rc, jobid;
- char username[USERNAMELEN];
+ int i, rc;
+ char username[USERNAMELEN], *jobid;
fprintf(stderr, "In %s...\n", fname);
@@ -525,7 +493,8 @@
cur = &(req->ActivityIdentifier[0]);
resp_array[i].ActivityIdentifier = cur;
- if (getJobIdFromEPR(s, cur, &jobid) < 0) {
+ jobid = NULL;
+ if (getJobIdFromEPR(s, cur, &jobid) || !jobid) {
resp_array[i].__any = bes_InvalidActivityFaultDOM(s, "Malformed EPR", "Malformed EPR");
if (resp_array[i].__any == NULL) {
return soap_receiver_fault(s, MEM_ALLOC, NULL);
@@ -534,14 +503,14 @@
continue;
}
- fprintf(stderr, "Getting information for job %d\n", jobid);
+ fprintf(stderr, "Getting information for job %s\n", jobid);
- rc = lsb_openjobinfo(jobid, NULL, "all", NULL, NULL, ALL_JOB);
- if (rc == -1) {
- lsb_perror("GetActivityStatuses: lsb_openjobinfo");
- if (lsberrno == LSBE_NO_JOB) {
+ rc = rm_getJobInfo(s, jobid, username, &job_info);
+ if (rc != BESE_OK) {
+ if (rc == BESE_NO_ACTIVITY) {
fault = bes_InvalidActivityFaultDOM(s, "Unknown Activity", "Unknown Activity");
- } else {
+ }
+ else {
fault = bes_backend_errorDOM(s);
}
if (fault == NULL) {
@@ -552,23 +521,11 @@
continue;
}
- job = lsb_readjobinfo(NULL);
- if (job == NULL) {
- lsb_perror("GetActivityStatuses: lsb_readjobinfo");
- resp_array[i].__any = bes_backend_errorDOM(s);
- if (resp_array[i].__any == NULL) {
- return soap_receiver_fault(s, MEM_ALLOC, NULL);
- }
- resp_array[i].__size = 1;
- continue;
- }
-
- rc = getJSDLFromJobInfo(s, job, &resp_array[i].JobDefinition);
+ rc = getJSDLFromJobInfo(s, job_info, &resp_array[i].JobDefinition);
if (rc) {
return rc;
}
- lsb_closejobinfo();
}
resp->__sizeResponse = req->__sizeActivityIdentifier;
@@ -585,9 +542,12 @@
static char fname[] = "__bes__GetFactoryAttributesDocument";
struct bes__FactoryResourceAttributesDocumentType *attrs;
struct wsa_EndpointReferenceType *epr;
- struct jobInfoEnt *jinfo;
- char *clustername;
- int num_jobs, i, rc;
+ struct rm_clusterInfo *cinfo;
+ struct rm_job *joblist, *job;
+ struct rm_resource *resourcelist, *resource;
+ struct soap_dom_element *contained_resources;
+ struct bes__BasicResourceAttributesDocumentType *res;
+ int num_jobs, num_resources, i, rc;
char username[USERNAMELEN];
fprintf(stderr, "In %s....\n", fname);
@@ -604,89 +564,115 @@
}
memset(attrs, 0, sizeof(struct bes__FactoryResourceAttributesDocumentType));
- /* IsAcceptingNewActivities */
- attrs->IsAcceptingNewActivities = true_;
- /* CommonName */
- clustername = ls_getclustername();
- if (clustername == NULL) {
- fprintf(stderr, "%s: %s: %s", fname, "ls_getclustername", ls_sysmsg());
- return soap_receiver_fault(s, LSF_LIB_ERROR, NULL);
+ if ((rc = rm_getClusterInfo(s, &cinfo)) != BESE_OK) {
+ if (rc == BESE_MEM_ALLOC) {
+ return soap_receiver_fault(s, MEM_ALLOC, NULL);
+ }
+ else if (rc == BESE_BACKEND) {
+ return soap_receiver_fault(s, BACKEND_ERROR, NULL);
+ }
+ else {
+ return soap_receiver_fault(s, UNKNOWN_ERROR, NULL);
+ }
}
- attrs->CommonName = soap_strdup(s, clustername);
- if (attrs->CommonName == NULL) {
+
+ /* attributes from the backend */
+ attrs->IsAcceptingNewActivities = cinfo->IsAcceptingNewActivities;
+ attrs->CommonName = cinfo->CommonName;
+ attrs->LongDescription = cinfo->LongDescription;
+ attrs->__sizeBESExtension = cinfo->num_extensions;
+ attrs->BESExtension = cinfo->BESExtensions;
+ attrs->LocalResourceManagerType = cinfo->LocalResourceManagerType;
+
+ /* Only support basic EPRs, no WS-Names */
+ attrs->__sizeNamingProfile = 1;
+ attrs->NamingProfile = (char**)soap_malloc(s, sizeof(char*));
+ attrs->NamingProfile[0] = soap_strdup(s,
+ "http://schemas.ogf.org/bes/2006/08/bes/naming/BasicWSAddressing");
+ if (attrs->NamingProfile == NULL) {
soap_print_fault(s, stderr);
return soap_receiver_fault(s, MEM_ALLOC, NULL);
}
- /* LongDescription */
-
/* TotalNumberOfActivities and ActivityReference */
- num_jobs = lsb_openjobinfo(0, NULL, "all", NULL, NULL, ALL_JOB);
- if (num_jobs == -1) {
- if (lsberrno != LSBE_NO_JOB) {
- fprintf(stderr, "%s: %s: %s", fname, "lsb_openjobinfo", lsb_sysmsg());
- return soap_receiver_fault(s, LSF_LIB_ERROR, NULL);
- } else {
- num_jobs = 0;
+ rc = rm_getJobList(s, NULL, &joblist, &num_jobs);
+ if (rc != BESE_OK) {
+ if (rc == BESE_MEM_ALLOC) {
+ return soap_receiver_fault(s, MEM_ALLOC, NULL);
}
+ else if (rc == BESE_BACKEND) {
+ return soap_receiver_fault(s, BACKEND_ERROR, NULL);
+ }
+ else {
+ return soap_receiver_fault(s, UNKNOWN_ERROR, NULL);
+ }
}
attrs->TotalNumberOfActivities = num_jobs;
- attrs->__sizeActivityReference = num_jobs;
- attrs->ActivityReference = (struct wsa__EndpointReferenceType*)soap_malloc(
- s, sizeof(struct wsa__EndpointReferenceType)*num_jobs);
- if (attrs->ActivityReference == NULL) {
- soap_print_fault(s, stderr);
- return soap_receiver_fault(s, MEM_ALLOC, NULL);
- }
- for (i = 0; i < num_jobs; i++) {
- jinfo = lsb_readjobinfo(NULL);
- if (jinfo == NULL) {
- fprintf(stderr, "%s: %s: %s\n", fname, "lsb_readjobinfo", lsb_sysmsg());
- return soap_receiver_fault(s, LSF_LIB_ERROR, NULL);
- }
- rc = makeActivityEPR(s, service_endpoint, jinfo->jobId,
- &(attrs->ActivityReference[i]));
- if (rc) {
+ if (num_jobs) {
+ attrs->__sizeActivityReference = num_jobs;
+ attrs->ActivityReference = (struct wsa__EndpointReferenceType*)soap_malloc(s, sizeof(struct wsa__EndpointReferenceType)*num_jobs);
+ if (attrs->ActivityReference == NULL) {
soap_print_fault(s, stderr);
- return rc;
+ return soap_receiver_fault(s, MEM_ALLOC, NULL);
}
+ job = joblist;
+ for (i = 0; i < num_jobs; i++) {
+ rc = makeActivityEPR(s, service_endpoint, job->jobid,
+ &(attrs->ActivityReference[i]));
+ if (rc) {
+ soap_print_fault(s, stderr);
+ return rc;
+ }
+ job = job->next;
+ }
}
/* TotalNumberOfContainedResources and ContainedResource */
- if ((rc = getContainedResources(s, attrs, 1))) {
+ rc = rm_getResourceList(s, NULL, &resourcelist, &num_resources);
+ if (rc != BESE_OK) {
if (rc == BESE_MEM_ALLOC) {
return soap_receiver_fault(s, MEM_ALLOC, NULL);
}
- if (rc == BESE_BACKEND) {
- return soap_receiver_fault(s, LSF_LIB_ERROR, NULL);
+ else if (rc == BESE_BACKEND) {
+ return soap_receiver_fault(s, BACKEND_ERROR, NULL);
}
- return soap_receiver_fault(s, UNKNOWN_ERROR, NULL);
+ else {
+ return soap_receiver_fault(s, UNKNOWN_ERROR, NULL);
+ }
}
-
- /* NamingProfile */
- attrs->__sizeNamingProfile = 1;
- attrs->NamingProfile = (char**)soap_malloc(s, sizeof(char*));
- attrs->NamingProfile[0] = soap_strdup(s,
- "http://schemas.ogf.org/bes/2006/08/bes/naming/BasicWSAddressing");
- if (attrs->NamingProfile == NULL) {
- soap_print_fault(s, stderr);
- return soap_receiver_fault(s, MEM_ALLOC, NULL);
- }
+ attrs->TotalNumberOfContainedResources = num_resources;
+ if (num_resources) {
+ contained_resources = (struct soap_dom_element*)soap_malloc(s,
+ sizeof(struct soap_dom_element)*num_resources);
+ if (!contained_resources) {
+ return soap_receiver_fault(s, MEM_ALLOC, NULL);
+ }
+ memset(contained_resources, 0, sizeof(struct soap_dom_element)
+ *num_resources);
+ resource = resourcelist;
+ for (i = 0; i < num_resources; i++) {
+ res = (struct bes__BasicResourceAttributesDocumentType*)soap_malloc(s, sizeof(struct bes__BasicResourceAttributesDocumentType));
+ if (!res) {
+ return soap_receiver_fault(s, MEM_ALLOC, NULL);
+ }
+ memset(res, 0, sizeof(struct bes__BasicResourceAttributesDocumentType));
- /* BESExtension */
+ res->ResourceName = resource->ResourceName;
+ res->CPUCount = resource->CPUCount;
+ res->CPUSpeed = resource->CPUSpeed;
+ res->PhysicalMemory = resource->PhysicalMemory;
+ res->VirtualMemory = resource->VirtualMemory;
- /* LocalResourceManagerType */
- attrs->LocalResourceManagerType = (char*)soap_malloc(s,
- strlen("http://www.platform.com/bes/2006/08/resourcemanager/LSF")
- + strlen(LSF_CURRENT_VERSION)+1);
- if (attrs->LocalResourceManagerType == NULL) {
- soap_print_fault(s, stderr);
- return soap_receiver_fault(s, MEM_ALLOC, NULL);
+ contained_resources[i].type
+ = SOAP_TYPE_bes__BasicResourceAttributesDocumentType;
+ contained_resources[i].node = res;
+ contained_resources[i].soap = s;
+ resource = resource->next;
+ }
+ attrs->ContainedResource = contained_resources;
+ attrs->__sizeContainedResource = num_resources;
}
- sprintf(attrs->LocalResourceManagerType,
- "http://www.platform.com/bes/2006/08/resourcemanager/LSF%s",
- LSF_CURRENT_VERSION);
resp->bes__FactoryResourceAttributesDocument = attrs;
@@ -719,14 +705,11 @@
}
int
-makeActivityEPR(struct soap *s, char *endpoint, int jobid,
+makeActivityEPR(struct soap *s, char *endpoint, char *jobid,
struct wsa__EndpointReferenceType *epr)
{
static char fname[] = "makeActivityEPR";
- char id[64];
- sprintf(id, "%d", jobid);
-
memset(epr, 0, sizeof(wsa__EndpointReferenceType));
epr->Address = (wsa__AttributedURIType*)soap_malloc(s,
sizeof(wsa__AttributedURIType));
@@ -734,25 +717,22 @@
return soap_receiver_fault(s, MEM_ALLOC, NULL);
}
memset(epr->Address, 0, sizeof(wsa__AttributedURIType));
- epr->Address->__item = (char*)soap_malloc(s, strlen(endpoint)+strlen(id)+2);
+ epr->Address->__item = (char*)soap_malloc(s, strlen(endpoint)+strlen(jobid)+2);
if (epr->Address->__item == NULL) {
return soap_receiver_fault(s, MEM_ALLOC, NULL);
}
- sprintf(epr->Address->__item, "%s/%s", endpoint, id);
+ sprintf(epr->Address->__item, "%s/%s", endpoint, jobid);
return SOAP_OK;
}
int
-makeActivityDomEPR(struct soap *s, char *endpoint, int jobid,
+makeActivityDomEPR(struct soap *s, char *endpoint, char *jobid,
struct soap_dom_element **ret)
{
- static char fname[] = "makeActivityEPR";
+ static char fname[] = "makeActivityDomEPR";
struct soap_dom_element *activityid, *addr;
- char id[64];
- sprintf(id, "%d", jobid);
-
activityid = (struct soap_dom_element*)soap_malloc(s,
sizeof(struct soap_dom_element));
addr = (struct soap_dom_element*)soap_malloc(s,
@@ -773,11 +753,11 @@
addr->name = soap_strdup(s, "Address");
addr->nstr = soap_strdup(s, WSA_NS);
- addr->data = (char*)soap_malloc(s, strlen(endpoint) + strlen(id) + 2);
+ addr->data = (char*)soap_malloc(s, strlen(endpoint) + strlen(jobid) + 2);
if (!addr->name || !addr->nstr || !addr->data) {
return soap_receiver_fault(s, MEM_ALLOC, NULL);
}
- sprintf(addr->data, "%s/%s", endpoint, id);
+ sprintf(addr->data, "%s/%s", endpoint, jobid);
addr->soap = s;
addr->prnt = activityid;
@@ -785,99 +765,9 @@
return SOAP_OK;
}
-int
-getContainedResources(struct soap *s,
- struct bes__FactoryResourceAttributesDocumentType *attrs,
- int return_hosts)
-{
- static char fname[] = "getContainedResources";
- struct soap_dom_element *contained_resources;
- struct bes__BasicResourceAttributesDocumentType *resource;
- int num_contained_resources = 0;
- struct hostInfo *hinfo;
- char *cpuarch, *osname, *osver;
- int numhosts = 0, numres = 0;
- int i, j;
-
- if (!s || !attrs) {
- return BESE_BAD_ARG;
- }
-
- hinfo = ls_gethostinfo(NULL, &numhosts, NULL, 0, 0);
- if (hinfo == NULL) {
- fprintf(stderr, "%s: ls_gethostinfo error: %s\n", fname, ls_sysmsg());
- return BESE_BACKEND;
- }
- for (i = 0; i < numhosts; i++) {
- if (hinfo[i].maxCpus == 0) {
- /* host is unavailable at this time */
- continue;
- }
- num_contained_resources++;
- }
-
- if (return_hosts) {
- contained_resources = (struct soap_dom_element*)soap_malloc(s,
- sizeof(struct soap_dom_element)*num_contained_resources);
- if (!contained_resources) {
- return BESE_MEM_ALLOC;
- }
- memset(contained_resources, 0, sizeof(struct soap_dom_element)
- *num_contained_resources);
- for (j = 0, i = 0; i < numhosts; i++) {
- if (hinfo[i].maxCpus == 0) {
- continue;
- }
- resource = (struct bes__BasicResourceAttributesDocumentType*)soap_malloc(
- s, sizeof(struct bes__BasicResourceAttributesDocumentType));
- if (!resource) {
- return BESE_MEM_ALLOC;
- }
- memset(resource, 0,
- sizeof(struct bes__BasicResourceAttributesDocumentType));
-
- resource->ResourceName = soap_strdup(s, hinfo[i].hostName);
- if (!resource->ResourceName) {
- return BESE_MEM_ALLOC;
- }
-
- resource->CPUCount = (double*)soap_malloc(s, sizeof(double));
- if (!resource->CPUCount) {
- return BESE_MEM_ALLOC;
- }
- *resource->CPUCount = (double)hinfo[i].maxCpus;
-
- resource->PhysicalMemory = (double*)soap_malloc(s, sizeof(double));
- if (!resource->PhysicalMemory) {
- return BESE_MEM_ALLOC;
- }
- *resource->PhysicalMemory = 1024.0*1024.0*(double)hinfo[i].maxMem;
-
- resource->VirtualMemory = (double*)soap_malloc(s, sizeof(double));
- if (!resource->VirtualMemory) {
- return BESE_MEM_ALLOC;
- }
- *resource->VirtualMemory = 1024.0*1024.0*(double)hinfo[i].maxSwap;
-
- contained_resources[j].type
- = SOAP_TYPE_bes__BasicResourceAttributesDocumentType;
- contained_resources[j].node = resource;
- contained_resources[j].soap = s;
- j++;
- }
- attrs->ContainedResource = contained_resources;
- attrs->__sizeContainedResource = j;
- }
-
- attrs->TotalNumberOfContainedResources = num_contained_resources;
-
- return BESE_OK;
-}
-
-
int
getJobIdFromEPR(struct soap *s, struct wsa__EndpointReferenceType *epr,
- int *jobid)
+ char **jobid)
{
static char fname[] = "getJobIdFromEPR";
char *cp;
@@ -891,7 +781,8 @@
if (!cp) {
return -1;
}
- *jobid = atoi(++cp);
+ cp++;
+ *jobid = soap_strdup(s, cp);
return 0;
}
Modified: trunk/besserver/faults.h
===================================================================
--- trunk/besserver/faults.h 2008-09-11 23:22:27 UTC (rev 53)
+++ trunk/besserver/faults.h 2008-09-23 01:13:18 UTC (rev 54)
@@ -44,6 +44,7 @@
#define UNKNOWN_ERROR "Unknown error"
#define ELEMENT_UNSUPPORTED "Element is unsupported"
#define ELEMENT_UNKNOWN "Element is unrecognized"
+#define BACKEND_ERROR "Failed in a call to the backend resource manager"
#define BES_FAULT_NOT_AUTHORIZED "bes:NotAuthorizedFault"
#define BES_FAULT_NOT_ACCEPTING "bes:NotAcceptingNewActivitiesFault"
Modified: trunk/besserver/job.c
===================================================================
--- trunk/besserver/job.c 2008-09-11 23:22:27 UTC (rev 53)
+++ trunk/besserver/job.c 2008-09-23 01:13:18 UTC (rev 54)
@@ -753,10 +753,11 @@
}
struct soap_dom_element *
-getHPCProfileApplication(struct soap *s, struct jobInfoEnt *job)
+getHPCProfileApplication(struct soap *s, struct jobcard *job)
{
struct soap_dom_element *dom, *cur, *next;
- char *nstr, *cp, **cpp, *command;
+ char *nstr;
+ int i;
nstr = soap_strdup(s, JSDL_HPCPA_NS);
if (!nstr) {
@@ -775,10 +776,6 @@
return NULL;
}
- command = soap_strdup(s, job->submit.command);
- if (!command) {
- return NULL;
- }
cur = (struct soap_dom_element*)soap_malloc(s,
sizeof(struct soap_dom_element));
if (cur == NULL) {
@@ -791,15 +788,13 @@
if (!cur->name) {
return NULL;
}
- cpp = &command;
- cp = strsep(cpp, " \t");
- cur->data = soap_strdup(s, cp);
+ cur->data = soap_strdup(s, job->executable);
if (!cur->data) {
return NULL;
}
dom->elts = cur;
- while (cp = strsep(cpp, " \t")) {
+ for (i = 0; i < job->num_args; i++ ) {
next = (struct soap_dom_element*)soap_malloc(s,
sizeof(struct soap_dom_element));
if (next == NULL) {
@@ -812,7 +807,7 @@
if (!next->name) {
return NULL;
}
- next->data = soap_strdup(s, cp);
+ next->data = soap_strdup(s, job->args[i]);
if (!next->data) {
return NULL;
}
@@ -820,7 +815,7 @@
cur = next;
}
- if (job->submit.inFile) {
+ if (job->input) {
next = (struct soap_dom_element*)soap_malloc(s,
sizeof(struct soap_dom_element));
if (next == NULL) {
@@ -833,7 +828,7 @@
if (!next->name) {
return NULL;
}
- next->data = soap_strdup(s, job->submit.inFile);
+ next->data = soap_strdup(s, job->input);
if (!next->data) {
return NULL;
}
@@ -841,7 +836,7 @@
cur = next;
}
- if (job->submit.outFile) {
+ if (job->output) {
next = (struct soap_dom_element*)soap_malloc(s,
sizeof(struct soap_dom_element));
if (next == NULL) {
@@ -854,7 +849,7 @@
if (!next->name) {
return NULL;
}
- next->data = soap_strdup(s, job->submit.outFile);
+ next->data = soap_strdup(s, job->output);
if (!next->data) {
return NULL;
}
@@ -862,7 +857,7 @@
cur = next;
}
- if (job->submit.errFile) {
+ if (job->error) {
next = (struct soap_dom_element*)soap_malloc(s,
sizeof(struct soap_dom_element));
if (next == NULL) {
@@ -875,7 +870,7 @@
if (!next->name) {
return NULL;
}
- next->data = soap_strdup(s, job->submit.errFile);
+ next->data = soap_strdup(s, job->error);
if (!next->data) {
return NULL;
}
@@ -883,7 +878,7 @@
cur = next;
}
- if (job->execCwd) {
+ if (job->wd) {
next = (struct soap_dom_element*)soap_malloc(s,
sizeof(struct soap_dom_element));
if (next == NULL) {
@@ -896,7 +891,7 @@
if (!next->name) {
return NULL;
}
- next->data = soap_strdup(s, job->execCwd);
+ next->data = soap_strdup(s, job->wd);
if (!next->data) {
return NULL;
}
@@ -904,7 +899,7 @@
cur = next;
}
- if (job->execUsername) {
+ if (job->username) {
next = (struct soap_dom_element*)soap_malloc(s,
sizeof(struct soap_dom_element));
if (next == NULL) {
@@ -917,7 +912,7 @@
if (!next->name) {
return NULL;
}
- next->data = soap_strdup(s, job->execUsername);
+ next->data = soap_strdup(s, job->username);
if (!next->data) {
return NULL;
}
@@ -929,7 +924,7 @@
}
int
-getJSDLFromJobInfo(struct soap *s, struct jobInfoEnt *job,
+getJSDLFromJobInfo(struct soap *s, struct jobcard *job,
struct jsdl__JobDefinition_USCOREType **jsdl_return)
{
struct jsdl__JobDefinition_USCOREType *jsdl;
@@ -975,38 +970,38 @@
memset(tcpu, 0, sizeof(struct jsdl__RangeValue_USCOREType));
memset(exact, 0, sizeof(struct jsdl__Exact_USCOREType));
- ident->jsdl__JobName = soap_strdup(s, job->submit.jobName);
+ ident->jsdl__JobName = soap_strdup(s, job->jobname);
ident->__sizeJobProject = 1;
ident->jsdl__JobProject = (char**)soap_malloc(s, sizeof(char*));
- cp = soap_strdup(s, job->submit.projectName);
+ cp = soap_strdup(s, job->jobproject);
if (!ident->jsdl__JobName || !ident->jsdl__JobProject || !cp) {
return soap_receiver_fault(s, MEM_ALLOC, NULL);
}
ident->jsdl__JobProject[0] = cp;
jdesc->jsdl__JobIdentification = ident;
- if (job->numExHosts) {
- hosts->__sizeHostName = job->numExHosts;
+ if (job->num_hostnames) {
+ hosts->__sizeHostName = job->num_hostnames;
hosts->jsdl__HostName = (char**)soap_malloc(s, sizeof(char*)
- *job->numExHosts);
+ *job->num_hostnames);
if (!hosts->jsdl__HostName) {
return soap_receiver_fault(s, MEM_ALLOC, NULL);
}
- for (i = 0; i < job->numExHosts; i++) {
- hosts->jsdl__HostName[i] = soap_strdup(s, job->exHosts[i]);
+ for (i = 0; i < job->num_hostnames; i++) {
+ hosts->jsdl__HostName[i] = soap_strdup(s, job->hostnames[i]);
if (hosts->jsdl__HostName[i] == NULL) {
return soap_receiver_fault(s, MEM_ALLOC, NULL);
}
}
res->jsdl__CandidateHosts = hosts;
- exact->__item = job->numExHosts;
+ exact->__item = job->num_hostnames;
} else {
exact->__item = 0;
}
tcpu->Exact = exact;
tcpu->__sizeExact = 1;
res->jsdl__TotalCPUCount = tcpu;
- if (job->submit.options & SUB_EXCLUSIVE) {
+ if (job->exclusive) {
res->jsdl__ExclusiveExecution = (enum xsd__boolean*)soap_malloc(s,
sizeof(enum xsd__boolean));
if (!res->jsdl__ExclusiveExecution) {
@@ -1028,431 +1023,3 @@
return SOAP_OK;
}
-int
-runBsubScriptAsUser(char *scriptname, char *user)
-{
- static char fname[] = "runBsubScriptAsUser";
- FILE *fp;
- pid_t pid;
- int pfd[2], jobid = 0;
- struct passwd *pw;
- char *arg0, buf[512];
-
- if (!scriptname || !user) {
- return -1;
- }
-
- if ((pw = getpwnam(user)) == NULL) {
- fprintf(stderr, "%s: couldn't get user %s from passwd\n", fname, user);
- return -1;
- }
-
- if (pipe(pfd) < 0) {
- perror("runBsubScriptAsUser: pipe");
- return -1;
- }
-
- if ((pid = fork()) < 0) {
- perror("runBsubScriptAsUser: fork");
- return -1;
- }
-
- if (pid == 0) {
- /* child process */
- close(pfd[0]);
- if (pfd[1] != STDOUT_FILENO) {
- if (dup2(pfd[1], STDOUT_FILENO) != STDOUT_FILENO) {
- perror("runBsubScriptAsUser (child): dup2");
- _exit(1);
- }
- close(pfd[1]);
- }
- if (seteuid(0)) {
- perror("runBsubScriptAsUser (child): seteuid 0");
- _exit(1);
- }
- if (setgid(pw->pw_gid)) {
- perror("runBsubScriptAsUser (child): setgid");
- _exit(1);
- }
- if (setuid(pw->pw_uid)) {
- perror("runBsubScriptAsUser (child): setuid");
- _exit(1);
- }
- arg0 = strrchr(scriptname, '/');
- if (arg0) arg0++;
- execl(scriptname, arg0, NULL);
- perror("runBsubScriptAsUser (child): execl");
- _exit(1);
- }
-
- /* In the parent */
- close(pfd[1]);
- fp = fdopen(pfd[0], "r");
- if (fp == NULL) {
- perror("runBsubScriptAsUser: fdopen");
- }
- while (fgets(buf, 512, fp)) {
- sscanf(buf, "Job <%d> is submitted to default queue <%*s>.\n", &jobid);
- }
- fclose(fp);
-
- if (waitpid(pid, NULL, 0) < 0) {
- perror("runBsubScriptAsUser: waitpid");
- return -1;
- }
-
- return jobid;
-}
-
-int
-createJobWrapperScript(struct jobcard *jc, char *osuser, char *scriptname, int namelen)
-{
- static char fname[] = "createJobWrapperScript";
- char wrappername[MAXPATHLEN];
- FILE *wrapper;
- int fd, i;
- struct passwd *pw;
- uid_t service_uid;
- struct fileStage *file;
- char *cp;
-
- if (!jc || !osuser || !scriptname) {
- return BESE_OTHER;
- }
-
- if ((pw = getpwnam(osuser)) == NULL) {
- fprintf(stderr, "%s: couldn't get user %s from passwd\n", fname, osuser);
- return BESE_BAD_ARG;
- }
-
- service_uid = geteuid();
-
- if (seteuid(0)) {
- perror("createJobWrapperScript: seteuid 0 (1)");
- return BESE_SYS_ERR;
- }
-
- if (seteuid(pw->pw_uid)) {
- perror("createJobWrapperScript: seteuid user");
- return BESE_SYS_ERR;
- }
-
- if (strlen(pw->pw_dir) + strlen("/jobscript.XXXXXX") + 1 > MAXPATHLEN) {
- fprintf(stderr, "%s: cannot generate wrapper script: path too long\n", fname);
- return BESE_SYS_ERR;
- }
-
- sprintf(wrappername, "%s/jobscript.XXXXXX", pw->pw_dir);
- fd = mkstemp(wrappername);
- if (fd == -1) {
- perror("createJobWrapperScript: mkstemp");
- return BESE_SYS_ERR;
- }
-
- if (fchmod(fd, S_IRWXU)) {
- perror("createJobWrapperScript: fchmod");
- unlink(wrappername);
- return BESE_SYS_ERR;
- }
-
- wrapper = fdopen(fd, "w");
- if (wrapper == NULL) {
- perror("createJobWrapperScript: fdopen");
- unlink(wrappername);
- return BESE_SYS_ERR;
- }
-
- fprintf(wrapper, "#!/bin/sh\n");
-
- file = jc->files;
- while (file) {
- if (file->source) {
- cp = strstr(file->source, "ftp://");
- if (!cp) {
- fprintf(stderr, "%s: malformed source URI for file transfer %s\n", fname, file->source);
- fprintf(stderr, "%s: only support ftp URIs at this time\n", fname);
- unlink(wrappername);
- return BESE_OTHER;
- }
- cp = cp + strlen("ftp://");
-
- fprintf(wrapper, "%s -o %s ftp://", FTP_PROGRAM, file->filename);
- if (file->credential) {
- fprintf(wrapper, "%s:%s@", file->credential->username, file->credential->password);
- }
- fprintf(wrapper, "%s\n", cp);
- }
- file = file->next;
- }
-
- if (jc->executable) {
- fprintf(wrapper, "%s", jc->executable);
- }
- for (i = 0; i < jc->num_args; i++) {
- fprintf(wrapper, " %s", jc->args[i]);
- }
- fprintf(wrapper, "\n");
-
- fprintf(wrapper, "rc=$?\n");
-
- file = jc->files;
- while (file) {
- if (file->target) {
- cp = strstr(file->target, "ftp://");
- if (!cp) {
- fprintf(stderr, "%s: malformed target URI for file transfer %s\n", fname, file->target);
- fprintf(stderr, "%s: only support ftp URIs at this time\n", fname);
- unlink(wrappername);
- return BESE_OTHER;
- }
- cp = cp + strlen("ftp://");
-
- fprintf(wrapper, "%s -u ftp://", FTP_PROGRAM);
- if (file->credential) {
- fprintf(wrapper, "%s:%s@", file->credential->username, file->credential->password);
- }
- fprintf(wrapper, "%s %s\n", cp, file->filename);
- }
- file = file->next;
- }
-
- fprintf(wrapper, "rm $0\n");
-
- fprintf(wrapper, "exit $rc\n");
-
- fclose(wrapper);
-
- if (seteuid(0)) {
- perror("createJobWrapperScript: seteuid 0 (2)");
- unlink(wrappername);
- return BESE_SYS_ERR;
- }
-
- if (seteuid(service_uid)) {
- perror("createJobWrapperScript: seteuid service_uid");
- unlink(wrappername);
- return BESE_SYS_ERR;
- }
-
- strncpy(scriptname, wrappername, namelen-1);
-
- return BESE_OK;
-}
-
-int
-submitLSFJob(struct jobcard *jc, int *return_jobid, char *osuser)
-{
- static char fname[] = "submitLSFJob";
- char scriptname[MAXPATHLEN], wrappername[MAXPATHLEN];
- char buf[512];
- int fd, i, rc, jobid = 0, rr = 0;
- FILE *script;
- struct envvar *cur;
-
- fprintf(stderr, "In submitLSFJob...\n");
-
- if (!jc || !jc->executable) {
- fprintf(stderr, "%s: Need to have the executable name\n", fname);
- return BESE_OTHER;
- }
- if (!osuser) {
- fprintf(stderr, "%s: Need to have the os user\n", fname);
- return BESE_OTHER;
- }
-
- strcpy(scriptname, "/tmp/besserver.XXXXXX");
- fd = mkstemp(scriptname);
- if (fd == -1) {
- perror("submitLSFJob: mkstemp");
- return BESE_OTHER;
- }
- script = fdopen(fd, "w");
- if (script == NULL) {
- perror("submitLSFJob: fdopen");
- return BESE_OTHER;
- }
-
- fprintf(script, "#!/bin/sh\n");
- if (jc->wd)
- fprintf(script, "LSB_JOB_LONG_CWD=%s; export LSB_JOB_LONG_CWD\n",
- jc->wd);
- for (cur = jc->environment; cur; cur = cur->next) {
- fprintf(script, "%s=%s; export %s\n", cur->name, cur->val, cur->name);
- }
- fprintf(script, "bsub ");
- if (jc->appname) {
- fprintf(script, "-a %s ", jc->appname);
- if (LSF_VERSION >= 17) {
- fprintf(script, "-app %s ", jc->appname);
- }
- }
- if (jc->jobname)
- fprintf(script, "-J %s ", jc->jobname);
- if (jc->jobproject)
- fprintf(script, "-P %s ", jc->jobproject);
- if (jc->num_hostnames) {
- fprintf(script, "-m \"");
- for (i = 0; i < jc->num_hostnames; i++)
- fprintf(script, "%s ", jc->hostnames[i]);
- fprintf(script, "\" ");
- }
- if (jc->exclusive)
- fprintf(script, "-x ");
- if (jc->tcpu)
- fprintf(script, "-n %d ", jc->tcpu);
- if (jc->input)
- fprintf(script, "-i %s ", jc->input);
- if (jc->output)
- fprintf(script, "-o %s ", jc->output);
- if (jc->error)
- fprintf(script, "-e %s ", jc->error);
- if (jc->osname) {
- if (!rr) {
- fprintf(script, "-R \"");
- rr = 1;
- } else {
- fprintf(script, " && ");
- }
- fprintf(script, "osname == %s", jc->osname);
- }
- if (jc->osver) {
- if (!rr) {
- fprintf(script, "-R \"");
- rr = 1;
- } else {
- fprintf(script, " && ");
- }
- fprintf(script, "osver == %s", jc->osver);
- }
- if (jc->cpuarch) {
- if (!rr) {
- fprintf(script, "-R \"");
- rr = 1;
- } else {
- fprintf(script, " && ");
- }
- fprintf(script, "cpuarch == %s", jc->cpuarch);
- }
- if (rr) {
- fprintf(script, "\" ");
- }
-
- if ((rc = createJobWrapperScript(jc, osuser, wrappername, MAXPATHLEN)) != BESE_OK) {
- fclose(script);
- unlink(scriptname);
- return rc;
- }
-
- fprintf(script, "%s\n", wrappername);
-
- fclose(script);
-
- if (chmod(scriptname, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)) {
- perror("submitLSFJob: chmod");
- unlink(wrappername);
- unlink(scriptname);
- return BESE_OTHER;
- }
-
- jobid = runBsubScriptAsUser(scriptname, osuser);
-
- unlink(scriptname);
-
- if (jobid < 1) {
- return BESE_OTHER;
- }
-
- *return_jobid = jobid;
-
- return BESE_OK;
-}
-
-int
-terminateLSFJob(int jobid, char *osuser)
-{
- static char fname[] = "terminateLSFJob";
- struct passwd *pw;
- pid_t pid;
- int pfd[2], lsb_rc = LSBE_NO_ERROR;
-
- if (!osuser) {
- fprintf(stderr, "%s: need to provide os user", fname);
- return BESE_BAD_ARG;
- }
-
- if ((pw = getpwnam(osuser)) == NULL) {
- fprintf(stderr, "%s: couldn't get user %s from passwd\n", fname, osuser);
- return BESE_SYS_ERR;
- }
-
- if (pipe(pfd) < 0) {
- perror("terminateLSFJob: pipe");
- return BESE_SYS_ERR;
- }
-
- if ((pid = fork()) < 0) {
- perror("terminateLSFJob: fork");
- return BESE_SYS_ERR;
- }
-
- if (pid == 0) {
- /* child process */
- close(pfd[0]);
-
- if (seteuid(0)) {
- perror("terminateLSFJob (child): seteuid 0");
- _exit(1);
- }
- if (setgid(pw->pw_gid)) {
- perror("terminateLSFJob (child): setgid");
- _exit(1);
- }
- if (setuid(pw->pw_uid)) {
- perror("terminateLSFJob (child): setuid");
- _exit(1);
- }
-
- if (lsb_signaljob(jobid, SIGKILL)) {
- lsb_perror("terminateLSFJob (child)");
- lsb_rc = lsberrno;
- }
-
- if (write(pfd[1], (void*)&lsb_rc, sizeof(lsb_rc)) != sizeof(lsb_rc)) {
- perror("terminateLSFJob (child): write");
- _exit(1);
- }
- _exit(0);
- }
-
- /* In the parent */
- close(pfd[1]);
- if (read(pfd[0], (void*)&lsb_rc, sizeof(lsb_rc)) != sizeof(lsb_rc)) {
- perror("terminateLSFJob: read");
- lsb_rc = -1;
- }
- close(pfd[0]);
-
- if (waitpid(pid, NULL, 0) < 0) {
- perror("terminateLSFJob: waitpid");
- return BESE_SYS_ERR;
- }
-
- switch (lsb_rc) {
- case -1:
- return BESE_SYS_ERR;
- break;
- case LSBE_NO_ERROR:
- return BESE_OK;
- break;
- case LSBE_PERMISSION:
- return BESE_PERMISSION;
- break;
- case LSBE_NO_JOB:
- return BESE_NO_ACTIVITY;
- break;
- default:
- return BESE_BACKEND;
- }
-}
-
Modified: trunk/besserver/job.h
===================================================================
--- trunk/besserver/job.h 2008-09-11 23:22:27 UTC (rev 53)
+++ trunk/besserver/job.h 2008-09-23 01:13:18 UTC (rev 54)
@@ -92,8 +92,6 @@
};
int processJobDefinition(struct soap*, struct soap_dom_element*, struct jobcard*);
-int getJSDLFromJobInfo(struct soap*, struct jobInfoEnt*, struct jsdl__JobDefinition_USCOREType**);
-int submitLSFJob(struct jobcard*, int*, char*);
-int terminateLSFJob(int, char*);
+int getJSDLFromJobInfo(struct soap*, struct jobcard*, struct jsdl__JobDefinition_USCOREType**);
#endif /* _JOB_H */
Added: trunk/besserver/rm.h
===================================================================
--- trunk/besserver/rm.h (rev 0)
+++ trunk/besserver/rm.h 2008-09-23 01:13:18 UTC (rev 54)
@@ -0,0 +1,54 @@
+#ifndef _RM_H
+#define _RM_H /* resource-manager independent interface; the back end is the module named by RM in Make.config (e.g. rm_lsf) */
+
+#include "soapH.h"
+#include "job.h"
+
+struct rm_resource { /* one resource entry, used with rm_getResourceList() */
+ char *ResourceName;
+ char *OperatingSystemName;
+ char *OperatingSystemVersion;
+ char *CPUArchitecture;
+ double *CPUCount; /* pointer rather than value - presumably NULL when unknown; TODO confirm */
+ double *CPUSpeed;
+ double *PhysicalMemory;
+ double *VirtualMemory;
+ struct rm_resource *next; /* link to another entry of the same type */
+};
+
+struct rm_filter { /* selection criteria handed to rm_getResourceList() and rm_getJobsList() */
+ char *user;
+ char *state;
+ long startRange;
+ long endRange;
+ char *startTime;
+ char *endTime;
+ char *CompactResources;
+};
+
+struct rm_job { /* one job entry, used with rm_getJobsList() */
+ char *jobid; /* job ids are carried as strings across this interface */
+ struct rm_job *next; /* link to another entry of the same type */
+};
+
+struct rm_clusterInfo { /* cluster attributes, used with rm_getClusterInfo() */
+ enum xsd__boolean IsAcceptingNewActivities;
+ char *CommonName;
+ char *LongDescription;
+ int num_extensions; /* presumably the number of entries in BESExtensions - confirm */
+ char **BESExtensions;
+ char *LocalResourceManagerType;
+};
+
+int rm_initialize(struct soap*, char*); /* (soap context, server name); rm_lsf.c returns 0 on success, -1 on failure */
+int rm_submitJob(struct soap*, struct jobcard*, char*, char**); /* (soap context, job card, OS user, out: job id string); returns a BESE_* code */
+int rm_terminateJob(struct soap*, char*, char *); /* (soap context, job id string, OS user); returns a BESE_* code */
+int rm_getJobInfo(struct soap*, char*, char*, struct jobcard**);
+int rm_getJobStatus(struct soap*, char*, char*, struct bes__ActivityStatusType**); /* (soap context, job id string, OS user, out: status); returns a BESE_* code */
+int rm_getResourceList(struct soap *, struct rm_filter*, struct rm_resource**, int*);
+int rm_getJobsList(struct soap*, struct rm_filter*, struct rm_job**, int*);
+int rm_getClusterInfo(struct soap*, struct rm_clusterInfo**);
+
+
+#endif /* _RM_H */
+
Added: trunk/besserver/rm_lsf.c
===================================================================
--- trunk/besserver/rm_lsf.c (rev 0)
+++ trunk/besserver/rm_lsf.c 2008-09-23 01:13:18 UTC (rev 54)
@@ -0,0 +1,792 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <pwd.h>
+
+#include <lsf/lsf.h>
+#include <lsf/lsbatch.h>
+
+#include "rm.h"
+
+/*
+ * rm_initialize - prepare the LSF batch library for use by the server.
+ *
+ * Neither argument is consulted by the LSF back end.
+ *
+ * Returns 0 when lsb_init() succeeds; otherwise reports the LSF error
+ * through lsb_perror() and returns -1.
+ */
+int
+rm_initialize(struct soap *s, char *servername)
+{
+    int init_rc = lsb_init("besserver");
+
+    if (init_rc == 0) {
+        return 0;
+    }
+
+    lsb_perror("rm_initialize: lsb_init");
+    return -1;
+}
+
+/*
+ * writeWrapperCommands - emit the contents of the job wrapper script:
+ * stage-in transfers, the job command line, stage-out transfers, removal
+ * of the script itself, and an exit with the job's own status.
+ *
+ * Returns BESE_OK, or BESE_OTHER when a staging URI is not an ftp:// URI.
+ *
+ * NOTE(review): the executable, arguments, file names and credentials are
+ * written unquoted, so shell metacharacters in the job description are
+ * interpreted by /bin/sh. Consider escaping if job descriptions can come
+ * from untrusted clients.
+ */
+static int
+writeWrapperCommands(FILE *wrapper, struct jobcard *jc)
+{
+    static char fname[] = "createJobWrapperScript";
+    struct fileStage *file;
+    char *cp;
+    int i;
+
+    fprintf(wrapper, "#!/bin/sh\n");
+
+    /* stage-in: fetch every file that names a source URI */
+    for (file = jc->files; file; file = file->next) {
+        if (!file->source) {
+            continue;
+        }
+        cp = strstr(file->source, "ftp://");
+        if (!cp) {
+            fprintf(stderr, "%s: malformed source URI for file transfer %s\n", fname, file->source);
+            fprintf(stderr, "%s: only support ftp URIs at this time\n", fname);
+            return BESE_OTHER;
+        }
+        cp += strlen("ftp://");
+
+        fprintf(wrapper, "%s -o %s ftp://", FTP_PROGRAM, file->filename);
+        if (file->credential) {
+            fprintf(wrapper, "%s:%s@", file->credential->username, file->credential->password);
+        }
+        fprintf(wrapper, "%s\n", cp);
+    }
+
+    /* the job command line itself */
+    if (jc->executable) {
+        fprintf(wrapper, "%s", jc->executable);
+    }
+    for (i = 0; i < jc->num_args; i++) {
+        fprintf(wrapper, " %s", jc->args[i]);
+    }
+    fprintf(wrapper, "\n");
+
+    /* remember the job's status so that staging does not mask it */
+    fprintf(wrapper, "rc=$?\n");
+
+    /* stage-out: upload every file that names a target URI */
+    for (file = jc->files; file; file = file->next) {
+        if (!file->target) {
+            continue;
+        }
+        cp = strstr(file->target, "ftp://");
+        if (!cp) {
+            fprintf(stderr, "%s: malformed target URI for file transfer %s\n", fname, file->target);
+            fprintf(stderr, "%s: only support ftp URIs at this time\n", fname);
+            return BESE_OTHER;
+        }
+        cp += strlen("ftp://");
+
+        fprintf(wrapper, "%s -u ftp://", FTP_PROGRAM);
+        if (file->credential) {
+            fprintf(wrapper, "%s:%s@", file->credential->username, file->credential->password);
+        }
+        fprintf(wrapper, "%s %s\n", cp, file->filename);
+    }
+
+    fprintf(wrapper, "rm $0\n");
+    fprintf(wrapper, "exit $rc\n");
+
+    return BESE_OK;
+}
+
+/*
+ * createJobWrapperScript - create an executable shell script in the home
+ * directory of 'osuser' that stages files in, runs the job and stages
+ * files out.
+ *
+ *   jc         - job description (files, executable, args)
+ *   osuser     - account that must own the script
+ *   scriptname - out: receives the path of the generated script
+ *   namelen    - size of the scriptname buffer in bytes
+ *
+ * The file is created while the effective uid is temporarily switched to
+ * 'osuser'. The effective uid in force on entry is restored on every
+ * path that got as far as switching it, and the script is removed again
+ * on every failure after it was created.
+ *
+ * Returns BESE_OK on success; BESE_OTHER for missing arguments or a
+ * non-ftp staging URI; BESE_BAD_ARG for an unknown user; BESE_SYS_ERR
+ * for system call failures or a path that does not fit.
+ */
+static int
+createJobWrapperScript(struct jobcard *jc, char *osuser, char *scriptname, int namelen)
+{
+    static char fname[] = "createJobWrapperScript";
+    char wrappername[MAXPATHLEN];
+    FILE *wrapper;
+    int fd = -1, len, euid_failed = 0;
+    int rc = BESE_SYS_ERR;
+    struct passwd *pw;
+    uid_t service_uid;
+
+    if (!jc || !osuser || !scriptname || namelen < 1) {
+        return BESE_OTHER;
+    }
+
+    if ((pw = getpwnam(osuser)) == NULL) {
+        fprintf(stderr, "%s: couldn't get user %s from passwd\n", fname, osuser);
+        return BESE_BAD_ARG;
+    }
+
+    /*
+     * Build the mkstemp template before touching the effective uid so
+     * that an over-long path cannot leave us running as another user.
+     * The result must also fit the caller's buffer.
+     */
+    len = snprintf(wrappername, sizeof(wrappername), "%s/jobscript.XXXXXX", pw->pw_dir);
+    if (len < 0 || (size_t)len >= sizeof(wrappername) || len >= namelen) {
+        fprintf(stderr, "%s: cannot generate wrapper script: path too long\n", fname);
+        return BESE_SYS_ERR;
+    }
+
+    service_uid = geteuid();
+
+    /* become root first so that switching to the job owner is permitted */
+    if (seteuid(0)) {
+        perror("createJobWrapperScript: seteuid 0 (1)");
+        return BESE_SYS_ERR;
+    }
+
+    if (seteuid(pw->pw_uid)) {
+        perror("createJobWrapperScript: seteuid user");
+        goto restore_euid;
+    }
+
+    fd = mkstemp(wrappername);
+    if (fd == -1) {
+        perror("createJobWrapperScript: mkstemp");
+        goto restore_euid;
+    }
+
+    if (fchmod(fd, S_IRWXU)) {
+        perror("createJobWrapperScript: fchmod");
+        goto cleanup;
+    }
+
+    wrapper = fdopen(fd, "w");
+    if (wrapper == NULL) {
+        perror("createJobWrapperScript: fdopen");
+        goto cleanup;
+    }
+    fd = -1;    /* the descriptor now belongs to 'wrapper' */
+
+    rc = writeWrapperCommands(wrapper, jc);
+
+    /* fclose flushes; a failure here means the script is incomplete */
+    if (fclose(wrapper) != 0 && rc == BESE_OK) {
+        perror("createJobWrapperScript: fclose");
+        rc = BESE_SYS_ERR;
+    }
+
+cleanup:
+    if (fd != -1) {
+        close(fd);
+    }
+    if (rc != BESE_OK) {
+        /* still running as the job owner, who owns the file */
+        unlink(wrappername);
+    }
+
+restore_euid:
+    if (seteuid(0)) {
+        perror("createJobWrapperScript: seteuid 0 (2)");
+        euid_failed = 1;
+    } else if (seteuid(service_uid)) {
+        perror("createJobWrapperScript: seteuid service_uid");
+        euid_failed = 1;
+    }
+
+    if (euid_failed) {
+        if (rc == BESE_OK) {
+            unlink(wrappername);
+        }
+        return BESE_SYS_ERR;
+    }
+
+    if (rc != BESE_OK) {
+        return rc;
+    }
+
+    /* length was verified above, so this never truncates */
+    snprintf(scriptname, (size_t)namelen, "%s", wrappername);
+
+    return BESE_OK;
+}
+
+/*
+ * runBsubScriptAsUser - run the submission script as 'user' in a child
+ * process and extract the job id from its standard output.
+ *
+ *   scriptname - path of the executable script to run
+ *   user       - account to run it as
+ *
+ * Returns the job id parsed from a "Job <N> ..." line, 0 when no such
+ * line was seen, or -1 on argument or system call failure.
+ *
+ * NOTE(review): the child changes gid and uid only; supplementary groups
+ * inherited from the server are left untouched. Consider initgroups().
+ */
+static int
+runBsubScriptAsUser(char *scriptname, char *user)
+{
+    static char fname[] = "runBsubScriptAsUser";
+    FILE *fp;
+    pid_t pid;
+    int pfd[2], jobid = 0;
+    struct passwd *pw;
+    char *arg0, buf[512];
+
+    if (!scriptname || !user) {
+        return -1;
+    }
+
+    if ((pw = getpwnam(user)) == NULL) {
+        fprintf(stderr, "%s: couldn't get user %s from passwd\n", fname, user);
+        return -1;
+    }
+
+    if (pipe(pfd) < 0) {
+        perror("runBsubScriptAsUser: pipe");
+        return -1;
+    }
+
+    if ((pid = fork()) < 0) {
+        perror("runBsubScriptAsUser: fork");
+        /* do not leak the pipe when no child was created */
+        close(pfd[0]);
+        close(pfd[1]);
+        return -1;
+    }
+
+    if (pid == 0) {
+        /* child process: stdout goes to the pipe, then drop privileges */
+        close(pfd[0]);
+        if (pfd[1] != STDOUT_FILENO) {
+            if (dup2(pfd[1], STDOUT_FILENO) != STDOUT_FILENO) {
+                perror("runBsubScriptAsUser (child): dup2");
+                _exit(1);
+            }
+            close(pfd[1]);
+        }
+        if (seteuid(0)) {
+            perror("runBsubScriptAsUser (child): seteuid 0");
+            _exit(1);
+        }
+        /* group first: once the uid is dropped setgid would be refused */
+        if (setgid(pw->pw_gid)) {
+            perror("runBsubScriptAsUser (child): setgid");
+            _exit(1);
+        }
+        if (setuid(pw->pw_uid)) {
+            perror("runBsubScriptAsUser (child): setuid");
+            _exit(1);
+        }
+        /* argv[0] is the base name; fall back to the whole name if no '/' */
+        arg0 = strrchr(scriptname, '/');
+        arg0 = arg0 ? arg0 + 1 : scriptname;
+        execl(scriptname, arg0, (char *)NULL);
+        perror("runBsubScriptAsUser (child): execl");
+        _exit(1);
+    }
+
+    /* In the parent */
+    close(pfd[1]);
+    fp = fdopen(pfd[0], "r");
+    if (fp == NULL) {
+        perror("runBsubScriptAsUser: fdopen");
+        /* output cannot be read; close our end and still reap the child */
+        close(pfd[0]);
+        jobid = -1;
+    } else {
+        /* read to EOF; lines that do not match leave jobid unchanged */
+        while (fgets(buf, sizeof(buf), fp)) {
+            sscanf(buf, "Job <%d> is submitted to default queue <%*s>.\n", &jobid);
+        }
+        fclose(fp);
+    }
+
+    if (waitpid(pid, NULL, 0) < 0) {
+        perror("runBsubScriptAsUser: waitpid");
+        return -1;
+    }
+
+    return jobid;
+}
+
+/*
+ * appendResReq - add one "key == value" term to the bsub -R resource
+ * requirement. Opens the quoted string on first use and joins later
+ * terms with " && ". Does nothing when value is NULL.
+ */
+static void
+appendResReq(FILE *script, int *started, const char *key, const char *value)
+{
+    if (!value) {
+        return;
+    }
+    if (!*started) {
+        fprintf(script, "-R \"");
+        *started = 1;
+    } else {
+        fprintf(script, " && ");
+    }
+    fprintf(script, "%s == %s", key, value);
+}
+
+/*
+ * rm_submitJob - submit the job described by 'jc' to LSF as 'osuser'.
+ *
+ * A temporary script holding the environment settings and the bsub
+ * command line is written under /tmp, run as 'osuser' and then removed.
+ * The job itself is wrapped by the script that createJobWrapperScript()
+ * generates.
+ *
+ *   s            - soap context; owns the memory of the returned id
+ *   jc           - job description; jc->executable is required
+ *   osuser       - account to submit the job as
+ *   return_jobid - out: job id as a decimal string
+ *
+ * Returns BESE_OK on success, BESE_BAD_ARG when return_jobid is NULL,
+ * BESE_MEM_ALLOC when the id cannot be copied, the error code of
+ * createJobWrapperScript() when that fails, and BESE_OTHER otherwise.
+ *
+ * NOTE(review): job description strings are written into the script
+ * unquoted, so shell metacharacters in them are interpreted by /bin/sh.
+ */
+int
+rm_submitJob(struct soap *s, struct jobcard *jc,
+             char *osuser, char **return_jobid)
+{
+    static char fname[] = "rm_submitJob";
+    char scriptname[MAXPATHLEN], wrappername[MAXPATHLEN];
+    char buf[32];
+    int fd, i, rc, jobid = 0, rr = 0;
+    FILE *script;
+    struct envvar *cur;
+
+    fprintf(stderr, "In rm_submitJob...\n");
+
+    if (!jc || !jc->executable) {
+        fprintf(stderr, "%s: Need to have the executable name\n", fname);
+        return BESE_OTHER;
+    }
+    if (!osuser) {
+        fprintf(stderr, "%s: Need to have the os user\n", fname);
+        return BESE_OTHER;
+    }
+    if (!return_jobid) {
+        fprintf(stderr, "%s: Need a place to return the job id\n", fname);
+        return BESE_BAD_ARG;
+    }
+
+    strcpy(scriptname, "/tmp/besserver.XXXXXX");
+    fd = mkstemp(scriptname);
+    if (fd == -1) {
+        perror("rm_submitJob: mkstemp");
+        return BESE_OTHER;
+    }
+    script = fdopen(fd, "w");
+    if (script == NULL) {
+        perror("rm_submitJob: fdopen");
+        /* do not leave the descriptor or the temporary file behind */
+        close(fd);
+        unlink(scriptname);
+        return BESE_OTHER;
+    }
+
+    fprintf(script, "#!/bin/sh\n");
+    if (jc->wd) {
+        fprintf(script, "LSB_JOB_LONG_CWD=%s; export LSB_JOB_LONG_CWD\n",
+                jc->wd);
+    }
+    for (cur = jc->environment; cur; cur = cur->next) {
+        fprintf(script, "%s=%s; export %s\n", cur->name, cur->val, cur->name);
+    }
+    fprintf(script, "bsub ");
+    if (jc->appname) {
+        fprintf(script, "-a %s ", jc->appname);
+        if (LSF_VERSION >= 17) {
+            fprintf(script, "-app %s ", jc->appname);
+        }
+    }
+    if (jc->jobname) {
+        fprintf(script, "-J %s ", jc->jobname);
+    }
+    if (jc->jobproject) {
+        fprintf(script, "-P %s ", jc->jobproject);
+    }
+    if (jc->num_hostnames) {
+        fprintf(script, "-m \"");
+        for (i = 0; i < jc->num_hostnames; i++) {
+            fprintf(script, "%s ", jc->hostnames[i]);
+        }
+        fprintf(script, "\" ");
+    }
+    if (jc->exclusive) {
+        fprintf(script, "-x ");
+    }
+    if (jc->tcpu) {
+        fprintf(script, "-n %d ", jc->tcpu);
+    }
+    if (jc->input) {
+        fprintf(script, "-i %s ", jc->input);
+    }
+    if (jc->output) {
+        fprintf(script, "-o %s ", jc->output);
+    }
+    if (jc->error) {
+        fprintf(script, "-e %s ", jc->error);
+    }
+
+    /* resource requirements share one quoted -R string */
+    appendResReq(script, &rr, "osname", jc->osname);
+    appendResReq(script, &rr, "osver", jc->osver);
+    appendResReq(script, &rr, "cpuarch", jc->cpuarch);
+    if (rr) {
+        fprintf(script, "\" ");
+    }
+
+    if ((rc = createJobWrapperScript(jc, osuser, wrappername, MAXPATHLEN)) != BESE_OK) {
+        fclose(script);
+        unlink(scriptname);
+        return rc;
+    }
+
+    fprintf(script, "%s\n", wrappername);
+
+    /* fclose flushes; do not run a script that was not written completely */
+    if (fclose(script) != 0) {
+        perror("rm_submitJob: fclose");
+        unlink(wrappername);
+        unlink(scriptname);
+        return BESE_OTHER;
+    }
+
+    if (chmod(scriptname, S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)) {
+        perror("rm_submitJob: chmod");
+        unlink(wrappername);
+        unlink(scriptname);
+        return BESE_OTHER;
+    }
+
+    jobid = runBsubScriptAsUser(scriptname, osuser);
+
+    unlink(scriptname);
+
+    /*
+     * The wrapper is left alone here: the job may have been queued even
+     * though its id could not be read, and the wrapper removes itself
+     * when it runs.
+     */
+    if (jobid < 1) {
+        return BESE_OTHER;
+    }
+
+    /* jobid is an int, so the conversion must be %d */
+    snprintf(buf, sizeof(buf), "%d", jobid);
+    *return_jobid = soap_strdup(s, buf);
+    if (!*return_jobid) {
+        return BESE_MEM_ALLOC;
+    }
+
+    return BESE_OK;
+}
+
+/*
+ * parseLSFJobId - convert a decimal job id string to an LSF job id.
+ *
+ * The whole string must be a positive decimal number. Returns 0 and
+ * stores the id in *jobid on success, -1 otherwise.
+ */
+static int
+parseLSFJobId(const char *job_id, LS_LONG_INT *jobid)
+{
+    char *end;
+    long val;
+
+    /* strtol only sets errno on failure, so clear any stale value first */
+    errno = 0;
+    val = strtol(job_id, &end, 10);
+    if (errno == ERANGE || end == job_id || *end != '\0' || val < 1) {
+        return -1;
+    }
+    *jobid = val;
+    return 0;
+}
+
+/*
+ * rm_terminateJob - kill the LSF job 'job_id' on behalf of 'osuser'.
+ *
+ * SIGKILL is sent through lsb_signaljob() from a child process running
+ * as 'osuser'. The child reports the LSF error code back over a pipe.
+ *
+ *   s      - soap context (not used by the LSF back end)
+ *   job_id - job id as a decimal string
+ *   osuser - account on whose behalf the job is signalled
+ *
+ * Returns BESE_OK on success; BESE_BAD_ARG for a missing or malformed
+ * argument; BESE_SYS_ERR for an unknown user or a system call failure;
+ * BESE_PERMISSION, BESE_NO_ACTIVITY or BESE_BACKEND according to the
+ * LSF error.
+ */
+int
+rm_terminateJob(struct soap *s, char *job_id, char *osuser)
+{
+    static char fname[] = "rm_terminateJob";
+    struct passwd *pw;
+    pid_t pid;
+    int pfd[2], lsb_rc = LSBE_NO_ERROR;
+    ssize_t nread;
+    LS_LONG_INT jobid;
+
+    if (!job_id) {
+        fprintf(stderr, "%s: missing job id\n", fname);
+        return BESE_BAD_ARG;
+    }
+    if (parseLSFJobId(job_id, &jobid)) {
+        fprintf(stderr, "%s: job id should be an integer\n", fname);
+        return BESE_BAD_ARG;
+    }
+
+    if (!osuser) {
+        fprintf(stderr, "%s: need to provide os user\n", fname);
+        return BESE_BAD_ARG;
+    }
+
+    if ((pw = getpwnam(osuser)) == NULL) {
+        fprintf(stderr, "%s: couldn't get user %s from passwd\n", fname, osuser);
+        return BESE_SYS_ERR;
+    }
+
+    if (pipe(pfd) < 0) {
+        perror("rm_terminateJob: pipe");
+        return BESE_SYS_ERR;
+    }
+
+    if ((pid = fork()) < 0) {
+        perror("rm_terminateJob: fork");
+        /* do not leak the pipe when no child was created */
+        close(pfd[0]);
+        close(pfd[1]);
+        return BESE_SYS_ERR;
+    }
+
+    if (pid == 0) {
+        /* child process: become the user, signal the job, report back */
+        close(pfd[0]);
+
+        if (seteuid(0)) {
+            perror("rm_terminateJob (child): seteuid 0");
+            _exit(1);
+        }
+        /* group first: once the uid is dropped setgid would be refused */
+        if (setgid(pw->pw_gid)) {
+            perror("rm_terminateJob (child): setgid");
+            _exit(1);
+        }
+        if (setuid(pw->pw_uid)) {
+            perror("rm_terminateJob (child): setuid");
+            _exit(1);
+        }
+
+        if (lsb_signaljob(jobid, SIGKILL)) {
+            lsb_perror("rm_terminateJob (child)");
+            lsb_rc = lsberrno;
+        }
+
+        if (write(pfd[1], (void*)&lsb_rc, sizeof(lsb_rc)) != sizeof(lsb_rc)) {
+            perror("rm_terminateJob (child): write");
+            _exit(1);
+        }
+        _exit(0);
+    }
+
+    /* In the parent */
+    close(pfd[1]);
+    nread = read(pfd[0], (void*)&lsb_rc, sizeof(lsb_rc));
+    if (nread != (ssize_t)sizeof(lsb_rc)) {
+        if (nread < 0) {
+            perror("rm_terminateJob: read");
+        } else {
+            /* the child exited before it could report a result */
+            fprintf(stderr, "%s: no result received from child\n", fname);
+        }
+        lsb_rc = -1;
+    }
+    close(pfd[0]);
+
+    if (waitpid(pid, NULL, 0) < 0) {
+        perror("rm_terminateJob: waitpid");
+        return BESE_SYS_ERR;
+    }
+
+    /* translate the LSF result into a BES error code */
+    switch (lsb_rc) {
+    case -1:
+        return BESE_SYS_ERR;
+    case LSBE_NO_ERROR:
+        return BESE_OK;
+    case LSBE_PERMISSION:
+        return BESE_PERMISSION;
+    case LSBE_NO_JOB:
+        return BESE_NO_ACTIVITY;
+    default:
+        return BESE_BACKEND;
+    }
+}
+
+int
+rm_getJobStatus(struct soap *s, char *job_id, char *osuser,
+ struct bes__ActivityStatusType **job_status)
+{
+ struct jobInfoEnt *job;
+ struct bes__ActivityStatusType *status;
+ int rc;
+ LS_LONG_INT jobid;
+
+ if (!job_id || !job_status) {
+ return BESE_BAD_ARG;
+ }
+ jobid = strtol(job_id, NULL, 0);
+ if (errno == ERANGE || errno == EINVAL) {
+ fprintf(stderr, "rm_getJobStatus: job id should be an integer\n");
+ return BESE_BAD_ARG;
+ }
+
+ rc = lsb_openjobinfo(jobid, NULL, "all", NULL, NULL, ALL_JOB);
+ if (rc == -1) {
+ if (lsberrno == LSBE_NO_JOB) {
+ return BESE_NO_ACTIVITY;
+ }
+ lsb_perror("rm_getJobStatus: lsb_openjobinfo");
+ return BESE_BACKEND;
+ }
+
+ job = lsb_readjobinfo(NULL);
+ if (job == NULL) {
+ lsb_perror("rm_getJobStatus: lsb_readjobinfo");
+ lsb_closejobinfo();
+ return BESE_BACKEND;
+ }
+
+ status = (struct bes__ActivityStatusType*)soap_malloc(s, sizeof(struct bes__ActivityStatusType));
+ if (status == NULL) {
+ return BESE_MEM_ALLOC;
+ }
+ memset(status, 0, sizeof(struct bes__ActivityStatusType));
+
+ if (IS_PEND(job->status)) {
+ status->state = Pe...
[truncated message content] |