batchserver-cvs Mailing List for Enterprise Batch Server (Page 17)
Brought to you by:
suresh_pragada
You can subscribe to this list here.
2006 |
Jan
|
Feb
(10) |
Mar
(159) |
Apr
(5) |
May
(52) |
Jun
(70) |
Jul
|
Aug
(28) |
Sep
(256) |
Oct
(38) |
Nov
|
Dec
|
---|---|---|---|---|---|---|---|---|---|---|---|---|
2007 |
Jan
|
Feb
|
Mar
|
Apr
|
May
(3) |
Jun
|
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
|
From: Suresh <sur...@us...> - 2006-05-24 14:34:22
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv15837/jmonks/batchserver/framework/controller Modified Files: JobController.java Log Message: no message Index: JobController.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/JobController.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** JobController.java 20 May 2006 04:40:43 -0000 1.9 --- JobController.java 24 May 2006 14:33:59 -0000 1.10 *************** *** 1,10 **** package org.jmonks.batchserver.framework.controller; - import java.io.ObjectInputStream; - import java.io.ObjectOutputStream; import org.apache.log4j.Level; import org.apache.log4j.Logger; ! import org.jmonks.batchserver.framework.config.*; ! import org.jmonks.batchserver.framework.*; import org.jmonks.batchserver.framework.common.ErrorCode; import org.jmonks.batchserver.framework.management.JobMonitorMBean; import org.jmonks.batchserver.framework.management.JobManagerMBean; --- 1,10 ---- package org.jmonks.batchserver.framework.controller; import org.apache.log4j.Level; import org.apache.log4j.Logger; ! import org.jmonks.batchserver.framework.JobStatistics; ! import org.jmonks.batchserver.framework.LoggingManager; import org.jmonks.batchserver.framework.common.ErrorCode; + import org.jmonks.batchserver.framework.config.ConfigurationException; + import org.jmonks.batchserver.framework.config.JobControllerConfig; import org.jmonks.batchserver.framework.management.JobMonitorMBean; import org.jmonks.batchserver.framework.management.JobManagerMBean; *************** *** 162,164 **** --- 162,173 ---- */ public abstract ErrorCode process(); + + /** + * Returns the statistics of this job. Statistics should be queried only + * after the completion of controller processing. Querying before the completion + * of processing always returns null. + * + * @return Returns the statistics of this job. + */ + public abstract JobStatistics getJobStatistics(); } |
From: Suresh <sur...@us...> - 2006-05-24 14:34:21
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv15837/jmonks/batchserver/framework/controller/pool Modified Files: PoolJobController.java Log Message: no message Index: PoolJobController.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/PoolJobController.java,v retrieving revision 1.10 retrieving revision 1.11 diff -C2 -d -r1.10 -r1.11 *** PoolJobController.java 20 May 2006 04:40:43 -0000 1.10 --- PoolJobController.java 24 May 2006 14:33:59 -0000 1.11 *************** *** 3,10 **** --- 3,12 ---- import EDU.oswego.cs.dl.util.concurrent.CountDown; import EDU.oswego.cs.dl.util.concurrent.FutureResult; + import java.util.Calendar; import java.util.Hashtable; import java.util.Iterator; import java.util.Map; import org.apache.log4j.Logger; + import org.jmonks.batchserver.framework.JobStatistics; import org.jmonks.batchserver.framework.common.ErrorCode; import org.jmonks.batchserver.framework.config.ConfigurationException; *************** *** 13,21 **** import org.jmonks.batchserver.framework.management.ProcessorState; import org.jmonks.batchserver.framework.management.ProcessorStatus; ! import org.jmonks.batchserver.framework.common.*; ! import org.jmonks.batchserver.framework.controller.*; ! import org.jmonks.batchserver.framework.management.*; ! import EDU.oswego.cs.dl.util.concurrent.*; ! import java.util.*; /** --- 15,19 ---- import org.jmonks.batchserver.framework.management.ProcessorState; import org.jmonks.batchserver.framework.management.ProcessorStatus; ! /** *************** *** 83,86 **** --- 81,88 ---- */ private JobPool pool=null; + /** + * Holds the statistics of the basic job controller. + */ + private JobStatistics jobStatistics=null; private static Logger logger=Logger.getLogger(PoolJobController.class); *************** *** 126,129 **** --- 128,133 ---- this.jobProcessorsResultMap.put(jobLoaderName, jobLoaderFutureResult); this.jobProcessorsMap.put(jobLoaderName, jobLoader); + this.jobStatistics=new JobStatistics(super.getJobName()); + this.jobStatistics.setStartTime(Calendar.getInstance().getTime()); /** * Create, initialize and spawn the processor(s). *************** *** 383,386 **** --- 387,393 ---- } + this.jobStatistics.setEndTime(Calendar.getInstance().getTime()); + this.jobStatistics.setRecordsProcessed(this.getProcessedRecordsCount()); + for(Iterator iterator=this.jobProcessorsResultMap.values().iterator();iterator.hasNext();) { *************** *** 543,545 **** --- 550,563 ---- } } + + /** + * @see org.jmonks.batchserver.framework.controller.JobController#getJobStatistics() + */ + public JobStatistics getJobStatistics() + { + if(this.jobStatistics.getEndTime()!=null) + return this.jobStatistics; + else + return null; + } } |
From: Suresh <sur...@us...> - 2006-05-24 14:34:21
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/basic In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv15837/jmonks/batchserver/framework/controller/basic Modified Files: BasicJobController.java Log Message: no message Index: BasicJobController.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/basic/BasicJobController.java,v retrieving revision 1.10 retrieving revision 1.11 diff -C2 -d -r1.10 -r1.11 *** BasicJobController.java 19 May 2006 18:55:17 -0000 1.10 --- BasicJobController.java 24 May 2006 14:33:59 -0000 1.11 *************** *** 3,13 **** import EDU.oswego.cs.dl.util.concurrent.CountDown; import EDU.oswego.cs.dl.util.concurrent.FutureResult; ! import java.util.ArrayList; import java.util.HashMap; import java.util.Hashtable; import java.util.Iterator; - import java.util.List; import java.util.Map; import org.apache.log4j.Logger; import org.jmonks.batchserver.framework.common.ErrorCode; import org.jmonks.batchserver.framework.config.BasicJobControllerConfig; --- 3,13 ---- import EDU.oswego.cs.dl.util.concurrent.CountDown; import EDU.oswego.cs.dl.util.concurrent.FutureResult; ! import java.util.Calendar; import java.util.HashMap; import java.util.Hashtable; import java.util.Iterator; import java.util.Map; import org.apache.log4j.Logger; + import org.jmonks.batchserver.framework.JobStatistics; import org.jmonks.batchserver.framework.common.ErrorCode; import org.jmonks.batchserver.framework.config.BasicJobControllerConfig; *************** *** 40,43 **** --- 40,47 ---- */ private Map jobProcessorsResultMap=new Hashtable(); + /** + * Holds the statistics of the basic job controller. + */ + private JobStatistics jobStatistics=null; private static Logger logger=Logger.getLogger(BasicJobController.class); *************** *** 71,74 **** --- 75,80 ---- CountDown countDownLock=new CountDown(threadCount); logger.debug("Going to create " + threadCount + " basic job processor(s)"); + this.jobStatistics=new JobStatistics(super.getJobName()); + this.jobStatistics.setStartTime(Calendar.getInstance().getTime()); for(int i=0;i<threadCount;i++) { *************** *** 345,348 **** --- 351,357 ---- } + this.jobStatistics.setEndTime(Calendar.getInstance().getTime()); + this.jobStatistics.setRecordsProcessed(this.getProcessedRecordsCount()); + for(Iterator iterator=this.jobProcessorsResultMap.values().iterator();iterator.hasNext();) { *************** *** 357,359 **** --- 366,379 ---- return returnCode; } + + /** + * @see org.jmonks.batchserver.framework.controller.JobController#getJobStatistics() + */ + public JobStatistics getJobStatistics() + { + if(this.jobStatistics.getEndTime()!=null) + return this.jobStatistics; + else + return null; + } } |
From: Suresh <sur...@us...> - 2006-05-24 14:34:15
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/config In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv15837/jmonks/batchserver/framework/config Modified Files: ConfigurationException.java Log Message: no message Index: ConfigurationException.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/config/ConfigurationException.java,v retrieving revision 1.10 retrieving revision 1.11 diff -C2 -d -r1.10 -r1.11 *** ConfigurationException.java 24 May 2006 03:17:40 -0000 1.10 --- ConfigurationException.java 24 May 2006 14:33:59 -0000 1.11 *************** *** 119,123 **** else { ! errorCode=ErrorCode.CUSTOM_ERROR_CODE.appendMessage("Component = " + this.exceptionComponent + " Message = " ); } --- 119,123 ---- else { ! errorCode=ErrorCode.UNKNOWN_CONFIGURATION_ERROR.appendMessage("Component = " + this.exceptionComponent + " Message = " ); } |
From: Suresh <sur...@us...> - 2006-05-24 03:17:55
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/config In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv22496/org/jmonks/batchserver/framework/config Modified Files: ConfigurationException.java Log Message: no message Index: ConfigurationException.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/config/ConfigurationException.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** ConfigurationException.java 28 Mar 2006 04:53:45 -0000 1.9 --- ConfigurationException.java 24 May 2006 03:17:40 -0000 1.10 *************** *** 57,65 **** * @param exceptionComponent Specifies the source component of this exception. * @param message Messages states the problem. */ public ConfigurationException(String exceptionComponent,String message) { super(message); ! this.exceptionComponent=exceptionComponent; } --- 57,70 ---- * @param exceptionComponent Specifies the source component of this exception. * @param message Messages states the problem. + * + * @throws IllegalArgumentException If exceptionComponent argument is null. */ public ConfigurationException(String exceptionComponent,String message) { super(message); ! if(exceptionComponent!=null) ! this.exceptionComponent=exceptionComponent; ! else ! throw new IllegalArgumentException("exception component cannot be null to construct configuration exception."); } *************** *** 75,78 **** --- 80,130 ---- /** + * Traslates the configuration exception into error code to be retruned + * to the clients. + * + * @return Retruns the error code. + */ + public ErrorCode getErrorCode() + { + ErrorCode errorCode=null; + + if(FRAMEWORK_CONFIG.equalsIgnoreCase(this.exceptionComponent)) + { + errorCode=ErrorCode.FRAMEWORK_CONFIGURATION_ERROR; + } + else if(JOB_CONFIG_FACTORY_CONFIG.equalsIgnoreCase(this.exceptionComponent)) + { + errorCode=ErrorCode.JOB_CONFIG_FACTORY_CONFIGURATION_ERROR; + } + else if(JOB_CONFIG.equalsIgnoreCase(this.exceptionComponent)) + { + errorCode=ErrorCode.JOB_CONFIGURATION_ERROR; + } + else if(JOB_CONTROLLER_CONFIG.equalsIgnoreCase(this.exceptionComponent)) + { + errorCode=ErrorCode.JOB_CONTROLLER_CONFIGURATION_ERROR; + } + else if(LOGGING_CONFIG.equalsIgnoreCase(this.exceptionComponent)) + { + errorCode=ErrorCode.JOB_LOGGING_CONFIGURATION_ERROR; + } + else if(REPOSITORY_CONFIG.equalsIgnoreCase(this.exceptionComponent)) + { + errorCode=ErrorCode.JOB_REPOSITORY_CONFIGURATION_ERROR; + } + else if(JOB_CONNECTOR_CONFIG.equalsIgnoreCase(this.exceptionComponent)) + { + errorCode=ErrorCode.JOB_CONNECTOR_CONFIGURATION_ERROR; + } + else + { + errorCode=ErrorCode.CUSTOM_ERROR_CODE.appendMessage("Component = " + + this.exceptionComponent + " Message = " ); + } + + return errorCode.appendMessage(super.getMessage()); + } + + /** * <p> * Returns the string representation of ConfigurationException class in the format |
From: Suresh <sur...@us...> - 2006-05-24 03:17:54
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv22496/org/jmonks/batchserver/framework Modified Files: Main.java Log Message: no message Index: Main.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/Main.java,v retrieving revision 1.10 retrieving revision 1.11 diff -C2 -d -r1.10 -r1.11 *** Main.java 23 May 2006 03:21:37 -0000 1.10 --- Main.java 24 May 2006 03:17:40 -0000 1.11 *************** *** 70,84 **** { jobName=(String)configMap.get("job-name"); if(jobName!=null && !"".equals(jobName.trim())) ! { ! logger.debug("Job to be invoked = " + jobName); logger.debug("Retrieving the framework configuration"); FrameworkConfig frameworkConfig=FrameworkConfig.getInstance(); logger.debug("Initializing the framework logging"); LoggingManager.initializeFrameworkLogging(frameworkConfig.getFrameworkLoggingConfig()); ! logger.debug("Retrieving the job config factory"); JobConfigFactory jobConfigFactory=JobConfigFactory.getJobConfigFactory(frameworkConfig.getJobConfigFactoryConfig()); logger.debug("Retrieving the job configuration"); JobConfig jobConfig=jobConfigFactory.getJobConfig(jobName); logger.debug("Initializing the job logging"); LoggingManager.initializeJobLogging(jobName,frameworkConfig.getFrameworkLoggingConfig(), jobConfig.getJobLoggingConfig()); --- 70,94 ---- { jobName=(String)configMap.get("job-name"); + logger.debug("Job to be invoked = " + jobName); if(jobName!=null && !"".equals(jobName.trim())) ! { ! /** ! * Get the framework configuration and initialize the framework logging. ! */ logger.debug("Retrieving the framework configuration"); FrameworkConfig frameworkConfig=FrameworkConfig.getInstance(); logger.debug("Initializing the framework logging"); LoggingManager.initializeFrameworkLogging(frameworkConfig.getFrameworkLoggingConfig()); ! /** ! * Get the job configuration from the configuration factory. ! */ ! logger.debug("Retrieving the configuration factory"); JobConfigFactory jobConfigFactory=JobConfigFactory.getJobConfigFactory(frameworkConfig.getJobConfigFactoryConfig()); logger.debug("Retrieving the job configuration"); JobConfig jobConfig=jobConfigFactory.getJobConfig(jobName); + /** + * Initialize the job logging and kick off the controller after + * registering it to the management agent. + */ logger.debug("Initializing the job logging"); LoggingManager.initializeJobLogging(jobName,frameworkConfig.getFrameworkLoggingConfig(), jobConfig.getJobLoggingConfig()); *************** *** 94,97 **** --- 104,108 ---- * Save the statistics */ + } else *************** *** 170,172 **** --- 181,184 ---- System.exit(errorCode.getCode()); } + } |
From: Suresh <sur...@us...> - 2006-05-24 03:17:54
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/common In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv22496/org/jmonks/batchserver/framework/common Modified Files: ErrorCode.java Log Message: no message Index: ErrorCode.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/common/ErrorCode.java,v retrieving revision 1.16 retrieving revision 1.17 diff -C2 -d -r1.16 -r1.17 *** ErrorCode.java 23 May 2006 03:16:29 -0000 1.16 --- ErrorCode.java 24 May 2006 03:17:40 -0000 1.17 *************** *** 143,147 **** * Represents the job controller configuration error. */ ! public static final ErrorCode JOB_CONTROLLER_CONFIGURATION_ERROR = new ErrorCode(1005,"Job controller configuration is not defined properly."); /** * Represents error because of the exception in basic job processor. --- 143,159 ---- * Represents the job controller configuration error. */ ! public static final ErrorCode JOB_CONTROLLER_CONFIGURATION_ERROR = new ErrorCode(1005,"Job controller configuration is not defined properly."); ! /** ! * Represents the logging configuration error. ! */ ! public static final ErrorCode JOB_LOGGING_CONFIGURATION_ERROR = new ErrorCode(1006,"Job logging configuration is not defined properly."); ! /** ! * Represents the repository configuration error. ! */ ! public static final ErrorCode JOB_REPOSITORY_CONFIGURATION_ERROR = new ErrorCode(1007,"Repository configuration in framework configuration is not defined properly."); ! /** ! * Represents the connector configuration error. ! */ ! public static final ErrorCode JOB_CONNECTOR_CONFIGURATION_ERROR = new ErrorCode(1008,"Repository configuration in framework configuration is not defined properly."); /** * Represents error because of the exception in basic job processor. *************** *** 159,163 **** * Rpresents miscellaneous Error codes. */ ! public static final ErrorCode CUSTOM_ERROR_CODE = new ErrorCode(1301, "Custome Error = "); } --- 171,175 ---- * Rpresents miscellaneous Error codes. */ ! public static final ErrorCode CUSTOM_ERROR_CODE = new ErrorCode(1301, "Custom Error Message = "); } |
From: Suresh <sur...@us...> - 2006-05-23 03:21:41
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv26987/org/jmonks/batchserver/framework Modified Files: Main.java Log Message: no message Index: Main.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/Main.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** Main.java 23 May 2006 03:16:29 -0000 1.9 --- Main.java 23 May 2006 03:21:37 -0000 1.10 *************** *** 170,177 **** System.exit(errorCode.getCode()); } - - private ErrorCode getErrorCode(String componentName) - { - return null; - } } --- 170,172 ---- |
From: Suresh <sur...@us...> - 2006-05-23 03:16:34
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/common In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv25263/org/jmonks/batchserver/framework/common Modified Files: ErrorCode.java Log Message: no message Index: ErrorCode.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/common/ErrorCode.java,v retrieving revision 1.15 retrieving revision 1.16 diff -C2 -d -r1.15 -r1.16 *** ErrorCode.java 19 May 2006 18:55:17 -0000 1.15 --- ErrorCode.java 23 May 2006 03:16:29 -0000 1.16 *************** *** 62,67 **** /** ! * Creates a new erorcode with the same errorcode and appends the given ! * message to the existing message and returns the new error code. * * @param messageToBeAppended Message that needs to be appended to the existing message. --- 62,67 ---- /** ! * Creates a new ErorCode object with the same error code and appends the given ! * message to the existing message and returns that new error code. * * @param messageToBeAppended Message that needs to be appended to the existing message. |
From: Suresh <sur...@us...> - 2006-05-23 03:16:34
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv25263/org/jmonks/batchserver/framework Modified Files: Main.java Log Message: no message Index: Main.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/Main.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** Main.java 19 May 2006 18:55:17 -0000 1.8 --- Main.java 23 May 2006 03:16:29 -0000 1.9 *************** *** 6,9 **** --- 6,10 ---- import org.jmonks.batchserver.framework.common.ErrorCode; import org.jmonks.batchserver.framework.common.FrameworkUtil; + import org.jmonks.batchserver.framework.config.ConfigurationException; import org.jmonks.batchserver.framework.config.FrameworkConfig; import org.jmonks.batchserver.framework.config.JobConfig; *************** *** 17,24 **** /** * <p> ! * Main class is the interface to the batch framework. ! * This provides the methods to accept the information to identify the job and kick off the jobs. ! * The framework can be executed either through command line as an independent program or from any other java program. ! * The information on how to kickoff using these two methods available in {@link #main(String[])} and {@link #process(Map)} methods of this class. * </p> * --- 18,27 ---- /** * <p> ! * Main class is the entry point to the batch framework. This provides ! * the methods to accept the information to identify the job and kick off the job. ! * The framework can be executed either through command line as an independent ! * program or from any other java program. The information on how to kickoff using ! * these two methods available in {@link #main(String[])} and {@link #process(java.util.Map)} ! * methods of this class. * </p> * *************** *** 30,42 **** { private static Logger logger=Logger.getLogger(Main.class); - - public Main() - { - } - /** * <p> ! * This method provides the interface to execute the job from another java program. This accepts the job-name and additional configuration information relate to this job as a key value pairs in the map. Among the entries in the map, there should be job-name key available. * <br><br> * <pre> --- 33,44 ---- { private static Logger logger=Logger.getLogger(Main.class); /** * <p> ! * This method provides the interface to execute the job from another java ! * program. This accepts the job-name and additional configuration information ! * relate to this job as a key value pairs in the map. Among the entries ! * in the map, there should be <i>job-name</i> key available with the values ! * as a name of the job to be executed. * <br><br> * <pre> *************** *** 47,76 **** * * ErrorCode statusCode=Main.process(configMap); ! * System.out.println("Job exited with return code : " + errorCode.getErrorCode()); * </pre> * <br><br> ! * Whatever the configuration key values passed here will override the values defined in job configuration. * </p> * <br> * @param configMap Map object consist of all the properties needed to kick off this batch job. * @return Returns zero for success and non-zero for failure. */ public static ErrorCode process(Map configMap) { ! logger.trace("Entering process"); ErrorCode returnCode=ErrorCode.JOB_COMPLETED_SUCCESSFULLY; try { ! String jobName=(String)configMap.get("job-name"); ! FrameworkConfig frameworkConfig=FrameworkConfig.getInstance(); ! LoggingManager.initializeFrameworkLogging(frameworkConfig.getFrameworkLoggingConfig()); ! JobConfigFactory jobConfigFactory=JobConfigFactory.getJobConfigFactory(frameworkConfig.getJobConfigFactoryConfig()); ! JobConfig jobConfig=jobConfigFactory.getJobConfig(jobName); ! LoggingManager.initializeJobLogging(jobName,frameworkConfig.getFrameworkLoggingConfig(), jobConfig.getJobLoggingConfig()); ! JobController jobController=JobController.getJobController(jobName, jobConfig.getJobControllerConfig()); ! JobManagementAgent jobManagementAgent=JobManagementAgent.getJobManagementAgent(); ! jobManagementAgent.start(jobName, jobController); ! returnCode=jobController.process(); ! jobManagementAgent.stop(returnCode); } catch(Throwable exception) --- 49,110 ---- * * ErrorCode statusCode=Main.process(configMap); ! * System.out.println("Job exited with return code : " + errorCode.toString()); * </pre> * <br><br> ! * Whatever the configuration key values passed here will override the values ! * defined in job configuration. * </p> * <br> + * * @param configMap Map object consist of all the properties needed to kick off this batch job. + * * @return Returns zero for success and non-zero for failure. */ public static ErrorCode process(Map configMap) { ! logger.error("Entering process = " + configMap); ErrorCode returnCode=ErrorCode.JOB_COMPLETED_SUCCESSFULLY; + String jobName=null; + JobManagementAgent jobManagementAgent=null; try { ! jobName=(String)configMap.get("job-name"); ! if(jobName!=null && !"".equals(jobName.trim())) ! { ! logger.debug("Job to be invoked = " + jobName); ! logger.debug("Retrieving the framework configuration"); ! FrameworkConfig frameworkConfig=FrameworkConfig.getInstance(); ! logger.debug("Initializing the framework logging"); ! LoggingManager.initializeFrameworkLogging(frameworkConfig.getFrameworkLoggingConfig()); ! logger.debug("Retrieving the job config factory"); ! JobConfigFactory jobConfigFactory=JobConfigFactory.getJobConfigFactory(frameworkConfig.getJobConfigFactoryConfig()); ! logger.debug("Retrieving the job configuration"); ! JobConfig jobConfig=jobConfigFactory.getJobConfig(jobName); ! logger.debug("Initializing the job logging"); ! LoggingManager.initializeJobLogging(jobName,frameworkConfig.getFrameworkLoggingConfig(), jobConfig.getJobLoggingConfig()); ! logger.debug("Creating the job controller"); ! JobController jobController=JobController.getJobController(jobName, jobConfig.getJobControllerConfig()); ! logger.debug("Retrieving the management agent"); ! jobManagementAgent=JobManagementAgent.getJobManagementAgent(); ! logger.debug("Registering the controller with the management agent"); ! jobManagementAgent.start(jobName, jobController); ! logger.error("Kicking off the controller"); ! returnCode=jobController.process(); ! /** ! * Save the statistics ! */ ! } ! else ! { ! logger.fatal("job-name parameter cannot be found in the configuration to kick of the job = " + jobName); ! returnCode=ErrorCode.JOB_INVOKATION_CONFIGURAION_ERROR; ! } ! } ! catch(ConfigurationException exception) ! { ! exception.printStackTrace(); ! logger.fatal("Configuration Exception in the component " + exception.getExceptionComponent() + ! " while processing the job = " + jobName + " Message = " + exception.getMessage(),exception); ! returnCode=exception.getErrorCode(); } catch(Throwable exception) *************** *** 80,84 **** returnCode=ErrorCode.JOB_COMPLETED_WITH_ERRORS.appendMessage(exception.getMessage()); } ! logger.trace("Exiting process"); return returnCode; } --- 114,124 ---- returnCode=ErrorCode.JOB_COMPLETED_WITH_ERRORS.appendMessage(exception.getMessage()); } ! finally ! { ! logger.debug("Unregistering the controller with the management agent"); ! if(jobManagementAgent!=null) ! jobManagementAgent.stop(returnCode); ! } ! logger.error("Exiting process = " + returnCode); return returnCode; } *************** *** 87,91 **** /** * <p> ! * This method provides the interface to execute the jobs from command line. This accepts the job name/job config and additional configuration information relate to this job as a command line parameters. Each parameter passed through line should be in the format name=value. Among these, either "jobname=xxxxxx" or "jobconfig=xxxxxxxxx" should exist. If these two passed in the parameters, job will exit with error. If job is invoking in independent mode through this method, the return value will be return as exit code. So, exit should be considered as return value. * </p> * <p> --- 127,136 ---- /** * <p> ! * This method provides the interface to execute the jobs from command line. ! * This accepts the job name and additional configuration information relate ! * to this job as a command line parameters. Each parameter passed through ! * command line should be in the format <i>name=value</i>. Among these, ! * <i>job-name=process_file_abc</i> property should exist. ! * The ErrorCode's code value will be return as exit code. * </p> * <p> *************** *** 93,100 **** * java org.jmonks.batchserver.framework.Main job-name=process_file_abc config-name1=config-value1 config-name2=config-value2 * <br><br> - * java org.jmonks.batchserver.framework.Main job-name=<job-config>...</job-config> config-name1=config-value1 config-name2=config-value2 - * <br><br> * Whatever the configuration paremters passed here will override the values defined in job configuration. * </p> * @param args Configuration details in command line params as name=value format * --- 138,144 ---- * java org.jmonks.batchserver.framework.Main job-name=process_file_abc config-name1=config-value1 config-name2=config-value2 * <br><br> * Whatever the configuration paremters passed here will override the values defined in job configuration. * </p> + * * @param args Configuration details in command line params as name=value format * *************** *** 102,107 **** public static void main(String args[]) { ! logger.info("Job started"); Map configMap=new HashMap(); try { --- 146,154 ---- public static void main(String args[]) { ! logger.info("Job processing has been started"); Map configMap=new HashMap(); + /** + * Translate the command line parameters into the configuration map. + */ try { *************** *** 119,127 **** System.exit(ErrorCode.JOB_INVOKATION_CONFIGURAION_ERROR.getCode()); } ! ! Main main=new Main(); ! ErrorCode errorCode=main.process(configMap); ! logger.info("Job finished = " + errorCode); System.exit(errorCode.getCode()); } } --- 166,177 ---- System.exit(ErrorCode.JOB_INVOKATION_CONFIGURAION_ERROR.getCode()); } ! ErrorCode errorCode=Main.process(configMap); ! logger.info("Job finished = " + errorCode.toString()); System.exit(errorCode.getCode()); } + + private ErrorCode getErrorCode(String componentName) + { + return null; + } } |
From: Suresh <sur...@us...> - 2006-05-20 04:40:47
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv17021/org/jmonks/batchserver/framework/controller Modified Files: JobController.java Log Message: no message Index: JobController.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/JobController.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** JobController.java 4 May 2006 22:26:26 -0000 1.8 --- JobController.java 20 May 2006 04:40:43 -0000 1.9 *************** *** 30,34 **** public abstract class JobController implements JobMonitorMBean, JobManagerMBean { ! protected static final String JOB_CONTROLLER_RESTART_PROPERTY_NAME = "job-controller-restart"; /** * Name of the job this controller belongs to. --- 30,34 ---- public abstract class JobController implements JobMonitorMBean, JobManagerMBean { ! //protected static final String JOB_CONTROLLER_RESTART_PROPERTY_NAME = "job-controller-restart"; /** * Name of the job this controller belongs to. |
From: Suresh <sur...@us...> - 2006-05-20 04:40:47
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv17021/org/jmonks/batchserver/framework/controller/pool Modified Files: PoolJobController.java Log Message: no message Index: PoolJobController.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/PoolJobController.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** PoolJobController.java 19 May 2006 18:55:17 -0000 1.9 --- PoolJobController.java 20 May 2006 04:40:43 -0000 1.10 *************** *** 13,16 **** --- 13,21 ---- import org.jmonks.batchserver.framework.management.ProcessorState; import org.jmonks.batchserver.framework.management.ProcessorStatus; + import org.jmonks.batchserver.framework.common.*; + import org.jmonks.batchserver.framework.controller.*; + import org.jmonks.batchserver.framework.management.*; + import EDU.oswego.cs.dl.util.concurrent.*; + import java.util.*; /** *************** *** 57,61 **** * </table> * </p> ! * * @author Suresh Pragada * @version 1.0 --- 62,66 ---- * </table> * </p> ! * * @author Suresh Pragada * @version 1.0 *************** *** 74,77 **** --- 79,86 ---- */ private Map jobProcessorsResultMap=new Hashtable(); + /** + * Job pool reference being used by the loader and processor. + */ + private JobPool pool=null; private static Logger logger=Logger.getLogger(PoolJobController.class); *************** *** 102,106 **** * Create and initialize the pool */ ! JobPool pool=(JobPool)this.getInstance(poolJobControllerConfig.getPoolClassName()); pool.initialize(poolJobControllerConfig.getPoolConfigProperties()); --- 111,115 ---- * Create and initialize the pool */ ! this.pool=(JobPool)this.getInstance(poolJobControllerConfig.getPoolClassName()); pool.initialize(poolJobControllerConfig.getPoolConfigProperties()); *************** *** 364,367 **** --- 373,378 ---- countDownLock.acquire(); logger.info("Loader and all processor(s) have finished their task."); + /** As per the contract cleanup the pool. */ + this.pool.cleanup(); } catch(InterruptedException exception) |
From: Suresh <sur...@us...> - 2006-05-19 18:56:17
|
Update of /cvsroot/batchserver/batchserver/test/org/jmonks/batchserver/framework/controller/pool In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv32124 Modified Files: CollectionJobPoolTest.java Log Message: no message Index: CollectionJobPoolTest.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/test/org/jmonks/batchserver/framework/controller/pool/CollectionJobPoolTest.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** CollectionJobPoolTest.java 15 May 2006 13:31:35 -0000 1.1 --- CollectionJobPoolTest.java 19 May 2006 18:56:14 -0000 1.2 *************** *** 42,48 **** Thread processor3=new Thread(this.getProcessorRunnable(pool)); loader.start(); ! //processor1.start(); ! //processor2.start(); ! //processor3.start(); } --- 42,48 ---- Thread processor3=new Thread(this.getProcessorRunnable(pool)); loader.start(); ! processor1.start(); ! processor2.start(); ! processor3.start(); } |
From: Suresh <sur...@us...> - 2006-05-19 18:55:23
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv31715/org/jmonks/batchserver/framework/controller/pool Modified Files: PoolJobController.java Log Message: no message Index: PoolJobController.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/PoolJobController.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** PoolJobController.java 17 May 2006 22:04:36 -0000 1.8 --- PoolJobController.java 19 May 2006 18:55:17 -0000 1.9 *************** *** 375,379 **** { ErrorCode threadReturnCode=(ErrorCode)((FutureResult)iterator.next()).peek(); ! if(threadReturnCode!=ErrorCode.JOB_COMPLETED_SUCCESSFULLY) { returnCode=threadReturnCode; --- 375,379 ---- { ErrorCode threadReturnCode=(ErrorCode)((FutureResult)iterator.next()).peek(); ! if(!ErrorCode.JOB_COMPLETED_SUCCESSFULLY.equals(threadReturnCode)) { returnCode=threadReturnCode; |
From: Suresh <sur...@us...> - 2006-05-19 18:55:23
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/common In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv31715/org/jmonks/batchserver/framework/common Modified Files: ErrorCode.java FrameworkUtil.java Log Message: no message Index: FrameworkUtil.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/common/FrameworkUtil.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** FrameworkUtil.java 13 Mar 2006 14:32:37 -0000 1.7 --- FrameworkUtil.java 19 May 2006 18:55:17 -0000 1.8 *************** *** 14,19 **** * </p> * <p> ! * Note : This class does not maintain any state. Keep all the methods as static and make ! * sure taken appropriate steps in case of that method is accessed by mulitple threads. * </p> * --- 14,18 ---- * </p> * <p> ! * Note : This class does not maintain any state. Keep all the methods as static * </p> * *************** *** 86,103 **** /** * <p> ! * Loads the property elements exists in the given element to the given map. ! * This looks for the <property> elements in the given element and looks ! * for the value of "key" attribute and the values of <property> element ! * and load them as key values pairs. * </p> * <p> * Property elements should be in the following format.<br><br> ! * <property key="some-config-key1">some-config-value1</property> * </p> * ! * @param configElement DOM Element consists of <property> elements. * @param propertyMap Map to be loaded with property key values. * ! * @throws IllegalArgumentException If either configElement or propertyMap is null. */ public static void loadPropertiesFromStringToMap(String propertiesString,Map propertyMap) --- 85,101 ---- /** * <p> ! * Loads the property key value pairs exists in the given string to the given map. ! * This looks for the properties in concantenated by ":" and each property in turn ! * concatenated by "=". * </p> * <p> * Property elements should be in the following format.<br><br> ! * key1=value1:key2=value2 * </p> * ! * @param propertiesString String consists of properties. * @param propertyMap Map to be loaded with property key values. * ! * @throws IllegalArgumentException If either propertiesString or propertyMap is null. */ public static void loadPropertiesFromStringToMap(String propertiesString,Map propertyMap) *************** *** 114,124 **** { String property=propertiesTokenizer.nextToken(); ! StringTokenizer propertyTokenizer=new StringTokenizer(property,"="); ! String key=propertyTokenizer.nextToken(); ! String value=propertyTokenizer.nextToken(); ! propertyMap.put(key,value); } logger.exiting(FrameworkUtil.class.getName(),"loadPropertiesFromStringToMap"); ! } ! } \ No newline at end of file --- 112,124 ---- { String property=propertiesTokenizer.nextToken(); ! if(property!=null && !"".equals(property.trim())) ! { ! StringTokenizer propertyTokenizer=new StringTokenizer(property,"="); ! String key=propertyTokenizer.nextToken(); ! String value=propertyTokenizer.nextToken(); ! propertyMap.put(key,value); ! } } logger.exiting(FrameworkUtil.class.getName(),"loadPropertiesFromStringToMap"); ! } } \ No newline at end of file Index: ErrorCode.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/common/ErrorCode.java,v retrieving revision 1.14 retrieving revision 1.15 diff -C2 -d -r1.14 -r1.15 *** ErrorCode.java 3 May 2006 22:11:18 -0000 1.14 --- ErrorCode.java 19 May 2006 18:55:17 -0000 1.15 *************** *** 4,10 **** * <p> * The ErrorCode represents the error condition can generate in the system ! * and holds error code and message. This is like a enum class to make sure ! * the exit codes being passed around the application are valid (defined). ! * No instances of this class can be created from outside and cannot be instantiated. * </p> * --- 4,11 ---- * <p> * The ErrorCode represents the error condition can generate in the system ! * and holds error code and message. This is also used to represent the status code ! * for any processor/controller/job. This is like a enum class to make sure ! * the exit codes being passed around the application are valid (defined) and ! * allows the flexibility to append the context specific messages to the error codes. * </p> * *************** *** 27,32 **** /** * <p> ! * Private constructor make sure no instances will be created of this class ! * from outside of this class. * </p> * --- 28,33 ---- /** * <p> ! * Protected constructor to make sure no instances will be created of this class ! * from outside of this class and it is extensible. * </p> * *************** *** 48,52 **** { return this.code; - } --- 49,52 ---- *************** *** 62,65 **** --- 62,99 ---- /** + * Creates a new erorcode with the same errorcode and appends the given + * message to the existing message and returns the new error code. + * + * @param messageToBeAppended Message that needs to be appended to the existing message. + * + * @return Returns a new error code contains the same error code and appended message. + */ + public ErrorCode appendMessage(String messageToBeAppended) + { + return new ErrorCode(this.code, this.message+" " + messageToBeAppended); + } + + /** + * Equality will be based on the code the two error codes contain. + * + * @see java.lang.Object#equals(java.lang.Object) + */ + public boolean equals(Object errorCode) + { + return (errorCode!=null) && (this.getClass()==errorCode.getClass()) && + (this.code==((ErrorCode)errorCode).getCode()); + } + + /** + * Code represented by error code will be returned as a hash code. + * + * @see java.lang.Object#hashCode() + */ + public int hashCode() + { + return this.code; + } + + /** * <p> * Returns the string representation of ErrorCode class in the format *************** *** 78,93 **** } /** ! * Configuration Error Codes. */ - public static final ErrorCode JOB_COMPLETED_SUCCESSFULLY = new ErrorCode(0,"Job successfully completed."); - public static final ErrorCode FRAMEWORK_CONFIGURATION_ERROR = new ErrorCode(1001,"Error while accessing or parsing the framework configuration file."); public static final ErrorCode JOB_CONFIG_FACTORY_CONFIGURATION_ERROR = new ErrorCode(1002,"Job configuration factory cannot be created by the given factory configuration."); public static final ErrorCode JOB_CONFIGURATION_ERROR = new ErrorCode(1003,"Error while loading the job configuration from the defined factory."); public static final ErrorCode JOB_IS_NOT_CONFIGURED = new ErrorCode(1004,"Job is not configured"); ! public static final ErrorCode JOB_CONTROLLER_CONFIGURATION_ERROR = new ErrorCode(1005,"Job controller configuration is not defined properly."); ! ! public static final ErrorCode BASIC_JOB_PROCESSOR_EXCEPTION = new ErrorCode(1006,"Controller caught exception while executing process method on basic job processor."); ! } --- 112,163 ---- } + /** + * Represents job got completed successfully. + */ + public static final ErrorCode JOB_COMPLETED_SUCCESSFULLY = new ErrorCode(0,"Job completed successfully."); + /** + * Represents job got completed with errors. This represents the partial success. + */ + public static final ErrorCode JOB_COMPLETED_WITH_ERRORS = new ErrorCode(1, "Job completed with some errors."); /** ! * Represents the configuration error used to invoke the job. ! */ ! public static final ErrorCode JOB_INVOKATION_CONFIGURAION_ERROR = new ErrorCode(1000,"Error while parsing the configuraion passed to invoke job."); ! /** ! * Represents the framework configuration error. */ public static final ErrorCode FRAMEWORK_CONFIGURATION_ERROR = new ErrorCode(1001,"Error while accessing or parsing the framework configuration file."); + /** + * Represents the job config factory configuration error. + */ public static final ErrorCode JOB_CONFIG_FACTORY_CONFIGURATION_ERROR = new ErrorCode(1002,"Job configuration factory cannot be created by the given factory configuration."); + /** + * Represents the job configuration error. + */ public static final ErrorCode JOB_CONFIGURATION_ERROR = new ErrorCode(1003,"Error while loading the job configuration from the defined factory."); + /** + * Represents the job is not configured error. + */ public static final ErrorCode JOB_IS_NOT_CONFIGURED = new ErrorCode(1004,"Job is not configured"); ! /** ! * Represents the job controller configuration error. ! */ ! public static final ErrorCode JOB_CONTROLLER_CONFIGURATION_ERROR = new ErrorCode(1005,"Job controller configuration is not defined properly."); ! /** ! * Represents error because of the exception in basic job processor. ! */ ! public static final ErrorCode BASIC_JOB_PROCESSOR_EXCEPTION = new ErrorCode(1102,"Basic Job Controller caught exception while executing process method on basic job processor."); ! /** ! * Represents error because of the exception in pool job loader. ! */ ! public static final ErrorCode POOL_JOB_LOADER_EXCEPTION = new ErrorCode(1201,"Exception while executing the loader to load the pool."); ! /** ! * Represents error because of the exception in pool job processor. ! */ ! public static final ErrorCode POOL_JOB_PROCESSOR_EXCEPTION = new ErrorCode(1202,"Exception while executing the processor to process the pool."); ! /** ! * Rpresents miscellaneous Error codes. ! */ ! public static final ErrorCode CUSTOM_ERROR_CODE = new ErrorCode(1301, "Custome Error = "); ! } |
From: Suresh <sur...@us...> - 2006-05-19 18:55:22
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/basic In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv31715/org/jmonks/batchserver/framework/controller/basic Modified Files: BasicJobController.java BasicJobProcessor.java Log Message: no message Index: BasicJobProcessor.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/basic/BasicJobProcessor.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** BasicJobProcessor.java 14 May 2006 01:24:18 -0000 1.8 --- BasicJobProcessor.java 19 May 2006 18:55:17 -0000 1.9 *************** *** 126,159 **** /** * <p> ! * This method gets a chance to initialize the job processor. This will be called ! * before the <i>process</i> method being called. Properties configured for ! * job processor in job configuration will be passed as map to this method. ! * If job processor needs any resources, they can initialize them here. If ! * mulitple job processors have been configured, this method will be called ! * for each processor in the order they will be created. ! * </p> ! * ! * @param configProps Properties defined for job processor in the job configuration. ! */ ! public abstract void initialize(Map configProps); ! ! /** ! * <p> ! * This method let developer implement businses logic and return the appropriate ! * error code. * </p> * * @return Returns the appropriate error code needs to be passed to the invocation layer. */ ! public abstract ErrorCode process(); ! ! /** ! * <p> ! * This method gets a chance to do any cleanup if needed, after the ! * <i>process</i> method finished. This will be called irrespective of the ! * return status(even exceptions occured) of the <i>process</i> method. ! * </p> ! */ ! public abstract void cleanup(); /** --- 126,138 ---- /** * <p> ! * Does the processing and return the appropriate error code. Properties ! * configured for this job processor will be passed through the configProps. * </p> * + * @param configProps Properties defined for job processor in the job configuration. + * * @return Returns the appropriate error code needs to be passed to the invocation layer. */ ! public abstract ErrorCode process(Map configProps); /** Index: BasicJobController.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/basic/BasicJobController.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** BasicJobController.java 14 May 2006 01:24:18 -0000 1.9 --- BasicJobController.java 19 May 2006 18:55:17 -0000 1.10 *************** *** 75,96 **** String threadID=super.getJobName()+"_"+(i+1); BasicJobProcessor jobProcessor=this.getBasicJobProcessor(basicJobControllerConfig.getBasicJobProcessorClassName()); - /** - * Make seperate copy of property map for each processor and initialize with that map. - */ - Map processorConfigProperties=new HashMap(basicJobControllerConfig.getBasicJobProcessorConfigProperties()); - try - { - logger.debug("Going to initialize the " + threadID + " basic job processor"); - jobProcessor.initialize(processorConfigProperties); - logger.info("Done initializing the " + threadID + " basic job processor"); - } - catch(Throwable exception) - { - exception.printStackTrace(); - logger.error("Exception while initializing the " + threadID + " basic job processor = " + - exception.getMessage(), exception); - } FutureResult result=new FutureResult(); ! Thread processorThread=new Thread(result.setter(this.getCallableProcessor(countDownLock,jobProcessor)), threadID); processorThread.start(); this.jobProcessorsMap.put(threadID,jobProcessor); --- 75,81 ---- String threadID=super.getJobName()+"_"+(i+1); BasicJobProcessor jobProcessor=this.getBasicJobProcessor(basicJobControllerConfig.getBasicJobProcessorClassName()); FutureResult result=new FutureResult(); ! Thread processorThread=new Thread(result.setter(this.getCallableProcessor ! (countDownLock,jobProcessor,basicJobControllerConfig.getBasicJobProcessorConfigProperties())), threadID); processorThread.start(); this.jobProcessorsMap.put(threadID,jobProcessor); *************** *** 303,307 **** * @return Returns the runnable instance. */ ! private Callable getCallableProcessor(final CountDown countDownLock,final BasicJobProcessor jobProcessor) { logger.trace("Entering getCallableProcessor"); --- 288,292 ---- * @return Returns the runnable instance. */ ! private Callable getCallableProcessor(final CountDown countDownLock,final BasicJobProcessor jobProcessor,final Map configProps) { logger.trace("Entering getCallableProcessor"); *************** *** 315,319 **** logger.debug("Status of registering thread with the processor = " + registered); logger.trace("Going to call the process method"); ! returnCode=jobProcessor.process(); logger.debug("Done calling the process method"); } --- 300,304 ---- logger.debug("Status of registering thread with the processor = " + registered); logger.trace("Going to call the process method"); ! returnCode=jobProcessor.process(new HashMap(configProps)); logger.debug("Done calling the process method"); } *************** *** 324,339 **** returnCode=ErrorCode.BASIC_JOB_PROCESSOR_EXCEPTION; } - // Need to call the processor cleanup all the time. - try - { - logger.trace("Going to call cleanup method"); - jobProcessor.cleanup(); - logger.debug("Done calling the cleanup method"); - } - catch(Throwable exception) - { - exception.printStackTrace(); - logger.error("Exception while doing the cleanup = " + exception.getMessage(), exception); - } countDownLock.release(); logger.info(Thread.currentThread().getName() + " is exiting with the error code = " + returnCode); --- 309,312 ---- *************** *** 375,379 **** { ErrorCode threadReturnCode=(ErrorCode)((FutureResult)iterator.next()).peek(); ! if(threadReturnCode!=ErrorCode.JOB_COMPLETED_SUCCESSFULLY) { returnCode=threadReturnCode; --- 348,352 ---- { ErrorCode threadReturnCode=(ErrorCode)((FutureResult)iterator.next()).peek(); ! if(!ErrorCode.JOB_COMPLETED_SUCCESSFULLY.equals(threadReturnCode)) { returnCode=threadReturnCode; |
From: Suresh <sur...@us...> - 2006-05-19 18:55:20
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv31715/org/jmonks/batchserver/framework Modified Files: Main.java Log Message: no message Index: Main.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/Main.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** Main.java 28 Mar 2006 04:53:33 -0000 1.7 --- Main.java 19 May 2006 18:55:17 -0000 1.8 *************** *** 5,12 **** --- 5,18 ---- import org.apache.log4j.Logger; import org.jmonks.batchserver.framework.common.ErrorCode; + import org.jmonks.batchserver.framework.common.FrameworkUtil; + import org.jmonks.batchserver.framework.config.FrameworkConfig; + import org.jmonks.batchserver.framework.config.JobConfig; + import org.jmonks.batchserver.framework.config.JobConfigFactory; + import org.jmonks.batchserver.framework.controller.JobController; import org.jmonks.batchserver.framework.management.JobManagementAgent; + /** * <p> *************** *** 25,30 **** private static Logger logger=Logger.getLogger(Main.class); - private JobManagementAgent mMgmtMntrManager; - public Main() { --- 31,34 ---- *************** *** 55,61 **** { logger.trace("Entering process"); ! logger.trace("Exiting process"); ! return null; } --- 59,85 ---- { logger.trace("Entering process"); ! ErrorCode returnCode=ErrorCode.JOB_COMPLETED_SUCCESSFULLY; ! try ! { ! String jobName=(String)configMap.get("job-name"); ! FrameworkConfig frameworkConfig=FrameworkConfig.getInstance(); ! LoggingManager.initializeFrameworkLogging(frameworkConfig.getFrameworkLoggingConfig()); ! JobConfigFactory jobConfigFactory=JobConfigFactory.getJobConfigFactory(frameworkConfig.getJobConfigFactoryConfig()); ! JobConfig jobConfig=jobConfigFactory.getJobConfig(jobName); ! LoggingManager.initializeJobLogging(jobName,frameworkConfig.getFrameworkLoggingConfig(), jobConfig.getJobLoggingConfig()); ! JobController jobController=JobController.getJobController(jobName, jobConfig.getJobControllerConfig()); ! JobManagementAgent jobManagementAgent=JobManagementAgent.getJobManagementAgent(); ! jobManagementAgent.start(jobName, jobController); ! returnCode=jobController.process(); ! jobManagementAgent.stop(returnCode); ! } ! catch(Throwable exception) ! { ! exception.printStackTrace(); ! logger.fatal("Exception while processing the job = " + exception.getMessage(),exception); ! returnCode=ErrorCode.JOB_COMPLETED_WITH_ERRORS.appendMessage(exception.getMessage()); ! } logger.trace("Exiting process"); ! return returnCode; } *************** *** 78,91 **** public static void main(String args[]) { ! Map configMap=new HashMap(); ! /** ! * parse command line parameters and populate the map. ! **/ Main main=new Main(); ErrorCode errorCode=main.process(configMap); System.exit(errorCode.getCode()); } - - } --- 102,127 ---- public static void main(String args[]) { ! logger.info("Job started"); Map configMap=new HashMap(); ! try ! { ! StringBuffer commandLineConfiguration=new StringBuffer(); ! for(int i=0;i<args.length;i++) ! { ! commandLineConfiguration.append(args[i]+":"); ! } ! FrameworkUtil.loadPropertiesFromStringToMap(commandLineConfiguration.toString(),configMap); ! } ! catch(IllegalArgumentException exception) ! { ! exception.printStackTrace(); ! logger.fatal("Exception while parsing the command line parameters = " + exception.getMessage(),exception); ! System.exit(ErrorCode.JOB_INVOKATION_CONFIGURAION_ERROR.getCode()); ! } ! Main main=new Main(); ErrorCode errorCode=main.process(configMap); + logger.info("Job finished = " + errorCode); System.exit(errorCode.getCode()); } } |
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv24956 Modified Files: AbstractPoolJobLoader.java AbstractPoolJobProcessor.java PoolJobController.java PoolJobLoader.java PoolJobProcessor.java Log Message: no message Index: PoolJobProcessor.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/PoolJobProcessor.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** PoolJobProcessor.java 16 May 2006 03:15:28 -0000 1.4 --- PoolJobProcessor.java 17 May 2006 22:04:36 -0000 1.5 *************** *** 6,11 **** /** * PoolJobProcessor gets the job data to be processed from the job pool and ! * processes it. * * @author Suresh pragada --- 6,14 ---- /** + * <p> * PoolJobProcessor gets the job data to be processed from the job pool and ! * processes it. Along with the processing methods, it exposes some method used ! * by management and monitoring clients. ! * </p> * * @author Suresh pragada *************** *** 16,25 **** { /** ! * Chance to initialize itself using the information configured for this job ! * processor in job configuration. * * @param configProps Configuration defined for the job processor in job configuration. */ ! public void initialize(Map configProps); /** * Suspends the pool job processor. --- 19,34 ---- { /** ! * <p> ! * Process the job data getting from the job pool and quits when job pool ! * out of the job data(returns null). ! * </p> * * @param configProps Configuration defined for the job processor in job configuration. + * @param pool Reference to Job Pool. + * + * @return Returns the error code. */ ! public ErrorCode processPool(Map configProps, JobPool pool); ! /** * Suspends the pool job processor. *************** *** 41,57 **** public boolean stop(); /** - * Chance to do any cleanup at the end of the processing. - */ - public void cleanup(); - /** - * Process the job data getting from the job pool and quits when job pool - * out of the job data(returns null). - * - * @param pool Reference to Job Pool. - * - * @return Returns the error code. - */ - public ErrorCode process(JobPool pool); - /** * Gets the processor to be displyed or anaylyzed for the monitoring purposes. * --- 50,53 ---- Index: AbstractPoolJobLoader.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/AbstractPoolJobLoader.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** AbstractPoolJobLoader.java 17 May 2006 03:05:58 -0000 1.6 --- AbstractPoolJobLoader.java 17 May 2006 22:04:36 -0000 1.7 *************** *** 11,15 **** --- 11,17 ---- package org.jmonks.batchserver.framework.controller.pool; + import EDU.oswego.cs.dl.util.concurrent.Mutex; import java.util.Map; + import org.apache.log4j.Logger; import org.jmonks.batchserver.framework.common.ErrorCode; import org.jmonks.batchserver.framework.management.ProcessorStatus; *************** *** 31,35 **** * Holds the status of the loader. */ ! private ProcessorStatus loaderStatus=ProcessorStatus.INSTANTIATED; /** * Holds the pool reference passed by controller. --- 33,37 ---- * Holds the status of the loader. */ ! protected ProcessorStatus loaderStatus=ProcessorStatus.INSTANTIATED; /** * Holds the pool reference passed by controller. *************** *** 41,90 **** private Object currentJobData=null; /** ! * Returns the processing state of the loader. ! * ! * @return Returns the job data that this loader is loading. */ ! public Object getLoaderState() ! { ! return currentJobData.toString(); ! } ! /** ! * Returns the status of the loader. ! * ! * @return Returns the status of the loader. */ ! public ProcessorStatus getLoaderStatus() ! { ! return this.loaderStatus; ! } /** ! * Abstracts the loading of the job data into the pool by defining ! * another set of methods to load into the pool and implements the ! * management and monitoring related methods. * * @param configProps Properties defined for this loader in job configuration. * @param pool Job Pool reference. * ! * @return Returns the final error code of loading the jobs. */ public final ErrorCode loadPool(Map configProps,JobPool pool) { this.pool=pool; ErrorCode returnCode=ErrorCode.JOB_COMPLETED_SUCCESSFULLY; try { returnCode=this.loadPool(configProps); } catch(Throwable exception) { exception.printStackTrace(); ! returnCode=ErrorCode.BASIC_JOB_PROCESSOR_EXCEPTION; } return returnCode; } /** ! * Loads the given job data into the job pool. * * @param jobData Job data object that needs to be processed. --- 43,94 ---- private Object currentJobData=null; /** ! * Signal the loader to the stop the loading. */ ! private boolean stopSignal=false; /** ! * Signal the loader to suspend the loading. */ ! private boolean suspendSignal=false; /** ! * Mutex lock to suspend and resume the loader. ! */ ! private Mutex suspendLock=new Mutex(); ! ! private static Logger logger=Logger.getLogger(AbstractPoolJobLoader.class); ! ! /** ! * Abstracts the job pool details from the final loader by defining ! * other set of methods for the final loader and implements the management ! * and monitoring related methods. * * @param configProps Properties defined for this loader in job configuration. * @param pool Job Pool reference. * ! * @return Returns the final status of loading. */ public final ErrorCode loadPool(Map configProps,JobPool pool) { + logger.trace("Entering loadPool"); this.pool=pool; ErrorCode returnCode=ErrorCode.JOB_COMPLETED_SUCCESSFULLY; try { + this.loaderStatus=ProcessorStatus.RUNNING; returnCode=this.loadPool(configProps); + this.loaderStatus=ProcessorStatus.FINISHED; } catch(Throwable exception) { exception.printStackTrace(); ! logger.error("Exception while calling loaders loadPool = " + exception.getMessage(), exception); ! returnCode=ErrorCode.POOL_JOB_LOADER_EXCEPTION; } + logger.trace("Exiting loadPool"); return returnCode; } /** ! * Loads the given job data into the job pool.This method is for the ! * final loaders to load the job data into the pool. * * @param jobData Job data object that needs to be processed. *************** *** 95,122 **** { boolean loaded=false; if(jobData!=null) this.currentJobData=jobData; loaded=this.pool.loadJobData(jobData); return loaded; } ! ! public boolean resume() { return true; } ! public boolean stop() { return true; } ! public boolean suspend() { return true; } ! /** * */ public abstract ErrorCode loadPool(Map configProps); --- 99,237 ---- { boolean loaded=false; + if(jobData!=null) this.currentJobData=jobData; + else + logger.debug("Loading the null to signal the end of the pool for the processors(s)"); + loaded=this.pool.loadJobData(jobData); + + if(this.suspendSignal) + { + try + { + this.loaderStatus=ProcessorStatus.SUSPENDED; + this.suspendLock.acquire(); + this.loaderStatus=ProcessorStatus.RUNNING; + } + catch(InterruptedException exception) + { + exception.printStackTrace(); + logger.error("Exception while suspending the loader " + exception.getMessage(),exception); + this.loaderStatus=ProcessorStatus.RUNNING; + } + } + return loaded; } ! /** ! * Tells whether the loading of the jobs needs to be stopped or not. ! * Loader implementation should check for this flag before loading the jobs ! * into the pool. ! * ! * @return Returns true if loader to be stopped, false continue loading the jobs. ! */ ! protected final boolean stopLoading() ! { ! return this.stopSignal; ! } ! /** ! * Resumes the loading of the jobs. ! * ! * @return Returns true if loader is resumed, false otherwise. ! */ public boolean resume() { + logger.trace("Entering resume"); + this.stopSignal=false; + this.suspendSignal=false; + this.suspendLock.release(); + logger.trace("Exiting resume"); return true; } ! /** ! * Stops the loading of the jobs into the pool. ! * ! * @return Returns true if loader is stopped, false otherwise. ! */ public boolean stop() { + logger.trace("Entering stop"); + this.stopSignal=true; + + if(this.suspendSignal) + { + logger.info("Loader is in suspend status... resuming the loader"); + this.suspendSignal=false; + this.suspendLock.release(); + } + logger.trace("Exiting stop"); return true; } ! ! /** ! * Suspends loading of the jobs into the pool. ! */ public boolean suspend() { + logger.trace("Entering suspend"); + + this.suspendSignal=true; + + logger.trace("Exiting suspend"); return true; } ! ! /** ! * Returns the status of the loader. ! * ! * @return Returns the status of the loader. ! */ ! public ProcessorStatus getLoaderStatus() ! { ! return this.loaderStatus; ! } ! /** + * Returns the processing state of the loader. + * + * @return Returns the job data that this loader is loading. + */ + public Object getLoaderState() + { + return currentJobData.toString(); + } + + /** + * <p> + * Load the job data into the pool that needs to be processed by job processor(s). + * Implementers can take the help of the <i>loadJobData(Object)</i> method defined + * here to load the jobs into the pool. + * <br> + * <pre> + * public class MyLoader + * { + * public ErrorCode loadPool(Map configProps) + * { + * for(int i=0;i<100;i++) + * { + * loadJobData(new Integer(i)); + * if(super.stopLoading()) + * { + * doCleanup(); + * super.loaderStatus=ProcessorStatus.STOPPED; + * break; + * } + * } + * loadJobData(null); + * return ErrorCode.JOB_COMPLETED_SUCCESSFULLY; + * } + * } + * </pre> + * </p> * + * @param configProps Configuration defined for this job loader in job configuration. + * + * @return Returns the final status of the loader. */ public abstract ErrorCode loadPool(Map configProps); Index: PoolJobLoader.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/PoolJobLoader.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** PoolJobLoader.java 17 May 2006 03:05:58 -0000 1.5 --- PoolJobLoader.java 17 May 2006 22:04:36 -0000 1.6 *************** *** 5,11 **** /** * PoolJobLoader loads the job data into the pool to be processed by ! * PoolJobProcessor(s). ! * * @author Suresh Pragada * @version 1.0 --- 5,15 ---- /** + * <p> * PoolJobLoader loads the job data into the pool to be processed by ! * PoolJobProcessor(s). Along with the methods to load job data into the ! * pool, it exposes the some other methods used by the management and ! * monitoring clients. ! * </p> ! * * @author Suresh Pragada * @version 1.0 *************** *** 16,24 **** /** * <p> ! * Loads the job data that needs to be processed in to the given job pool. * When finished loading of all the job data, loads the <i>null</i> into the pool * to singal the processor(s) that loading of all the jobs have been done. ! * Configuration defined for this loader in job configuration will be available ! * in configProps map. * <br> * Example loading the 100 integer objects into the pool. --- 20,28 ---- /** * <p> ! * Loads the job data that needs to be processed in to the job pool. * When finished loading of all the job data, loads the <i>null</i> into the pool * to singal the processor(s) that loading of all the jobs have been done. ! * Configuration defined for this loader in job configuration will be accessed by ! * configProps map received from controller. * <br> * Example loading the 100 integer objects into the pool. *************** *** 38,41 **** --- 42,47 ---- * @param configProps Properties defined for this job loader in job configuration. * @param pool Job Pool reference. + * + * @return Retrurns the final status of the loader. */ public ErrorCode loadPool(Map configProps,JobPool pool); Index: AbstractPoolJobProcessor.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/AbstractPoolJobProcessor.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** AbstractPoolJobProcessor.java 16 May 2006 03:15:28 -0000 1.3 --- AbstractPoolJobProcessor.java 17 May 2006 22:04:36 -0000 1.4 *************** *** 14,17 **** --- 14,18 ---- * to the final processor. * </p> + * * @author Suresh Pragada * @version 1.0 *************** *** 24,79 **** */ private int processedJobDataCount=0; - - private boolean stopSignal=false; - - private boolean suspendSignal=false; - - private Mutex suspendLock=new Mutex(); - - private static Logger logger=Logger.getLogger(AbstractPoolJobProcessor.class); - /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#initialize(java.util.Map) ! */ ! public abstract void initialize(Map configProps); ! /** ! * */ ! public abstract ErrorCode process(Object jobData); /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#cleanup() */ ! public abstract void cleanup(); ! /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#suspend() */ ! public boolean suspend() ! { ! this.suspendSignal=true; ! return this.suspendSignal; ! } /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#resume() */ ! public boolean resume() ! { ! this.stopSignal=false; ! this.suspendSignal=false; ! this.suspendLock.release(); ! return true; ! } /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#stop() */ ! public boolean stop() ! { ! return true; ! } /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#suspend() */ ! public ErrorCode processPool(JobPool pool) { Object jobData=null; while((jobData=pool.getNextJobData())!=null) --- 25,86 ---- */ private int processedJobDataCount=0; /** ! * Signal to stop the processor. */ ! private boolean stopSignal=false; /** ! * Singal to suspend the processor. */ ! private boolean suspendSignal=false; /** ! * Mutex lock to suspend the processor. */ ! private Mutex suspendLock=new Mutex(); /** ! * Holds the processor status. */ ! protected ProcessorStatus processorStatus=ProcessorStatus.INSTANTIATED; /** ! * Holds the job data currently being processed. */ ! private Object processingJobData=null; ! ! private static Logger logger=Logger.getLogger(AbstractPoolJobProcessor.class); ! /** ! * <p> ! * Initializes the processor implementation by calling the <i>initialize</i> ! * method by passing map contains the properties defined for the processor in ! * job configuration, gets the job data from the pool and passes that information ! * to the processor implementation for processing and cleansup the processor ! * implementation by calling the <i>cleanup</i>. ! * </p> ! * ! * @param configProps Properties defined for the processor in job configuration. ! * @param pool Job pool reference where job data needs to be pulled. ! * ! * @return Returns the status code of this processor. */ ! public ErrorCode processPool(Map configProps, JobPool pool) { + logger.trace("Entering processPool"); + ErrorCode returnCode=ErrorCode.JOB_COMPLETED_SUCCESSFULLY; + /** + * Calling the initializer on final processor implementation. + */ + try + { + this.processorStatus=ProcessorStatus.INITIALIZING; + initialize(configProps); + this.processorStatus=ProcessorStatus.RUNNING; + } + catch(Throwable exception) + { + exception.printStackTrace(); + logger.error("Exception while calling initialize on processor " + exception.getMessage(), exception); + } + /** + * Get the job data from pool and pass that on to the final processor implementation. + */ Object jobData=null; while((jobData=pool.getNextJobData())!=null) *************** *** 81,111 **** try { this.processedJobDataCount++; ErrorCode errorCode=this.process(jobData); } catch(Throwable exception) { exception.printStackTrace(); } if(this.stopSignal) { logger.info("Received the stop signal.. Preparing to stop"); break; ! } if(this.suspendSignal) { try { this.suspendLock.acquire(); } catch(InterruptedException exception) { exception.printStackTrace(); ! logger.error("Got exception while suspending..." + exception.getMessage(), exception); } ! } } return ErrorCode.JOB_COMPLETED_SUCCESSFULLY; } /** * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#getProcessorState() --- 88,189 ---- try { + this.processingJobData=jobData; this.processedJobDataCount++; ErrorCode errorCode=this.process(jobData); + returnCode=errorCode; } catch(Throwable exception) { exception.printStackTrace(); + logger.error("Exception while processing the job data " + exception.getMessage(),exception); + returnCode=ErrorCode.POOL_JOB_PROCESSOR_EXCEPTION; } + if(this.stopSignal) { logger.info("Received the stop signal.. Preparing to stop"); break; ! } ! if(this.suspendSignal) { try { + logger.info("Received suspend signal... suspending the process"); + this.processorStatus=ProcessorStatus.SUSPENDED; this.suspendLock.acquire(); + this.processorStatus=ProcessorStatus.RUNNING; + logger.info("Resuming the process"); } catch(InterruptedException exception) { exception.printStackTrace(); ! logger.error("Got exception while suspending..." + exception.getMessage() + " process is going to be resumed", exception); } ! } } + + try + { + this.processorStatus=ProcessorStatus.CLEANUP; + cleanup(); + this.processorStatus=ProcessorStatus.FINISHED; + } + catch(Throwable exception) + { + exception.printStackTrace(); + logger.error("Exception while calling cleanup on processor " + exception.getMessage(), exception); + } + + logger.trace("Exiting processPool"); return ErrorCode.JOB_COMPLETED_SUCCESSFULLY; } + + /** + * Suspends the processor. + * + * @return Returns true if the processor is suspended, false otherwise. + */ + public boolean suspend() + { + logger.trace("Entering suspend"); + this.suspendSignal=true; + logger.trace("Entering suspend"); + return true; + } + /** + * Resumes the processor. + * + * @return Retrurns true if the processor is resumed, false otherwise. + */ + public boolean resume() + { + logger.trace("Entering resume"); + this.stopSignal=false; + this.suspendSignal=false; + this.suspendLock.release(); + logger.trace("Exiting resume"); + return true; + } + /** + * Stops the processor. + * + * @return Retrurns true if the processor has stopped, false otherwise. + */ + public boolean stop() + { + logger.trace("Entering stop"); + + this.stopSignal=true; + if(this.suspendSignal) + { + logger.info("Processor is in suspend status... resuming the processor"); + this.suspendSignal=false; + this.suspendLock.release(); + } + + logger.trace("Exiting stop"); + return true; + } /** * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#getProcessorState() *************** *** 113,117 **** public Object getProcessorState() { ! return null; } /** --- 191,195 ---- public Object getProcessorState() { ! return this.processingJobData; } /** *************** *** 120,124 **** public ProcessorStatus getProcessorStatus() { ! return null; } /** --- 198,202 ---- public ProcessorStatus getProcessorStatus() { ! return this.processorStatus; } /** *************** *** 129,131 **** --- 207,227 ---- return processedJobDataCount; } + + /** + * Chance to initialize itself using the information configured for this job + * processor in job configuration. + * + * @param configProps Configuration defined for the job processor in job configuration. + */ + public abstract void initialize(Map configProps); + + /** + * + */ + public abstract ErrorCode process(Object jobData); + + /** + * Chance to do any cleanup at the end of the processing. + */ + public abstract void cleanup(); } Index: PoolJobController.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/PoolJobController.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** PoolJobController.java 14 May 2006 01:31:32 -0000 1.7 --- PoolJobController.java 17 May 2006 22:04:36 -0000 1.8 *************** *** 1,4 **** --- 1,13 ---- package org.jmonks.batchserver.framework.controller.pool; + import EDU.oswego.cs.dl.util.concurrent.Callable; + import EDU.oswego.cs.dl.util.concurrent.CountDown; + import EDU.oswego.cs.dl.util.concurrent.FutureResult; + import java.util.Hashtable; + import java.util.Iterator; + import java.util.Map; + import org.apache.log4j.Logger; import org.jmonks.batchserver.framework.common.ErrorCode; + import org.jmonks.batchserver.framework.config.ConfigurationException; + import org.jmonks.batchserver.framework.config.PoolJobControllerConfig; import org.jmonks.batchserver.framework.controller.JobController; import org.jmonks.batchserver.framework.management.ProcessorState; *************** *** 56,59 **** --- 65,81 ---- { /** + * Map holds all the pool job processors and loader being used for the given job + * as values and name of the threads as keys. + */ + private Map jobProcessorsMap=new Hashtable(); + /** + * Map holds all the pool job processors and loader returned error codes as values and + * name of the threads as keys. + */ + private Map jobProcessorsResultMap=new Hashtable(); + + private static Logger logger=Logger.getLogger(PoolJobController.class); + + /** * Constructor enables the instantiation of the pool job controller instance. */ *************** *** 72,77 **** public ErrorCode process() { ! return null; } /** --- 94,143 ---- public ErrorCode process() { ! logger.info("Entering process in pool job controller = " + super.getJobName()); ! PoolJobControllerConfig poolJobControllerConfig=(PoolJobControllerConfig)super.getJobControllerConfig(); ! validateControllerConfiguration(poolJobControllerConfig); ! int processorCount=poolJobControllerConfig.getPoolJobProcessorThreadCount(); ! processorCount=(processorCount<1?1:processorCount); ! /** ! * Create and initialize the pool ! */ ! JobPool pool=(JobPool)this.getInstance(poolJobControllerConfig.getPoolClassName()); ! pool.initialize(poolJobControllerConfig.getPoolConfigProperties()); ! ! CountDown countDownLock=new CountDown(processorCount+1); ! /** ! * Create, initialize and spwan the loader ! */ ! PoolJobLoader jobLoader=(PoolJobLoader)this.getInstance(poolJobControllerConfig.getPoolJobLoaderClassName()); ! String jobLoaderName=super.getJobName()+"_Loader"; ! FutureResult jobLoaderFutureResult=new FutureResult(); ! Thread jobLoaderThread=new Thread(jobLoaderFutureResult.setter(this.getCallableLoader( ! countDownLock,jobLoader,poolJobControllerConfig.getPoolJobLoaderConfigProperties(),pool)),jobLoaderName); ! jobLoaderThread.start(); ! this.jobProcessorsResultMap.put(jobLoaderName, jobLoaderFutureResult); ! this.jobProcessorsMap.put(jobLoaderName, jobLoader); ! /** ! * Create, initialize and spawn the processor(s). ! */ ! for(int i=0;i<processorCount;i++) ! { ! PoolJobProcessor jobProcessor=(PoolJobProcessor)this.getInstance(poolJobControllerConfig.getPoolJobProcessorClassName()); ! String jobProcessorName=super.getJobName()+"_Processor_"+i; ! FutureResult jobProcessorFutureResult=new FutureResult(); ! Thread jobProcessorThread=new Thread(jobProcessorFutureResult.setter(this.getCallableProcessor( ! countDownLock,jobProcessor,poolJobControllerConfig.getPoolJobProcessorConfigProperties(),pool)),jobProcessorName); ! jobProcessorThread.start(); ! this.jobProcessorsResultMap.put(jobProcessorName, jobProcessorFutureResult); ! this.jobProcessorsMap.put(jobProcessorName, jobProcessor); ! } ! /** ! * Go to hydernate until all threads have finsihed their task. ! */ ! ErrorCode returnCode=hybernate(countDownLock); ! logger.info("Exiting process in pool job controller = " + super.getJobName() + " with return code = " + returnCode); ! return returnCode; } + + /** *************** *** 83,87 **** public int getExpectedRecordsCount() { ! return 0; } --- 149,161 ---- public int getExpectedRecordsCount() { ! logger.trace("Entering getExpectedRecordsCount"); ! ! int expectedRecordsCount=0; ! String jobLoaderName=super.getJobName()+"_Loader"; ! PoolJobLoader jobLoader=(PoolJobLoader)this.jobProcessorsMap.get(jobLoaderName); ! expectedRecordsCount=jobLoader.getTotalJobDataCount(); ! ! logger.trace("Exiting getExpectedRecordsCount = " + expectedRecordsCount); ! return expectedRecordsCount; } *************** *** 94,98 **** public int getProcessedRecordsCount() { ! return 0; } --- 168,183 ---- public int getProcessedRecordsCount() { ! logger.trace("Entering getProcessedRecordsCount"); ! ! int processedRecordsCount=0; ! for(Iterator iterator=this.jobProcessorsMap.values().iterator();iterator.hasNext();) ! { ! Object processor=iterator.next(); ! if(processor instanceof PoolJobProcessor) ! processedRecordsCount=processedRecordsCount+((PoolJobProcessor)processor).getProcessedJobDataCount(); ! } ! ! logger.trace("Exiting getProcessedRecordsCount = " + processedRecordsCount); ! return processedRecordsCount; } *************** *** 104,108 **** public java.lang.String[] getProcessorIDList() { ! return null; } --- 189,201 ---- public java.lang.String[] getProcessorIDList() { ! logger.trace("Entering getProcessorIDList"); ! ! String processorIDList[]=new String[this.jobProcessorsMap.size()]; ! int i=0; ! for(Iterator iterator=this.jobProcessorsMap.keySet().iterator();iterator.hasNext();i++) ! processorIDList[i]=(String)iterator.next(); ! ! logger.trace("Exiting getProcessorIDList"); ! return processorIDList; } *************** *** 117,121 **** public ProcessorState getProcessorState(String processorID) { ! return null; } --- 210,233 ---- public ProcessorState getProcessorState(String processorID) { ! logger.trace("Exiting getProcessorState = " + processorID); ! ProcessorState state=null; ! Object processor=this.jobProcessorsMap.get(processorID); ! if(processor!=null) ! { ! if(processor instanceof PoolJobLoader) ! state=new ProcessorState(processorID, "Pool Job Loader", ((PoolJobLoader)processor).getLoaderState()); ! else if(processor instanceof PoolJobProcessor) ! state=new ProcessorState(processorID, "Pool Job Processor", ((PoolJobProcessor)processor).getProcessorState()); ! else ! { ! logger.error("What else it could be? " + processor.getClass().getName()); ! state=null; ! } ! } ! else ! state=null; ! ! logger.trace("Exiting getProcessorState = " + processorID + " state = " + state); ! return state; } *************** *** 127,131 **** public ProcessorStatus getProcessorStatus(String processorID) { ! return null; } --- 239,262 ---- public ProcessorStatus getProcessorStatus(String processorID) { ! logger.trace("Exiting getProcessorStatus = " + processorID); ! ProcessorStatus status=null; ! Object processor=this.jobProcessorsMap.get(processorID); ! if(processor!=null) ! { ! if(processor instanceof PoolJobLoader) ! status=((PoolJobLoader)processor).getLoaderStatus(); ! else if(processor instanceof PoolJobProcessor) ! status=((PoolJobProcessor)processor).getProcessorStatus(); ! else ! { ! logger.error("What else it could be? " + processor.getClass().getName()); ! status=null; ! } ! } ! else ! status=null; ! ! logger.trace("Exiting getProcessorStatus = " + processorID + " status = " + status); ! return status; } *************** *** 137,141 **** public boolean stop(String processorID) { ! return true; } --- 268,290 ---- public boolean stop(String processorID) { ! logger.trace("Entering stop = " + processorID); ! boolean stopped=true; ! Object processor=this.jobProcessorsMap.get(processorID); ! if(processor!=null) ! { ! if(processor instanceof PoolJobLoader) ! stopped=((PoolJobLoader)processor).stop(); ! else if(processor instanceof PoolJobProcessor) ! stopped=((PoolJobProcessor)processor).stop(); ! else ! { ! logger.error("What else it could be? " + processor.getClass().getName()); ! stopped=false; ! } ! } ! else ! stopped=false; ! logger.trace("Exiting stop = " + processorID + " status = " + stopped); ! return stopped; } *************** *** 147,151 **** public boolean suspend(String processorID) { ! return true; } --- 296,318 ---- public boolean suspend(String processorID) { ! logger.trace("Entering suspend = " + processorID); ! boolean suspended=true; ! Object processor=this.jobProcessorsMap.get(processorID); ! if(processor!=null) ! { ! if(processor instanceof PoolJobLoader) ! suspended=((PoolJobLoader)processor).suspend(); ! else if(processor instanceof PoolJobProcessor) ! suspended=((PoolJobProcessor)processor).suspend(); ! else ! { ! logger.error("What else it could be? " + processor.getClass().getName()); ! suspended=false; ! } ! } ! else ! suspended=false; ! logger.trace("Exiting suspend = " + processorID + " status = " + suspended); ! return suspended; } *************** *** 157,161 **** public boolean resume(String processorID) { ! return true; } } --- 324,534 ---- public boolean resume(String processorID) { ! logger.trace("Entering resume = " + processorID); ! boolean resumed=true; ! Object processor=this.jobProcessorsMap.get(processorID); ! if(processor!=null) ! { ! if(processor instanceof PoolJobLoader) ! resumed=((PoolJobLoader)processor).resume(); ! else if(processor instanceof PoolJobProcessor) ! resumed=((PoolJobProcessor)processor).resume(); ! else ! { ! logger.error("What else it could be? " + processor.getClass().getName()); ! resumed=false; ! } ! } ! else ! resumed=false; ! logger.trace("Exiting resume = " + processorID + " status = " + resumed); ! return resumed; ! } ! ! /** ! * <p> ! * Main thread will call this method and wait until all the processor(s) and loader have ! * finished their processing. Once they are finished, all the return codes ! * will be analyzed and one error code will be returned for this job. ! * </p ! * @param countDownLock CountDown lock being used by loader and all the processor(s). ! * ! * @return Returns the error code of the job. ! */ ! private ErrorCode hybernate(CountDown countDownLock) ! { ! logger.trace("Entering hybernate"); ! ErrorCode returnCode=ErrorCode.JOB_COMPLETED_SUCCESSFULLY; ! try ! { ! logger.debug("Going to wait until loader and all the processor(s) is gonna finish."); ! countDownLock.acquire(); ! logger.info("Loader and all processor(s) have finished their task."); ! } ! catch(InterruptedException exception) ! { ! exception.printStackTrace(); ! logger.error("Exception while waiting for loader and all the processor(s) = " + exception.getMessage(), exception); ! return ErrorCode.POOL_JOB_PROCESSOR_EXCEPTION; ! } ! ! for(Iterator iterator=this.jobProcessorsResultMap.values().iterator();iterator.hasNext();) ! { ! ErrorCode threadReturnCode=(ErrorCode)((FutureResult)iterator.next()).peek(); ! if(threadReturnCode!=ErrorCode.JOB_COMPLETED_SUCCESSFULLY) ! { ! returnCode=threadReturnCode; ! break; ! } ! } ! logger.trace("Exiting hybernate = " + returnCode); ! return returnCode; ! } ! ! /** ! * Wraps the job loader with Callable interface and return the callable ! * interface. ! * ! * @param countDownLock Count down to be released at the end of the processing. ! * @param jobLoader Job Loader that needs to be invoked. ! * @param configProps Properties defined for this loader in job configuration. ! * @param pool Job pool where the loader needs to load the jobs. ! * ! * @return Returns the callabel interface wrapped around the job loader. ! */ ! private Callable getCallableLoader(final CountDown countDownLock,final PoolJobLoader jobLoader, final Map configProps, final JobPool pool) ! { ! logger.trace("Entering getCallableLoader"); ! Callable callable=new Callable(){ ! public Object call() ! { ! ErrorCode returnCode=null; ! try ! { ! logger.trace("Going to call the loadPool method"); ! returnCode=jobLoader.loadPool(configProps,pool); ! logger.debug("Done calling the loadPool method"); ! } ! catch(Throwable exception) ! { ! exception.printStackTrace(); ! logger.error("Exception while loading the job data into the pool = " + exception.getMessage(), exception); ! returnCode=ErrorCode.POOL_JOB_LOADER_EXCEPTION; ! } ! countDownLock.release(); ! logger.info(Thread.currentThread().getName() + " is exiting with the error code = " + returnCode); ! return returnCode; ! } ! }; ! logger.trace("Exiting getCallableLoader"); ! return callable; } + + /** + * Wraps the job processor with Callable interface and return the callable + * interface. + * + * @param countDownLock Count down to be released at the end of the processing. + * @param jobProcessor Job processor that needs to be invoked. + * @param configProps Properties defined for this loader in job configuration. + * @param pool Job pool where the processor needs to pull the jobs. + * + * @return Returns the callabel interface wrapped around the job processor. + */ + private Callable getCallableProcessor(final CountDown countDownLock,final PoolJobProcessor jobProcessor, final Map configProps, final JobPool pool) + { + logger.trace("Entering getCallableProcessor"); + Callable callable=new Callable(){ + public Object call() + { + ErrorCode returnCode=null; + try + { + logger.trace("Going to call the processPool method"); + returnCode=jobProcessor.processPool(configProps,pool); + logger.debug("Done calling the processPool method"); + } + catch(Throwable exception) + { + exception.printStackTrace(); + logger.error("Exception while processing the job data from the pool = " + exception.getMessage(), exception); + returnCode=ErrorCode.POOL_JOB_PROCESSOR_EXCEPTION; + } + countDownLock.release(); + logger.info(Thread.currentThread().getName() + " is exiting with the error code = " + returnCode); + return returnCode; + } + }; + logger.trace("Exiting getCallableProcessor"); + return callable; + } + + /** + * Instantiates and returns the instance of the required class. + * + * @param className Class name of the required instance. + * + * @return Retrurns the instance of the required class. + * + * @throws ConfigurationException If class could not be instantiated. + */ + private Object getInstance(String className) + { + try + { + return Class.forName(className).newInstance(); + } + catch(Exception exception) + { + exception.printStackTrace(); + logger.fatal(exception.getMessage(),exception); + throw new ConfigurationException(ConfigurationException.JOB_CONTROLLER_CONFIG, exception.getMessage()); + } + } + + /** + * Validates the pool job controller configuration by instantiating and verifying + * all the required loader, processor and pool classes. + * + * @param controllerConfig Controller configuration defined in job configuration. + * + * @throws ConfigurationException If any one required instances cannot be instantiated. + */ + private void validateControllerConfiguration(PoolJobControllerConfig controllerConfig) + { + Object jobPool=this.getInstance(controllerConfig.getPoolClassName()); + if(jobPool instanceof JobPool) + { + logger.debug("Job pool is configured properly = " + controllerConfig.getPoolClassName()); + } + else + { + logger.fatal("Configured job pool class name " + controllerConfig.getPoolClassName() + " cannot be associated to JobPool"); + throw new ConfigurationException(ConfigurationException.JOB_CONTROLLER_CONFIG, + "Configured job pool class name " + controllerConfig.getPoolClassName() + " cannot be associated to JobPool"); + } + + Object jobLoader=this.getInstance(controllerConfig.getPoolJobLoaderClassName()); + if(jobLoader instanceof PoolJobLoader) + { + logger.debug("Job loader is configured properly = " + controllerConfig.getPoolJobLoaderClassName()); + } + else + { + logger.fatal("Configured job loader class name " + controllerConfig.getPoolJobLoaderClassName() + " cannot be associated to PoolJobLoader"); + throw new ConfigurationException(ConfigurationException.JOB_CONTROLLER_CONFIG, + "Configured job loader class name " + controllerConfig.getPoolJobLoaderClassName() + " cannot be associated to PoolJobLoader"); + } + + Object jobProcessor=this.getInstance(controllerConfig.getPoolJobProcessorClassName()); + if(jobProcessor instanceof PoolJobLoader) + { + logger.debug("Job Processor is configured properly = " + controllerConfig.getPoolJobProcessorClassName()); + } + else + { + logger.fatal("Configured job processor class name " + controllerConfig.getPoolJobProcessorClassName() + " cannot be associated to PoolJobProcessor"); + throw new ConfigurationException(ConfigurationException.JOB_CONTROLLER_CONFIG, + "Configured job processor class name " + controllerConfig.getPoolJobProcessorClassName() + " cannot be associated to PoolJobProcessor"); + } + } } |
From: Suresh <sur...@us...> - 2006-05-17 03:06:03
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv10093 Modified Files: AbstractPoolJobLoader.java PoolJobLoader.java Log Message: no message Index: AbstractPoolJobLoader.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/AbstractPoolJobLoader.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** AbstractPoolJobLoader.java 16 May 2006 22:10:50 -0000 1.5 --- AbstractPoolJobLoader.java 17 May 2006 03:05:58 -0000 1.6 *************** *** 12,15 **** --- 12,16 ---- import java.util.Map; + import org.jmonks.batchserver.framework.common.ErrorCode; import org.jmonks.batchserver.framework.management.ProcessorStatus; *************** *** 30,35 **** * Holds the status of the loader. */ ! protected ProcessorStatus loaderStatus=ProcessorStatus.INITIALIZING; ! /** * Returns the processing state of the loader. --- 31,43 ---- * Holds the status of the loader. */ ! private ProcessorStatus loaderStatus=ProcessorStatus.INSTANTIATED; ! /** ! * Holds the pool reference passed by controller. ! */ ! private JobPool pool=null; ! /** ! * Holds the current job data that will be used for the monitoring purposes. ! */ ! private Object currentJobData=null; /** * Returns the processing state of the loader. *************** *** 39,43 **** public Object getLoaderState() { ! return null; } --- 47,51 ---- public Object getLoaderState() { ! return currentJobData.toString(); } *************** *** 51,63 **** return this.loaderStatus; } ! ! public ErrorCode loadPool(JobPool pool) { ! return null; } ! public abstract ! public boolean resume() { --- 59,105 ---- return this.loaderStatus; } + /** + * Abstracts the loading of the job data into the pool by defining + * another set of methods to load into the pool and implements the + * management and monitoring related methods. + * + * @param configProps Properties defined for this loader in job configuration. + * @param pool Job Pool reference. + * + * @return Returns the final error code of loading the jobs. + */ + public final ErrorCode loadPool(Map configProps,JobPool pool) + { + this.pool=pool; + ErrorCode returnCode=ErrorCode.JOB_COMPLETED_SUCCESSFULLY; + try + { + returnCode=this.loadPool(configProps); + } + catch(Throwable exception) + { + exception.printStackTrace(); + returnCode=ErrorCode.BASIC_JOB_PROCESSOR_EXCEPTION; + } + return returnCode; + } ! /** ! * Loads the given job data into the job pool. ! * ! * @param jobData Job data object that needs to be processed. ! * ! * @return Returns true if the job data is loaded into the pool, false otherwise. ! */ ! protected final boolean loadJobData(Object jobData) { ! boolean loaded=false; ! if(jobData!=null) ! this.currentJobData=jobData; ! loaded=this.pool.loadJobData(jobData); ! return loaded; } ! public boolean resume() { *************** *** 76,91 **** /** - * Chance to grab the properties defined for this job loader in job - * configuration and do some initialization. * - * @param configProps Properties defined in job configuration as a Map. */ ! public abstract void initialize(Map configProps); ! ! /** ! * Chance to do some cleanup for the Loader implementation. ! */ ! public abstract void cleanup(); ! /** * Returns the number of job data objects that this loader is going to load. --- 118,124 ---- /** * */ ! public abstract ErrorCode loadPool(Map configProps); /** * Returns the number of job data objects that this loader is going to load. Index: PoolJobLoader.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/PoolJobLoader.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** PoolJobLoader.java 16 May 2006 03:15:28 -0000 1.4 --- PoolJobLoader.java 17 May 2006 03:05:58 -0000 1.5 *************** *** 15,33 **** { /** - * Chance for the loader to initialize itself with the properties defined - * in the job configuration. - * - * @param configProps Configuration defined for the loader. - */ - public void initialize(Map configProps); - /** - * Chance to do some cleanup before the job loader being shutdown. - */ - public void cleanup(); - /** * <p> ! * Loads the job data to the given job pool. When finished loading of all the job ! * data, loads the <i>null</i> into the pool to singal the processor(s) that ! * loading of all the jobs have been done. * <br> * Example loading the 100 integer objects into the pool. --- 15,24 ---- { /** * <p> ! * Loads the job data that needs to be processed in to the given job pool. ! * When finished loading of all the job data, loads the <i>null</i> into the pool ! * to singal the processor(s) that loading of all the jobs have been done. ! * Configuration defined for this loader in job configuration will be available ! * in configProps map. * <br> * Example loading the 100 integer objects into the pool. *************** *** 35,39 **** * public class MyLoader * { ! * public ErrorCode loadPool(JobPool pool) * { * for(int i=0;i<100;i++ --- 26,30 ---- * public class MyLoader * { ! * public ErrorCode loadPool(Map configProps, JobPool pool) * { * for(int i=0;i<100;i++ *************** *** 45,51 **** * </p> * * @param pool Job Pool reference. */ ! public ErrorCode loadPool(JobPool pool); /** * Suspends the loader. --- 36,43 ---- * </p> * + * @param configProps Properties defined for this job loader in job configuration. * @param pool Job Pool reference. */ ! public ErrorCode loadPool(Map configProps,JobPool pool); /** * Suspends the loader. |
From: Suresh <sur...@us...> - 2006-05-16 22:10:55
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv29696 Added Files: AbstractPoolJobLoader.java Log Message: no message --- NEW FILE: AbstractPoolJobLoader.java --- /* * AbstractPoolJobLoader.java * * Created on May 16, 2006, 4:49 PM * * To change this template, choose Tools | Options and locate the template under * the Source Creation and Management node. Right-click the template and choose * Open. You can then make changes to the template in the Source Editor. */ package org.jmonks.batchserver.framework.controller.pool; import java.util.Map; import org.jmonks.batchserver.framework.management.ProcessorStatus; /** * <p> * AbstractPoolJobLoader implements all the management and monitoring methods * and abstracts the user from the job pool. This allows the loader implementation * to concentrate on the business logic. * </p> * * @author Suresh Pragada * @version 1.0 * @since 1.0 */ public abstract class AbstractPoolJobLoader implements PoolJobLoader { /** * Holds the status of the loader. */ protected ProcessorStatus loaderStatus=ProcessorStatus.INITIALIZING; /** * Returns the processing state of the loader. * * @return Returns the job data that this loader is loading. */ public Object getLoaderState() { return null; } /** * Returns the status of the loader. * * @return Returns the status of the loader. */ public ProcessorStatus getLoaderStatus() { return this.loaderStatus; } public ErrorCode loadPool(JobPool pool) { return null; } public abstract public boolean resume() { return true; } public boolean stop() { return true; } public boolean suspend() { return true; } /** * Chance to grab the properties defined for this job loader in job * configuration and do some initialization. * * @param configProps Properties defined in job configuration as a Map. */ public abstract void initialize(Map configProps); /** * Chance to do some cleanup for the Loader implementation. */ public abstract void cleanup(); /** * Returns the number of job data objects that this loader is going to load. * * @return Return the number of job data object its going to load. */ public abstract int getTotalJobDataCount(); } |
From: Suresh <sur...@us...> - 2006-05-16 03:15:32
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv26153 Modified Files: AbstractPoolJobProcessor.java CollectionJobPool.java JobPool.java PoolJobLoader.java PoolJobProcessor.java Removed Files: AbstractPoolJobLoader.java Log Message: no message Index: PoolJobLoader.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/PoolJobLoader.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** PoolJobLoader.java 14 May 2006 01:31:32 -0000 1.3 --- PoolJobLoader.java 16 May 2006 03:15:28 -0000 1.4 *************** *** 1,4 **** --- 1,5 ---- package org.jmonks.batchserver.framework.controller.pool; import java.util.Map; + import org.jmonks.batchserver.framework.common.ErrorCode; import org.jmonks.batchserver.framework.management.ProcessorStatus; *************** *** 15,33 **** /** * Chance for the loader to initialize itself with the properties defined ! * in the job configuration and gets the reference to the job pool. * * @param configProps Configuration defined for the loader. - * @param pool Job Pool reference. */ ! public void initialize(Map configProps, JobPool pool); /** ! * Chance to do some cleanup before the job being shutdown. */ public void cleanup(); /** ! * Loads the job data to the job pool. When finished loading of all the job ! * data, loads the <i>NULL</i> into the pool. */ ! public void loadJobData(); /** * Suspends the loader. --- 16,51 ---- /** * Chance for the loader to initialize itself with the properties defined ! * in the job configuration. * * @param configProps Configuration defined for the loader. */ ! public void initialize(Map configProps); /** ! * Chance to do some cleanup before the job loader being shutdown. */ public void cleanup(); /** ! * <p> ! * Loads the job data to the given job pool. When finished loading of all the job ! * data, loads the <i>null</i> into the pool to singal the processor(s) that ! * loading of all the jobs have been done. ! * <br> ! * Example loading the 100 integer objects into the pool. ! * <pre> ! * public class MyLoader ! * { ! * public ErrorCode loadPool(JobPool pool) ! * { ! * for(int i=0;i<100;i++ ! * pool.loadJobData(new Integer(i)); ! * pool.loadJobData(null); ! * } ! * } ! * <pre> ! * </p> ! * ! * @param pool Job Pool reference. */ ! public ErrorCode loadPool(JobPool pool); /** * Suspends the loader. *************** *** 53,57 **** * @return Returns the number of records this loader is going to load. */ ! public int getTotalRecords(); /** * Gets the loader state as object which can be understan by the monitoring --- 71,75 ---- * @return Returns the number of records this loader is going to load. */ ! public int getTotalJobDataCount(); /** * Gets the loader state as object which can be understan by the monitoring Index: JobPool.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/JobPool.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** JobPool.java 15 May 2006 03:29:32 -0000 1.4 --- JobPool.java 16 May 2006 03:15:28 -0000 1.5 *************** *** 50,53 **** * @return Returns the number of job data objects being loaded into the pool. */ ! public int getLoadedCount(); } --- 50,53 ---- * @return Returns the number of job data objects being loaded into the pool. */ ! public int getLoadedJobDataCount(); } Index: PoolJobProcessor.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/PoolJobProcessor.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** PoolJobProcessor.java 14 May 2006 01:31:32 -0000 1.3 --- PoolJobProcessor.java 16 May 2006 03:15:28 -0000 1.4 *************** *** 2,5 **** --- 2,6 ---- import java.util.Map; + import org.jmonks.batchserver.framework.common.ErrorCode; import org.jmonks.batchserver.framework.management.ProcessorStatus; *************** *** 19,25 **** * * @param configProps Configuration defined for the job processor in job configuration. - * @param pool Reference to Job Pool. */ ! public void initialize(Map configProps, JobPool pool); /** * Suspends the pool job processor. --- 20,25 ---- * * @param configProps Configuration defined for the job processor in job configuration. */ ! public void initialize(Map configProps); /** * Suspends the pool job processor. *************** *** 48,54 **** * out of the job data(returns null). * ! * @return Returns the number of job data instances being procssed. */ ! public int process(); /** * Gets the processor to be displyed or anaylyzed for the monitoring purposes. --- 48,56 ---- * out of the job data(returns null). * ! * @param pool Reference to Job Pool. ! * ! * @return Returns the error code. */ ! public ErrorCode process(JobPool pool); /** * Gets the processor to be displyed or anaylyzed for the monitoring purposes. *************** *** 63,65 **** --- 65,74 ---- */ public ProcessorStatus getProcessorStatus(); + /** + * Returns the number of job data objects this particular job processor + * has finsihed. + * + * @return Returns the number of job data objects processed. + */ + public int getProcessedJobDataCount(); } Index: AbstractPoolJobProcessor.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/AbstractPoolJobProcessor.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** AbstractPoolJobProcessor.java 4 Mar 2006 04:42:01 -0000 1.2 --- AbstractPoolJobProcessor.java 16 May 2006 03:15:28 -0000 1.3 *************** *** 1,54 **** package org.jmonks.batchserver.framework.controller.pool; import java.util.Map; /** ! * <p> ! * Abstract job processor implements some of responsiblites defined by the job processor and leaves the application specific functionality implementation to the final processor. ! * </p> ! * @author : Suresh Pragada ! * @version 1.0 */ public abstract class AbstractPoolJobProcessor implements PoolJobProcessor { ! private JobPool mJobPool; ! public AbstractPoolJobProcessor() ! { ! } ! public abstract void initialize(Map configProps); ! public boolean suspend() { ! return true; } ! public boolean resume() { return true; } ! public boolean stop() { return true; } ! ! ! public abstract void cleanup(); ! ! public int processPool() { ! return 0; } ! public Object getProcessorState() { return null; } ! ! public void initialize(Map configProps, org.jmonks.batchserver.framework.controller.pool.JobPool pool) { } } --- 1,131 ---- package org.jmonks.batchserver.framework.controller.pool; + import EDU.oswego.cs.dl.util.concurrent.Mutex; import java.util.Map; + import org.apache.log4j.Logger; + import org.jmonks.batchserver.framework.common.ErrorCode; + import org.jmonks.batchserver.framework.management.ProcessorStatus; /** ! * <p> ! * Abstract pool job processor implements some of the responsiblites defined by the ! * pool job processor and leaves the application specific functionality implementation ! * to the final processor. ! * </p> ! * @author Suresh Pragada ! * @version 1.0 ! * @since 1.0 */ public abstract class AbstractPoolJobProcessor implements PoolJobProcessor { + /** + * Counts the number of job data objects processed. + */ + private int processedJobDataCount=0; ! private boolean stopSignal=false; ! ! private boolean suspendSignal=false; ! ! private Mutex suspendLock=new Mutex(); ! ! private static Logger logger=Logger.getLogger(AbstractPoolJobProcessor.class); ! ! /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#initialize(java.util.Map) ! */ public abstract void initialize(Map configProps); ! /** ! * ! */ ! public abstract ErrorCode process(Object jobData); ! /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#cleanup() ! */ ! public abstract void cleanup(); ! ! /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#suspend() ! */ public boolean suspend() { ! this.suspendSignal=true; ! return this.suspendSignal; } ! /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#resume() ! */ public boolean resume() { + this.stopSignal=false; + this.suspendSignal=false; + this.suspendLock.release(); return true; } ! /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#stop() ! */ public boolean stop() { return true; } ! /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#suspend() ! */ ! public ErrorCode processPool(JobPool pool) { ! Object jobData=null; ! while((jobData=pool.getNextJobData())!=null) ! { ! try ! { ! this.processedJobDataCount++; ! ErrorCode errorCode=this.process(jobData); ! } ! catch(Throwable exception) ! { ! exception.printStackTrace(); ! } ! if(this.stopSignal) ! { ! logger.info("Received the stop signal.. Preparing to stop"); ! break; ! } ! if(this.suspendSignal) ! { ! try ! { ! this.suspendLock.acquire(); ! } ! catch(InterruptedException exception) ! { ! exception.printStackTrace(); ! logger.error("Got exception while suspending..." + exception.getMessage(), exception); ! } ! } ! } ! return ErrorCode.JOB_COMPLETED_SUCCESSFULLY; } ! /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#getProcessorState() ! */ public Object getProcessorState() { return null; } ! /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#getProcessorStatus() ! */ ! public ProcessorStatus getProcessorStatus() { + return null; + } + /** + * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#getProcessedJobDataCount() + */ + public int getProcessedJobDataCount() + { + return processedJobDataCount; } } --- AbstractPoolJobLoader.java DELETED --- Index: CollectionJobPool.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/CollectionJobPool.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** CollectionJobPool.java 15 May 2006 03:29:32 -0000 1.5 --- CollectionJobPool.java 16 May 2006 03:15:28 -0000 1.6 *************** *** 82,86 **** */ this.pool.put(END_OF_POOL); - System.out.println(Thread.currentThread().getName() + "Pushing end of pool"); } loaded=true; --- 82,85 ---- *************** *** 109,113 **** if(jobData==END_OF_POOL) { - System.out.println(Thread.currentThread().getName() + "Received end of pool"); this.pool.put(END_OF_POOL); jobData=null; --- 108,111 ---- *************** *** 144,149 **** { poolSize=Integer.parseInt(poolSizePropertyValue); ! // if(poolSize<1) ! // poolSize=CollectionJobPool.DEFAULT_COLLECTION_POOL_SIZE; } catch(Exception exception) --- 142,149 ---- { poolSize=Integer.parseInt(poolSizePropertyValue); ! if(poolSize<1) ! { ! poolSize=CollectionJobPool.DEFAULT_COLLECTION_POOL_SIZE; ! } } catch(Exception exception) *************** *** 171,177 **** /** ! * @see org.jmonks.batchserver.framework.controller.pool.JobPool#getLoadedCount */ ! public int getLoadedCount() { return this.loadedJobsCount; --- 171,177 ---- /** ! * @see org.jmonks.batchserver.framework.controller.pool.JobPool#getLoadedJobDataCount */ ! public int getLoadedJobDataCount() { return this.loadedJobsCount; |
From: Suresh <sur...@us...> - 2006-05-15 13:31:38
|
Update of /cvsroot/batchserver/batchserver/test/org/jmonks/batchserver/framework/controller/pool In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv18519 Added Files: CollectionJobPoolTest.java Log Message: no message --- NEW FILE: CollectionJobPoolTest.java --- /* * CollectionJobPoolTest.java * JUnit based test * * Created on May 13, 2006, 11:35 PM */ package org.jmonks.batchserver.framework.controller.pool; import java.util.HashMap; import junit.framework.*; /** * * @author Suresh Pragada */ public class CollectionJobPoolTest extends TestCase { public CollectionJobPoolTest(String testName) { super(testName); } public static Test suite() { TestSuite suite = new TestSuite(CollectionJobPoolTest.class); return suite; } /** * Method to test entire collection job pool. */ public void testCollectionJobPool() throws Exception { JobPool pool=new CollectionJobPool(); pool.initialize(new HashMap()); Thread loader=new Thread(this.getLoaderRunnable(pool)); Thread processor1=new Thread(this.getProcessorRunnable(pool)); Thread processor2=new Thread(this.getProcessorRunnable(pool)); Thread processor3=new Thread(this.getProcessorRunnable(pool)); loader.start(); //processor1.start(); //processor2.start(); //processor3.start(); } private Runnable getLoaderRunnable(final JobPool pool) { return new Runnable() { public void run() { for(int i=0;i<100;i++) { System.out.println("Going to load " + i); boolean loaded=pool.loadJobData(new Integer(i)); System.out.println("Loader = " + i + " status = " + loaded); } System.out.println("Done loading the jobs"); pool.loadJobData(null); System.out.println("Done loading the null"); } }; } private Runnable getProcessorRunnable(final JobPool pool) { return new Runnable() { public void run() { Object data=null; while((data=pool.getNextJobData())!=null) { System.out.println(Thread.currentThread().getName() + " = " + ((Integer)data).intValue()); } System.out.println(Thread.currentThread().getName() + " Done reading all the jobs from the buffer"); } }; } } |
From: Suresh <sur...@us...> - 2006-05-15 13:31:15
|
Update of /cvsroot/batchserver/batchserver/test/org/jmonks/batchserver/framework/controller/pool In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv18473/pool Log Message: Directory /cvsroot/batchserver/batchserver/test/org/jmonks/batchserver/framework/controller/pool added to the repository |
From: Suresh <sur...@us...> - 2006-05-15 03:29:36
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv28607 Modified Files: CollectionJobPool.java JobPool.java Log Message: no message Index: JobPool.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/JobPool.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** JobPool.java 14 May 2006 01:31:32 -0000 1.3 --- JobPool.java 15 May 2006 03:29:32 -0000 1.4 *************** *** 51,59 **** */ public int getLoadedCount(); - /** - * Returns the number of job data objects being processed from the pool. - * - * @return Returns the number of job data objects have been processed so far. - */ - public int getProcessedCount(); } --- 51,53 ---- Index: CollectionJobPool.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/CollectionJobPool.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** CollectionJobPool.java 14 May 2006 04:36:44 -0000 1.4 --- CollectionJobPool.java 15 May 2006 03:29:32 -0000 1.5 *************** *** 46,53 **** protected int loadedJobsCount=0; /** - * Holds the number of job data objects have been served to job processors. - */ - protected int processedJobsCount=0; - /** * Object to be pushed to the bounded buffer to denote the end of the pool. */ --- 46,49 ---- *************** *** 62,66 **** { } ! public boolean loadJobData(Object jobData) { --- 58,64 ---- { } ! /** ! * @see org.jmonks.batchserver.framework.controller.pool.JobPool#loadJobData(Object) ! */ public boolean loadJobData(Object jobData) { *************** *** 70,81 **** try { ! if(jobData==null) ! jobData=END_OF_POOL; ! this.pool.put(jobData); ! loaded=true; ! this.loadedJobsCount++; } catch(InterruptedException exception) { logger.info("Got exception while loading the job data = " + jobData, exception); } --- 68,92 ---- try { ! if(jobData!=null) ! { ! this.pool.put(jobData); ! /** END_OF_POOL will be loaded by getNextJobData API to wake up other processors */ ! if(jobData!=END_OF_POOL) ! this.loadedJobsCount++; ! } ! else ! { ! /** ! * null from loader indicates that he has done loading with the job data. ! * Push END_OF_POOL to the pool to let processors know that loader is done loading the jobs. ! */ ! this.pool.put(END_OF_POOL); ! System.out.println(Thread.currentThread().getName() + "Pushing end of pool"); ! } ! loaded=true; } catch(InterruptedException exception) { + exception.printStackTrace(); logger.info("Got exception while loading the job data = " + jobData, exception); } *************** *** 83,87 **** return loaded; } ! public Object getNextJobData() { --- 94,100 ---- return loaded; } ! /** ! * @see org.jmonks.batchserver.framework.controller.pool.JobPool#getNextJobData ! */ public Object getNextJobData() { *************** *** 90,103 **** { jobData=this.pool.take(); if(jobData==END_OF_POOL) { this.pool.put(END_OF_POOL); jobData=null; } - else - this.processedJobsCount++; } catch(InterruptedException exception) { logger.info("Got exception while getting the job data from pool", exception); } --- 103,120 ---- { jobData=this.pool.take(); + /** + * If retrieved job data is END_OF_POOL object return null to caller as per + * method contract and put EN_OF_POOL back to the pool to wake up other processors. + */ if(jobData==END_OF_POOL) { + System.out.println(Thread.currentThread().getName() + "Received end of pool"); this.pool.put(END_OF_POOL); jobData=null; } } catch(InterruptedException exception) { + exception.printStackTrace(); logger.info("Got exception while getting the job data from pool", exception); } *************** *** 107,111 **** /** * Initializes the collection job pool using the configuration defined ! * in the job configuration. It tries to get the pool size from the given configuration, * by looking for the property "job-pool-size", if it couldnt find it uses the default pool size "1000" * and initializes the collection to be used as the pool. --- 124,128 ---- /** * Initializes the collection job pool using the configuration defined ! * in the job configuration. It gets the pool size from the given configuration, * by looking for the property "job-pool-size", if it couldnt find it uses the default pool size "1000" * and initializes the collection to be used as the pool. *************** *** 127,130 **** --- 144,149 ---- { poolSize=Integer.parseInt(poolSizePropertyValue); + // if(poolSize<1) + // poolSize=CollectionJobPool.DEFAULT_COLLECTION_POOL_SIZE; } catch(Exception exception) *************** *** 151,162 **** } public int getLoadedCount() { return this.loadedJobsCount; } - - public int getProcessedCount() - { - return this.processedJobsCount; - } } --- 170,179 ---- } + /** + * @see org.jmonks.batchserver.framework.controller.pool.JobPool#getLoadedCount + */ public int getLoadedCount() { return this.loadedJobsCount; } } |
From: Suresh <sur...@us...> - 2006-05-14 04:36:46
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv29009 Modified Files: CollectionJobPool.java Log Message: no message Index: CollectionJobPool.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/CollectionJobPool.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** CollectionJobPool.java 14 May 2006 01:31:32 -0000 1.3 --- CollectionJobPool.java 14 May 2006 04:36:44 -0000 1.4 *************** *** 49,52 **** --- 49,56 ---- */ protected int processedJobsCount=0; + /** + * Object to be pushed to the bounded buffer to denote the end of the pool. + */ + private static final Object END_OF_POOL = new Object(); private static Logger logger=Logger.getLogger(CollectionJobPool.class); *************** *** 66,69 **** --- 70,75 ---- try { + if(jobData==null) + jobData=END_OF_POOL; this.pool.put(jobData); loaded=true; *************** *** 84,88 **** { jobData=this.pool.take(); ! this.processedJobsCount++; } catch(InterruptedException exception) --- 90,100 ---- { jobData=this.pool.take(); ! if(jobData==END_OF_POOL) ! { ! this.pool.put(END_OF_POOL); ! jobData=null; ! } ! else ! this.processedJobsCount++; } catch(InterruptedException exception) |