[Batchserver-cvs] batchserver/src/org/jmonks/batchserver/framework/controller/basic BasicJobControll
Brought to you by:
suresh_pragada
From: Suresh <sur...@us...> - 2006-05-03 22:11:22
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/basic In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv9605/org/jmonks/batchserver/framework/controller/basic Modified Files: BasicJobController.java BasicJobProcessor.java Log Message: no message Index: BasicJobProcessor.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/basic/BasicJobProcessor.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** BasicJobProcessor.java 3 May 2006 13:01:49 -0000 1.4 --- BasicJobProcessor.java 3 May 2006 22:11:17 -0000 1.5 *************** *** 30,33 **** public int getProcessedCount(); - } --- 30,32 ---- Index: BasicJobController.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/basic/BasicJobController.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** BasicJobController.java 3 May 2006 13:01:49 -0000 1.5 --- BasicJobController.java 3 May 2006 22:11:17 -0000 1.6 *************** *** 1,5 **** --- 1,15 ---- package org.jmonks.batchserver.framework.controller.basic; + import EDU.oswego.cs.dl.util.concurrent.CountDown; + import java.util.HashMap; + import java.util.Hashtable; + import java.util.Iterator; + import java.util.Map; + import org.apache.log4j.Logger; import org.jmonks.batchserver.framework.common.ErrorCode; + import org.jmonks.batchserver.framework.config.BasicJobControllerConfig; + import org.jmonks.batchserver.framework.config.ConfigurationException; import org.jmonks.batchserver.framework.controller.JobController; + import org.jmonks.batchserver.framework.management.JobStatus; + import org.jmonks.batchserver.framework.management.ThreadState; /** *************** *** 12,26 **** * @since 1.0 */ ! public class BasicJobController extends JobController { ! ! private BasicJobProcessor basicJobProcessor; public BasicJobController() { } public ErrorCode process() { ! return null; } --- 22,88 ---- * @since 1.0 */ ! public class BasicJobController extends JobController ! { ! /** ! * Map holds all the basic job processors being used for the given job as values and ! * name of the threads as keys. ! */ ! private Map jobProcessorsMap=new Hashtable(); ! /** ! * Map holds all the basic job processors returned error codes as values and ! * name of the threads as keys. ! */ ! private Map jobProcessorsResultMap=new Hashtable(); + private static Logger logger=Logger.getLogger(BasicJobController.class); + /** + * Constructor enables the instantiation of the basic job controller instance. + */ public BasicJobController() { } + /** + * <p> + * Executes the job by running the configured number of basic job processors + * and returns the appropriate errorcode. It collects all the return codes returned + * by job processors and return the first non successful return code it found in the + * list. If it doesnt find any non successful return code, it will return successful + * error code. + * </p> + * + * @return Returns the processing error code for the job. + */ public ErrorCode process() { ! BasicJobControllerConfig basicJobControllerConfig=(BasicJobControllerConfig)super.getJobControllerConfig(); ! int threadCount=basicJobControllerConfig.getBasicJobProcessThreadCount(); ! /** ! * If thread count is not greater than zero.. set it as 0 ! */ ! threadCount=threadCount>0?threadCount:0; ! CountDown countDownLock=new CountDown(threadCount); ! for(int i=0;i<threadCount;i++) ! { ! BasicJobProcessor jobProcessor=this.getBasicJobProcessor(basicJobControllerConfig.getBasicJobProcessorClassName()); ! /** ! * Make seperate copy of property map for each processor and initialize with that map. ! */ ! Map processorConfigProperties=new HashMap(basicJobControllerConfig.getBasicJobProcessorConfigProperties()); ! try ! { ! jobProcessor.initialize(processorConfigProperties); ! } ! catch(Throwable exception) ! { ! exception.printStackTrace(); ! logger.error(exception.getMessage(), exception); ! } ! String threadID=super.getJobName()+"_"+(i+1); ! Thread thread=new Thread(this.getRunnableProcessor(countDownLock,jobProcessor), threadID); ! thread.start(); ! this.jobProcessorsMap.put(threadID,jobProcessor); ! } ! return hybernate(countDownLock); } *************** *** 28,32 **** * Returns the total number of records. * ! * @return Returns the total number of records the job gonna process. */ public int getExpectedRecordsCount() --- 90,94 ---- * Returns the total number of records. * ! * @return Returns the total number of records the job going to process. */ public int getExpectedRecordsCount() *************** *** 36,40 **** /** ! * Returns the numebr of records processed so far. */ public int getProcessedRecordsCount() --- 98,102 ---- /** ! * Returns the number of records processed so far. */ public int getProcessedRecordsCount() *************** *** 46,50 **** * Returns the thread count. */ ! public java.lang.String[] getThreadIDList() { return null; --- 108,112 ---- * Returns the thread count. */ ! public String[] getThreadIDList() { return null; *************** *** 54,58 **** * Returns the state of the thread as a ThreadState object. */ ! public org.jmonks.batchserver.framework.management.ThreadState getThreadState(String threadID) { return null; --- 116,120 ---- * Returns the state of the thread as a ThreadState object. */ ! public ThreadState getThreadState(String threadID) { return null; *************** *** 62,66 **** * Returns the job status. */ ! public org.jmonks.batchserver.framework.management.JobStatus getJobStatus() { return null; --- 124,128 ---- * Returns the job status. */ ! public JobStatus getJobStatus() { return null; *************** *** 68,74 **** /** ! * Stops the job and persist the state of this job, if restart flag is true. */ ! public boolean stop(boolean restart) { return true; --- 130,136 ---- /** ! * Stops the job. */ ! public boolean stop() { return true; *************** *** 90,92 **** --- 152,287 ---- return true; } + + /** + * Instantiates and returns the basic job processor using the given class name. + * The given class name should implement the BasicJobProcessor interface. + * + * @param basicJobProcessorClassName Class name that implements the BasicJobProcessor interface. + * + * @return Returns the baic job processor instance. + * + * @throws ConfigurationException If it couldnt instantiate basic job processor instance. + */ + private BasicJobProcessor getBasicJobProcessor(String basicJobProcessorClassName) + { + BasicJobProcessor jobProcessor=null; + try + { + jobProcessor=(BasicJobProcessor)Class.forName(basicJobProcessorClassName).newInstance(); + } + catch(ClassNotFoundException exception) + { + exception.printStackTrace(); + logger.error(exception.getMessage(),exception); + throw new ConfigurationException(ConfigurationException.JOB_CONTROLLER_CONFIG, exception.getMessage()); + } + catch(InstantiationException exception) + { + exception.printStackTrace(); + logger.error(exception.getMessage(),exception); + throw new ConfigurationException(ConfigurationException.JOB_CONTROLLER_CONFIG, exception.getMessage()); + } + catch(IllegalAccessException exception) + { + exception.printStackTrace(); + logger.error(exception.getMessage(),exception); + throw new ConfigurationException(ConfigurationException.JOB_CONTROLLER_CONFIG, exception.getMessage()); + } + return jobProcessor; + } + + /** + * <p> + * Creates the runnable object by encapsulating the given job processor and + * kick off the process method on the processor. Once processor has finsihed + * its processing reports the error code to the controller by calling the "done" + * method on the controller. + * </p> + * @param countDownLock CountDown object to release when the processing has been done. + * @param jobProcessor Initialized Basic job processor instance. + * + * @return Returns the runnable instance. + */ + private Runnable getRunnableProcessor(final CountDown countDownLock,final BasicJobProcessor jobProcessor) + { + Runnable runnable=new Runnable(){ + public void run() + { + /** + * Call the process method on basic job processor and report the er. + */ + try + { + ErrorCode errorCode=jobProcessor.process(); + done(Thread.currentThread().getName(),errorCode); + } + catch(Throwable exception) + { + exception.printStackTrace(); + logger.error(exception.getMessage(), exception); + done(Thread.currentThread().getName(), ErrorCode.BASIC_JOB_PROCESSOR_EXCEPTION); + } + /** + * Call the cleanup method on basic job processor. + */ + try + { + jobProcessor.cleanup(); + } + catch(Throwable exception) + { + exception.printStackTrace(); + logger.error(exception.getMessage(), exception); + } + countDownLock.release(); + } + }; + return runnable; + } + + /** + * <p> + * Once all the processors have done their task, they will report here with the + * errorcode. This API will registers the errorcode with the thread name and + * wakes up the main thread if this is the last processor. + * </p> + * @param threadID Name of thread done and reporting. + * @param errorCode Processor return code. + */ + private void done(String threadID,ErrorCode errorCode) + { + this.jobProcessorsResultMap.put(threadID, errorCode); + logger.info(threadID + " has finished processing with the error code = " + errorCode); + } + + /** + * <p> + * Main thread will call this method and wait until all the processors have + * finished their processing. Once they are finished, all the return codes + * will be analyzed and one error code will be returned for this job. + * </p + * @param countDownLock CountDown lock being used by all the processors. + * + * @return Returns the error code of the job. + */ + private ErrorCode hybernate(CountDown countDownLock) + { + try + { + countDownLock.acquire(); + } + catch(InterruptedException exception) + { + exception.printStackTrace(); + logger.error(exception.getMessage(), exception); + } + + for(Iterator iterator=this.jobProcessorsResultMap.values().iterator();iterator.hasNext();) + { + ErrorCode returnCode=(ErrorCode)iterator.next(); + if(returnCode!=ErrorCode.JOB_COMPLETED_SUCCESSFULLY) + return returnCode; + } + return ErrorCode.JOB_COMPLETED_SUCCESSFULLY; + } } |