[Batchserver-cvs] batchserver/src/org/jmonks/batch/framework/controller/basic BasicJobController.j
Brought to you by:
suresh_pragada
From: Suresh <sur...@us...> - 2006-09-15 20:06:42
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batch/framework/controller/basic In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv19904 Added Files: BasicJobController.java BasicJobProcessor.java Log Message: no message --- NEW FILE: BasicJobProcessor.java --- package org.jmonks.batch.framework.controller.basic; import org.apache.log4j.Logger; import org.jmonks.batch.framework.ErrorCode; import org.jmonks.batch.framework.JobContext; import org.jmonks.batch.framework.management.ProcessorStatus; /** * <p> * BasicJobProcessor lets job developers to implement their business logic * in the process method and this method will be executed by the BasicJobController. * </p> * * @author Suresh Pragada * @version 1.0 * @since 1.0 */ public abstract class BasicJobProcessor { /** * Holds the thread references this processor has been spawned. * This reference will be used to suspend, resume and stop the processor. */ protected Thread processorThread=null; /** * Indicates that thread has been registered or not. */ private boolean threadRegistered=false; /** * Indicates the status of this processor. */ protected ProcessorStatus processorStatus=ProcessorStatus.INSTANTIATED; private static Logger logger=Logger.getLogger(BasicJobProcessor.class); /** * Gets the thread reference this processor is going to be executed on. * If processorThread has already initialized, it ignores this request. * * @return Returns true if thread has been registered with the processor, false otherwise. */ final boolean registerThread() { logger.trace("Entering registerThread"); boolean registered=true; if(!threadRegistered) { this.processorThread=Thread.currentThread(); logger.debug("Registering the thread = " + this.processorThread.getName()); this.threadRegistered=true; } else { registered=false; } logger.trace("Exiting registerThread"); return registered; } /** * <p> * Basic job controller calls this method to suspend the job processor, when it receives * a <i>suspsend</i> message from the managemet interface. Default implementation * will suspend the job processor abruptly as soon as it receives the message. * Implementors can provide their own implementation by overriding this method * to provide better mechanism. * </p> * * @return Returns true if processor has suspended, false otherwise. */ public boolean suspend() { if(this.processorStatus!=ProcessorStatus.SUSPENDED) { this.processorThread.suspend(); this.processorStatus=ProcessorStatus.SUSPENDED; return true; } else return false; } /** * <p> * Basic job controller calls this method to resume the job processor, when it receives * a <i>resume</i> message from the managemet interface. Default implementation * will resume the job processor abruptly as soon as it receives the message. * Implementors can provide their own implementation by overriding this method * to provide better mechanism. * </p> * * @return Returns true if processor has resumed, false otherwise. */ public boolean resume() { if(this.processorStatus==ProcessorStatus.SUSPENDED) { this.processorThread.resume(); this.processorStatus=ProcessorStatus.RUNNING; return true; } else return false; } /** * <p> * Basic job controller calls this method to stop the job processor, when it receives * a <i>stop</i> message from the managemet interface. Default implementation * will stop the job processor abruptly as soon as it receives the message. * Implementors can provide their own implementation by overriding this method * to provide better mechanism. * </p> * * @return Returns true if processor has stopped, false otherwise. */ public boolean stop() { if(this.processorStatus!=ProcessorStatus.STOPPED || this.processorStatus!=ProcessorStatus.FINISHED) { this.processorThread.stop(); this.processorStatus=ProcessorStatus.STOPPED; return true; } else return false; } /** * <p> * Does the processing and return the appropriate error code. Properties * configured for this job processor can be retrieved from the JobContext. * <br> * <pre> * public class MyBasicJobProcessor extends BasicJobProcessor * { * public ErrorCode process(JobContext jobContext) * { * //Perform the business logic. * return ErrorCode.JOB_COMPLETED_SUCCESSFULLY; * } * } * </pre> * </p> * * @param jobContext Context of the job, where all the properties will be defined. * * @return Returns the appropriate error code needs to be passed to the invocation layer. */ public abstract ErrorCode process(JobContext jobContext); /** * This is for monitoring applications to know what the information (record) * this job processor is processing. Implementers should provide the * granular level of information that this processor is processing at this time * for better monitoring of this job. This returns object * which can be used for the purpose of display. * * @return Returns the information (record) this processor is processing. */ public abstract Object getProcessorState(); /** * Returns the status of the processor as a ProcessorStatus object, which gives * the information like whether the processor is running, suspended, resumed * or stopped. Defalt implementation of this method returns the status based * on the default implementation of suspend, resume and stop methods. * * @return Returns the processor status. */ public ProcessorStatus getProcessorStatus() { return this.processorStatus; } /** * This is for monitoring applications to know how many records this particular * (if there are multiple job processors) job processor is going to process. * * @return Returns the number of records/jobs this processor is going to process. */ public abstract long getTotalRecordsCount(); /** * This is for monitoring applications to know how many records this particular * (if there are multiple job processors) job processor has finished processing. * * @return Returns the number of records/jobs this processor has finished processing. */ public abstract long getProcessedRecordsCount(); } --- NEW FILE: BasicJobController.java --- package org.jmonks.batch.framework.controller.basic; import EDU.oswego.cs.dl.util.concurrent.Callable; import EDU.oswego.cs.dl.util.concurrent.CountDown; import EDU.oswego.cs.dl.util.concurrent.FutureResult; import java.util.Calendar; import java.util.Hashtable; import java.util.Iterator; import java.util.Map; import org.apache.log4j.Logger; import org.jmonks.batch.framework.JobStatistics; import org.jmonks.batch.framework.ErrorCode; import org.jmonks.batch.framework.JobContext; import org.jmonks.batch.framework.config.BasicJobControllerConfig; import org.jmonks.batch.framework.config.ConfigurationException; import org.jmonks.batch.framework.controller.JobController; import org.jmonks.batch.framework.management.ProcessorStatus; import org.jmonks.batch.framework.management.ProcessorState; /** * <p> * BasicJobController creates and executes the Basic Job Processor * class defined in job configuration. It creates the configured number * of instances and have each instance run in a seperate thread. * Once all the processors have finished their processing, returns the * appropriate return code. * </p> * * <p> * This controller is useful to write and execute a simple business logic * stands on its own. Allows the flexibility to run that code in a number * of instances. Following is an example configuration to configure the * batch job written based on BasicJobController. * <br><br> * <i>XML Configuration is as follows</i> <br><br> * <pre> * <job-config job-name="process_file_abc"> * <job-controller controller-class-name="org.jmonks.batch.framework.controller.basic.BasicJobController"> * <basic-job-processor basic-job-processor-class-name="com.mycompany.batch.processfileabc.AbcJobProcessor" thread-count="5"> * <property key="basic-job-processor-config1">basic-job-processor-value1</property> * </basic-job-processor> * <property key="basic-controller-config1">basic-controller-value1</property> * </job-controller> * <job-logging-config> * <logging-property-file>com.mycompany.batch.processfileabc.Logging</logging-property-file> * </job-logging-config> * </job-config> * </pre> * <br><br> * <i>DB Configuration is as follows</i> * <table border="1"> * <tr><td><b>TableName.ColumnName</b></td><td><b>Value</b></td></tr> * <tr><td>job_config.job_name</td><td>process_file_abc</td></tr> * <tr><td>job_config.job_status</td><td>1</td></tr> * <tr><td>job_config.job_controller_class_name</td><td>org.jmonks.batch.framework.controller.basic.BasicJobController</td></tr> * <tr><td>job_config.job_controller_props</td><td>basic-controller-config1=basic-controller-value1:basic-controller-config1=basic-controller-value2</td></tr> * <tr><td>basic_job_controller_config.job_name</td><td>process_file_abc</td></tr> * <tr><td>basic_job_controller_config.basic_job_processor_class_name</td><td>com.mycompany.batch.processfileabc.AbcJobProcessor</td></tr> * <tr><td>basic_job_controller_config.basic_job_processor_props</td><td>basic-job-processor-config1=basic-job-processor-value1</td></tr> * <tr><td>basic_job_controller_config.basic_job_processor_thread_cnt</td><td>1</td></tr> * </table> * </p> * * @author Suresh Pragada * @version 1.0 * @since 1.0 */ public class BasicJobController extends JobController { /** * Map holds all the basic job processors being used for the given job as values and * name of the threads as keys. */ private Map jobProcessorsMap=new Hashtable(); /** * Map holds all the basic job processors returned error codes as values and * name of the threads as keys. */ private Map jobProcessorsResultMap=new Hashtable(); /** * Holds the statistics of the basic job controller. */ private JobStatistics jobStatistics=null; private static Logger logger=Logger.getLogger(BasicJobController.class); /** * Constructor enables the instantiation of the basic job controller instance. */ public BasicJobController() { } /** * <p> * Executes the job by running the configured number of basic job processors * and returns the appropriate errorcode. It collects all the return codes returned * by job processors and return the first non successful return code it found in the * list. If it doesnt find any non successful return code, it will return successful * error code. * </p> * * @return Returns the processing error code for the job. */ public ErrorCode process() { logger.info("Entering process in basic job controller = " + super.jobContext.getJobName()); BasicJobControllerConfig basicJobControllerConfig=(BasicJobControllerConfig)super.jobContext.getJobConfig().getJobControllerConfig(); int threadCount=basicJobControllerConfig.getBasicJobProcessThreadCount(); /** * If thread count is not greater than zero.. set it as 0 */ threadCount=threadCount>0?threadCount:0; CountDown countDownLock=new CountDown(threadCount); logger.debug("Going to create " + threadCount + " basic job processor(s)"); this.jobStatistics=new JobStatistics(super.jobContext.getJobName()); this.jobStatistics.setStartTime(Calendar.getInstance().getTime()); for(int i=0;i<threadCount;i++) { String threadID=super.jobContext.getJobName()+"_"+(i+1); BasicJobProcessor jobProcessor=this.getBasicJobProcessor(basicJobControllerConfig.getBasicJobProcessorClassName()); FutureResult result=new FutureResult(); Thread processorThread=new Thread(result.setter(this.getCallableProcessor (countDownLock,jobProcessor,super.jobContext)), threadID); processorThread.start(); this.jobProcessorsMap.put(threadID,jobProcessor); this.jobProcessorsResultMap.put(threadID, result); logger.info(threadID + " basic job processor has been kicked off."); } ErrorCode returnCode=hybernate(countDownLock); logger.info("Exiting process in basic job controller = " + super.jobContext.getJobName() + " with return code = " + returnCode); return returnCode; } /** * Returns the total number of records this job going to process. This will be * the sum of the expected count from all the job processors. * * @return Returns the total number of records the job going to process. */ public long getExpectedRecordsCount() { logger.trace("Entering getExpectedRecordsCount"); long expectedRecordsCount=0; for(Iterator iterator=this.jobProcessorsMap.values().iterator();iterator.hasNext();) { BasicJobProcessor jobProcessor=(BasicJobProcessor)iterator.next(); expectedRecordsCount=expectedRecordsCount+jobProcessor.getTotalRecordsCount(); } logger.trace("Entering getExpectedRecordsCount = " + expectedRecordsCount); return expectedRecordsCount; } /** * Returns the number of records processed so far. This will be * the sum of the processed count from all the job processors. * * @return Returns the count of processed records. */ public long getProcessedRecordsCount() { logger.trace("Entering getProcessedRecordsCount"); long processedRecordsCount=0; for(Iterator iterator=this.jobProcessorsMap.values().iterator();iterator.hasNext();) { BasicJobProcessor jobProcessor=(BasicJobProcessor)iterator.next(); processedRecordsCount=processedRecordsCount+jobProcessor.getProcessedRecordsCount(); } logger.trace("Entering getProcessedRecordsCount = " + processedRecordsCount); return processedRecordsCount; } /** * Returns the IDs of all the processors as string array. * * @return Returns the string array consist of all the processor IDs. */ public String[] getProcessorIDList() { logger.trace("Entering getProcessorIDList"); String processorIDList[]=new String[this.jobProcessorsMap.size()]; int i=0; for(Iterator iterator=this.jobProcessorsMap.keySet().iterator();iterator.hasNext();i++) processorIDList[i]=(String)iterator.next(); logger.trace("Exiting getProcessorIDList"); return processorIDList; } /** * Returns the state of the requested processor as a ProcessorState object. * * @return Returns the processor state. */ public ProcessorState getProcessorState(String processorID) { logger.trace("Exiting getProcessorState = " + processorID); ProcessorState state=null; BasicJobProcessor jobProcessor=(BasicJobProcessor)this.jobProcessorsMap.get(processorID); if(jobProcessor!=null) state=new ProcessorState(processorID, "Basic Job Processor", jobProcessor.getProcessorState()); else state=null; logger.trace("Exiting getProcessorState = " + processorID + " state = " + state); return state; } /** * Returns the status of the processor identified by the given processor ID. * * @return Returns the ProcessorStatus object represents the status. */ public ProcessorStatus getProcessorStatus(String processorID) { logger.trace("Exiting getProcessorStatus = " + processorID); ProcessorStatus status=null; BasicJobProcessor jobProcessor=(BasicJobProcessor)this.jobProcessorsMap.get(processorID); if(jobProcessor!=null) status=jobProcessor.getProcessorStatus(); else status=null; logger.trace("Exiting getProcessorStatus = " + processorID + " state = " + status); return status; } /** * Stops the processor identified by the given processor ID. * * @return Returns true, if processor could be stopped, false otherwise. */ public boolean stop(String processorID) { logger.trace("Entering stop = " + processorID); boolean stopped=true; BasicJobProcessor jobProcessor=(BasicJobProcessor)this.jobProcessorsMap.get(processorID); if(jobProcessor!=null) stopped=jobProcessor.stop(); else stopped=false; logger.trace("Exiting stop = " + processorID + " status = " + stopped); return stopped; } /** * Suspends the processor identified by the given processor ID. * * @return Returns true if it could suspend the processor, false otherwise. */ public boolean suspend(String processorID) { logger.trace("Entering suspend = " + processorID); boolean suspended=true; BasicJobProcessor jobProcessor=(BasicJobProcessor)this.jobProcessorsMap.get(processorID); if(jobProcessor!=null) suspended=jobProcessor.suspend(); else suspended=false; logger.trace("Exiting suspend = " + processorID + " suspended = " + suspended); return suspended; } /** * Resumes the processor identified by given processor ID. * * @return Returns true if processor is resumed, false otherwise. */ public boolean resume(String processorID) { logger.trace("Entering resume = " + processorID); boolean resumed=true; BasicJobProcessor jobProcessor=(BasicJobProcessor)this.jobProcessorsMap.get(processorID); if(jobProcessor!=null) resumed=jobProcessor.resume(); else resumed=false; logger.trace("Exiting resume = " + processorID + " resumed = " + resumed); return resumed; } /** * Instantiates and returns the basic job processor using the given class name. * The given class name should implement the BasicJobProcessor interface. * * @param basicJobProcessorClassName Class name that implements the BasicJobProcessor interface. * * @return Returns the baic job processor instance. * * @throws ConfigurationException If it couldnt instantiate basic job processor instance. */ private BasicJobProcessor getBasicJobProcessor(String basicJobProcessorClassName) { logger.trace("Entering getBasicJobProcessor = " + basicJobProcessorClassName); BasicJobProcessor jobProcessor=null; try { jobProcessor=(BasicJobProcessor)Class.forName(basicJobProcessorClassName).newInstance(); } catch(ClassNotFoundException exception) { exception.printStackTrace(); logger.error(exception.getMessage(),exception); throw new ConfigurationException(ConfigurationException.JOB_CONTROLLER_CONFIG, exception.getMessage()); } catch(InstantiationException exception) { exception.printStackTrace(); logger.error(exception.getMessage(),exception); throw new ConfigurationException(ConfigurationException.JOB_CONTROLLER_CONFIG, exception.getMessage()); } catch(IllegalAccessException exception) { exception.printStackTrace(); logger.error(exception.getMessage(),exception); throw new ConfigurationException(ConfigurationException.JOB_CONTROLLER_CONFIG, exception.getMessage()); } logger.trace("Exiting getBasicJobProcessor"); return jobProcessor; } /** * <p> * Creates the runnable object by encapsulating the given job processor and * kick off the process method on the processor. Once processor has finsihed * its processing reports the error code to the controller by calling the "done" * method on the controller. * </p> * * @param countDownLock CountDown object to release when the processing has been done. * @param jobProcessor Initialized Basic job processor instance. * @param jobContext JobContext to be passed to the job processors. * * @return Returns the runnable instance. */ private Callable getCallableProcessor(final CountDown countDownLock,final BasicJobProcessor jobProcessor,final JobContext jobContext) { logger.trace("Entering getCallableProcessor"); Callable callable=new Callable(){ public Object call() { ErrorCode returnCode=null; try { boolean registered=jobProcessor.registerThread(); logger.debug("Status of registering thread with the processor = " + registered); logger.trace("Going to call the process method"); returnCode=jobProcessor.process(jobContext); logger.debug("Done calling the process method"); } catch(Throwable exception) { exception.printStackTrace(); logger.error("Exception while processing = " + exception.getMessage(), exception); returnCode=ErrorCode.BASIC_JOB_PROCESSOR_EXCEPTION; } countDownLock.release(); logger.info(Thread.currentThread().getName() + " is exiting with the error code = " + returnCode); return returnCode; } }; logger.trace("Exiting getCallableProcessor"); return callable; } /** * <p> * Main thread will call this method and wait until all the processors have * finished their processing. Once they are finished, all the return codes * will be analyzed and one error code will be returned for this job. * </p * @param countDownLock CountDown lock being used by all the processors. * * @return Returns the error code of the job. */ private ErrorCode hybernate(CountDown countDownLock) { logger.trace("Entering hybernate"); ErrorCode returnCode=ErrorCode.JOB_COMPLETED_SUCCESSFULLY; try { logger.debug("Going to wait until all the processors is gonna finish."); countDownLock.acquire(); logger.info("All processors have finished their task."); } catch(InterruptedException exception) { exception.printStackTrace(); logger.error("Exception while waiting for all the processors = " + exception.getMessage(), exception); return ErrorCode.BASIC_JOB_PROCESSOR_EXCEPTION; } this.jobStatistics.setEndTime(Calendar.getInstance().getTime()); this.jobStatistics.setMaxMemeoryUsage(Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()); this.jobStatistics.setRecordsProcessed(this.getProcessedRecordsCount()); for(Iterator iterator=this.jobProcessorsResultMap.values().iterator();iterator.hasNext();) { ErrorCode threadReturnCode=(ErrorCode)((FutureResult)iterator.next()).peek(); if(!ErrorCode.JOB_COMPLETED_SUCCESSFULLY.equals(threadReturnCode)) { returnCode=threadReturnCode; break; } } this.jobStatistics.setExitCode(returnCode); logger.trace("Exiting hybernate = " + returnCode); return returnCode; } /** * @see org.jmonks.batch.framework.controller.JobController#getJobStatistics() */ public JobStatistics getJobStatistics() { if(this.jobStatistics.getEndTime()!=null) return this.jobStatistics; else return null; } } |