[Batchserver-cvs] batchserver/src/org/jmonks/batchserver/framework/controller/pool AbstractPoolJobLoader.java, …
Brought to you by:
suresh_pragada
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv24956 Modified Files: AbstractPoolJobLoader.java AbstractPoolJobProcessor.java PoolJobController.java PoolJobLoader.java PoolJobProcessor.java Log Message: no message Index: PoolJobProcessor.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/PoolJobProcessor.java,v retrieving revision 1.4 retrieving revision 1.5 diff -C2 -d -r1.4 -r1.5 *** PoolJobProcessor.java 16 May 2006 03:15:28 -0000 1.4 --- PoolJobProcessor.java 17 May 2006 22:04:36 -0000 1.5 *************** *** 6,11 **** /** * PoolJobProcessor gets the job data to be processed from the job pool and ! * processes it. * * @author Suresh pragada --- 6,14 ---- /** + * <p> * PoolJobProcessor gets the job data to be processed from the job pool and ! * processes it. Along with the processing methods, it exposes some method used ! * by management and monitoring clients. ! * </p> * * @author Suresh pragada *************** *** 16,25 **** { /** ! * Chance to initialize itself using the information configured for this job ! * processor in job configuration. * * @param configProps Configuration defined for the job processor in job configuration. */ ! public void initialize(Map configProps); /** * Suspends the pool job processor. --- 19,34 ---- { /** ! * <p> ! * Process the job data getting from the job pool and quits when job pool ! * out of the job data(returns null). ! * </p> * * @param configProps Configuration defined for the job processor in job configuration. + * @param pool Reference to Job Pool. + * + * @return Returns the error code. */ ! public ErrorCode processPool(Map configProps, JobPool pool); ! /** * Suspends the pool job processor. 
*************** *** 41,57 **** public boolean stop(); /** - * Chance to do any cleanup at the end of the processing. - */ - public void cleanup(); - /** - * Process the job data getting from the job pool and quits when job pool - * out of the job data(returns null). - * - * @param pool Reference to Job Pool. - * - * @return Returns the error code. - */ - public ErrorCode process(JobPool pool); - /** * Gets the processor to be displyed or anaylyzed for the monitoring purposes. * --- 50,53 ---- Index: AbstractPoolJobLoader.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/AbstractPoolJobLoader.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** AbstractPoolJobLoader.java 17 May 2006 03:05:58 -0000 1.6 --- AbstractPoolJobLoader.java 17 May 2006 22:04:36 -0000 1.7 *************** *** 11,15 **** --- 11,17 ---- package org.jmonks.batchserver.framework.controller.pool; + import EDU.oswego.cs.dl.util.concurrent.Mutex; import java.util.Map; + import org.apache.log4j.Logger; import org.jmonks.batchserver.framework.common.ErrorCode; import org.jmonks.batchserver.framework.management.ProcessorStatus; *************** *** 31,35 **** * Holds the status of the loader. */ ! private ProcessorStatus loaderStatus=ProcessorStatus.INSTANTIATED; /** * Holds the pool reference passed by controller. --- 33,37 ---- * Holds the status of the loader. */ ! protected ProcessorStatus loaderStatus=ProcessorStatus.INSTANTIATED; /** * Holds the pool reference passed by controller. *************** *** 41,90 **** private Object currentJobData=null; /** ! * Returns the processing state of the loader. ! * ! * @return Returns the job data that this loader is loading. */ ! public Object getLoaderState() ! { ! return currentJobData.toString(); ! } ! /** ! * Returns the status of the loader. ! * ! * @return Returns the status of the loader. */ ! 
public ProcessorStatus getLoaderStatus() ! { ! return this.loaderStatus; ! } /** ! * Abstracts the loading of the job data into the pool by defining ! * another set of methods to load into the pool and implements the ! * management and monitoring related methods. * * @param configProps Properties defined for this loader in job configuration. * @param pool Job Pool reference. * ! * @return Returns the final error code of loading the jobs. */ public final ErrorCode loadPool(Map configProps,JobPool pool) { this.pool=pool; ErrorCode returnCode=ErrorCode.JOB_COMPLETED_SUCCESSFULLY; try { returnCode=this.loadPool(configProps); } catch(Throwable exception) { exception.printStackTrace(); ! returnCode=ErrorCode.BASIC_JOB_PROCESSOR_EXCEPTION; } return returnCode; } /** ! * Loads the given job data into the job pool. * * @param jobData Job data object that needs to be processed. --- 43,94 ---- private Object currentJobData=null; /** ! * Signal the loader to the stop the loading. */ ! private boolean stopSignal=false; /** ! * Signal the loader to suspend the loading. */ ! private boolean suspendSignal=false; /** ! * Mutex lock to suspend and resume the loader. ! */ ! private Mutex suspendLock=new Mutex(); ! ! private static Logger logger=Logger.getLogger(AbstractPoolJobLoader.class); ! ! /** ! * Abstracts the job pool details from the final loader by defining ! * other set of methods for the final loader and implements the management ! * and monitoring related methods. * * @param configProps Properties defined for this loader in job configuration. * @param pool Job Pool reference. * ! * @return Returns the final status of loading. 
*/ public final ErrorCode loadPool(Map configProps,JobPool pool) { + logger.trace("Entering loadPool"); this.pool=pool; ErrorCode returnCode=ErrorCode.JOB_COMPLETED_SUCCESSFULLY; try { + this.loaderStatus=ProcessorStatus.RUNNING; returnCode=this.loadPool(configProps); + this.loaderStatus=ProcessorStatus.FINISHED; } catch(Throwable exception) { exception.printStackTrace(); ! logger.error("Exception while calling loaders loadPool = " + exception.getMessage(), exception); ! returnCode=ErrorCode.POOL_JOB_LOADER_EXCEPTION; } + logger.trace("Exiting loadPool"); return returnCode; } /** ! * Loads the given job data into the job pool.This method is for the ! * final loaders to load the job data into the pool. * * @param jobData Job data object that needs to be processed. *************** *** 95,122 **** { boolean loaded=false; if(jobData!=null) this.currentJobData=jobData; loaded=this.pool.loadJobData(jobData); return loaded; } ! ! public boolean resume() { return true; } ! public boolean stop() { return true; } ! public boolean suspend() { return true; } ! /** * */ public abstract ErrorCode loadPool(Map configProps); --- 99,237 ---- { boolean loaded=false; + if(jobData!=null) this.currentJobData=jobData; + else + logger.debug("Loading the null to signal the end of the pool for the processors(s)"); + loaded=this.pool.loadJobData(jobData); + + if(this.suspendSignal) + { + try + { + this.loaderStatus=ProcessorStatus.SUSPENDED; + this.suspendLock.acquire(); + this.loaderStatus=ProcessorStatus.RUNNING; + } + catch(InterruptedException exception) + { + exception.printStackTrace(); + logger.error("Exception while suspending the loader " + exception.getMessage(),exception); + this.loaderStatus=ProcessorStatus.RUNNING; + } + } + return loaded; } ! /** ! * Tells whether the loading of the jobs needs to be stopped or not. ! * Loader implementation should check for this flag before loading the jobs ! * into the pool. ! * ! 
* @return Returns true if loader to be stopped, false continue loading the jobs. ! */ ! protected final boolean stopLoading() ! { ! return this.stopSignal; ! } ! /** ! * Resumes the loading of the jobs. ! * ! * @return Returns true if loader is resumed, false otherwise. ! */ public boolean resume() { + logger.trace("Entering resume"); + this.stopSignal=false; + this.suspendSignal=false; + this.suspendLock.release(); + logger.trace("Exiting resume"); return true; } ! /** ! * Stops the loading of the jobs into the pool. ! * ! * @return Returns true if loader is stopped, false otherwise. ! */ public boolean stop() { + logger.trace("Entering stop"); + this.stopSignal=true; + + if(this.suspendSignal) + { + logger.info("Loader is in suspend status... resuming the loader"); + this.suspendSignal=false; + this.suspendLock.release(); + } + logger.trace("Exiting stop"); return true; } ! ! /** ! * Suspends loading of the jobs into the pool. ! */ public boolean suspend() { + logger.trace("Entering suspend"); + + this.suspendSignal=true; + + logger.trace("Exiting suspend"); return true; } ! ! /** ! * Returns the status of the loader. ! * ! * @return Returns the status of the loader. ! */ ! public ProcessorStatus getLoaderStatus() ! { ! return this.loaderStatus; ! } ! /** + * Returns the processing state of the loader. + * + * @return Returns the job data that this loader is loading. + */ + public Object getLoaderState() + { + return currentJobData.toString(); + } + + /** + * <p> + * Load the job data into the pool that needs to be processed by job processor(s). + * Implementers can take the help of the <i>loadJobData(Object)</i> method defined + * here to load the jobs into the pool. 
+ * <br> + * <pre> + * public class MyLoader + * { + * public ErrorCode loadPool(Map configProps) + * { + * for(int i=0;i<100;i++) + * { + * loadJobData(new Integer(i)); + * if(super.stopLoading()) + * { + * doCleanup(); + * super.loaderStatus=ProcessorStatus.STOPPED; + * break; + * } + * } + * loadJobData(null); + * return ErrorCode.JOB_COMPLETED_SUCCESSFULLY; + * } + * } + * </pre> + * </p> * + * @param configProps Configuration defined for this job loader in job configuration. + * + * @return Returns the final status of the loader. */ public abstract ErrorCode loadPool(Map configProps); Index: PoolJobLoader.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/PoolJobLoader.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** PoolJobLoader.java 17 May 2006 03:05:58 -0000 1.5 --- PoolJobLoader.java 17 May 2006 22:04:36 -0000 1.6 *************** *** 5,11 **** /** * PoolJobLoader loads the job data into the pool to be processed by ! * PoolJobProcessor(s). ! * * @author Suresh Pragada * @version 1.0 --- 5,15 ---- /** + * <p> * PoolJobLoader loads the job data into the pool to be processed by ! * PoolJobProcessor(s). Along with the methods to load job data into the ! * pool, it exposes the some other methods used by the management and ! * monitoring clients. ! * </p> ! * * @author Suresh Pragada * @version 1.0 *************** *** 16,24 **** /** * <p> ! * Loads the job data that needs to be processed in to the given job pool. * When finished loading of all the job data, loads the <i>null</i> into the pool * to singal the processor(s) that loading of all the jobs have been done. ! * Configuration defined for this loader in job configuration will be available ! * in configProps map. * <br> * Example loading the 100 integer objects into the pool. --- 20,28 ---- /** * <p> ! 
* Loads the job data that needs to be processed in to the job pool. * When finished loading of all the job data, loads the <i>null</i> into the pool * to singal the processor(s) that loading of all the jobs have been done. ! * Configuration defined for this loader in job configuration will be accessed by ! * configProps map received from controller. * <br> * Example loading the 100 integer objects into the pool. *************** *** 38,41 **** --- 42,47 ---- * @param configProps Properties defined for this job loader in job configuration. * @param pool Job Pool reference. + * + * @return Retrurns the final status of the loader. */ public ErrorCode loadPool(Map configProps,JobPool pool); Index: AbstractPoolJobProcessor.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/AbstractPoolJobProcessor.java,v retrieving revision 1.3 retrieving revision 1.4 diff -C2 -d -r1.3 -r1.4 *** AbstractPoolJobProcessor.java 16 May 2006 03:15:28 -0000 1.3 --- AbstractPoolJobProcessor.java 17 May 2006 22:04:36 -0000 1.4 *************** *** 14,17 **** --- 14,18 ---- * to the final processor. * </p> + * * @author Suresh Pragada * @version 1.0 *************** *** 24,79 **** */ private int processedJobDataCount=0; - - private boolean stopSignal=false; - - private boolean suspendSignal=false; - - private Mutex suspendLock=new Mutex(); - - private static Logger logger=Logger.getLogger(AbstractPoolJobProcessor.class); - /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#initialize(java.util.Map) ! */ ! public abstract void initialize(Map configProps); ! /** ! * */ ! public abstract ErrorCode process(Object jobData); /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#cleanup() */ ! public abstract void cleanup(); ! /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#suspend() */ ! 
public boolean suspend() ! { ! this.suspendSignal=true; ! return this.suspendSignal; ! } /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#resume() */ ! public boolean resume() ! { ! this.stopSignal=false; ! this.suspendSignal=false; ! this.suspendLock.release(); ! return true; ! } /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#stop() */ ! public boolean stop() ! { ! return true; ! } /** ! * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#suspend() */ ! public ErrorCode processPool(JobPool pool) { Object jobData=null; while((jobData=pool.getNextJobData())!=null) --- 25,86 ---- */ private int processedJobDataCount=0; /** ! * Signal to stop the processor. */ ! private boolean stopSignal=false; /** ! * Singal to suspend the processor. */ ! private boolean suspendSignal=false; /** ! * Mutex lock to suspend the processor. */ ! private Mutex suspendLock=new Mutex(); /** ! * Holds the processor status. */ ! protected ProcessorStatus processorStatus=ProcessorStatus.INSTANTIATED; /** ! * Holds the job data currently being processed. */ ! private Object processingJobData=null; ! ! private static Logger logger=Logger.getLogger(AbstractPoolJobProcessor.class); ! /** ! * <p> ! * Initializes the processor implementation by calling the <i>initialize</i> ! * method by passing map contains the properties defined for the processor in ! * job configuration, gets the job data from the pool and passes that information ! * to the processor implementation for processing and cleansup the processor ! * implementation by calling the <i>cleanup</i>. ! * </p> ! * ! * @param configProps Properties defined for the processor in job configuration. ! * @param pool Job pool reference where job data needs to be pulled. ! * ! * @return Returns the status code of this processor. */ ! 
public ErrorCode processPool(Map configProps, JobPool pool) { + logger.trace("Entering processPool"); + ErrorCode returnCode=ErrorCode.JOB_COMPLETED_SUCCESSFULLY; + /** + * Calling the initializer on final processor implementation. + */ + try + { + this.processorStatus=ProcessorStatus.INITIALIZING; + initialize(configProps); + this.processorStatus=ProcessorStatus.RUNNING; + } + catch(Throwable exception) + { + exception.printStackTrace(); + logger.error("Exception while calling initialize on processor " + exception.getMessage(), exception); + } + /** + * Get the job data from pool and pass that on to the final processor implementation. + */ Object jobData=null; while((jobData=pool.getNextJobData())!=null) *************** *** 81,111 **** try { this.processedJobDataCount++; ErrorCode errorCode=this.process(jobData); } catch(Throwable exception) { exception.printStackTrace(); } if(this.stopSignal) { logger.info("Received the stop signal.. Preparing to stop"); break; ! } if(this.suspendSignal) { try { this.suspendLock.acquire(); } catch(InterruptedException exception) { exception.printStackTrace(); ! logger.error("Got exception while suspending..." + exception.getMessage(), exception); } ! } } return ErrorCode.JOB_COMPLETED_SUCCESSFULLY; } /** * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#getProcessorState() --- 88,189 ---- try { + this.processingJobData=jobData; this.processedJobDataCount++; ErrorCode errorCode=this.process(jobData); + returnCode=errorCode; } catch(Throwable exception) { exception.printStackTrace(); + logger.error("Exception while processing the job data " + exception.getMessage(),exception); + returnCode=ErrorCode.POOL_JOB_PROCESSOR_EXCEPTION; } + if(this.stopSignal) { logger.info("Received the stop signal.. Preparing to stop"); break; ! } ! if(this.suspendSignal) { try { + logger.info("Received suspend signal... 
suspending the process"); + this.processorStatus=ProcessorStatus.SUSPENDED; this.suspendLock.acquire(); + this.processorStatus=ProcessorStatus.RUNNING; + logger.info("Resuming the process"); } catch(InterruptedException exception) { exception.printStackTrace(); ! logger.error("Got exception while suspending..." + exception.getMessage() + " process is going to be resumed", exception); } ! } } + + try + { + this.processorStatus=ProcessorStatus.CLEANUP; + cleanup(); + this.processorStatus=ProcessorStatus.FINISHED; + } + catch(Throwable exception) + { + exception.printStackTrace(); + logger.error("Exception while calling cleanup on processor " + exception.getMessage(), exception); + } + + logger.trace("Exiting processPool"); return ErrorCode.JOB_COMPLETED_SUCCESSFULLY; } + + /** + * Suspends the processor. + * + * @return Returns true if the processor is suspended, false otherwise. + */ + public boolean suspend() + { + logger.trace("Entering suspend"); + this.suspendSignal=true; + logger.trace("Entering suspend"); + return true; + } + /** + * Resumes the processor. + * + * @return Retrurns true if the processor is resumed, false otherwise. + */ + public boolean resume() + { + logger.trace("Entering resume"); + this.stopSignal=false; + this.suspendSignal=false; + this.suspendLock.release(); + logger.trace("Exiting resume"); + return true; + } + /** + * Stops the processor. + * + * @return Retrurns true if the processor has stopped, false otherwise. + */ + public boolean stop() + { + logger.trace("Entering stop"); + + this.stopSignal=true; + if(this.suspendSignal) + { + logger.info("Processor is in suspend status... resuming the processor"); + this.suspendSignal=false; + this.suspendLock.release(); + } + + logger.trace("Exiting stop"); + return true; + } /** * @see org.jmonks.batchserver.framework.controller.pool.PoolJobProcessor#getProcessorState() *************** *** 113,117 **** public Object getProcessorState() { ! 
return null; } /** --- 191,195 ---- public Object getProcessorState() { ! return this.processingJobData; } /** *************** *** 120,124 **** public ProcessorStatus getProcessorStatus() { ! return null; } /** --- 198,202 ---- public ProcessorStatus getProcessorStatus() { ! return this.processorStatus; } /** *************** *** 129,131 **** --- 207,227 ---- return processedJobDataCount; } + + /** + * Chance to initialize itself using the information configured for this job + * processor in job configuration. + * + * @param configProps Configuration defined for the job processor in job configuration. + */ + public abstract void initialize(Map configProps); + + /** + * + */ + public abstract ErrorCode process(Object jobData); + + /** + * Chance to do any cleanup at the end of the processing. + */ + public abstract void cleanup(); } Index: PoolJobController.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/PoolJobController.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** PoolJobController.java 14 May 2006 01:31:32 -0000 1.7 --- PoolJobController.java 17 May 2006 22:04:36 -0000 1.8 *************** *** 1,4 **** --- 1,13 ---- package org.jmonks.batchserver.framework.controller.pool; + import EDU.oswego.cs.dl.util.concurrent.Callable; + import EDU.oswego.cs.dl.util.concurrent.CountDown; + import EDU.oswego.cs.dl.util.concurrent.FutureResult; + import java.util.Hashtable; + import java.util.Iterator; + import java.util.Map; + import org.apache.log4j.Logger; import org.jmonks.batchserver.framework.common.ErrorCode; + import org.jmonks.batchserver.framework.config.ConfigurationException; + import org.jmonks.batchserver.framework.config.PoolJobControllerConfig; import org.jmonks.batchserver.framework.controller.JobController; import org.jmonks.batchserver.framework.management.ProcessorState; *************** *** 56,59 **** 
--- 65,81 ---- { /** + * Map holds all the pool job processors and loader being used for the given job + * as values and name of the threads as keys. + */ + private Map jobProcessorsMap=new Hashtable(); + /** + * Map holds all the pool job processors and loader returned error codes as values and + * name of the threads as keys. + */ + private Map jobProcessorsResultMap=new Hashtable(); + + private static Logger logger=Logger.getLogger(PoolJobController.class); + + /** * Constructor enables the instantiation of the pool job controller instance. */ *************** *** 72,77 **** public ErrorCode process() { ! return null; } /** --- 94,143 ---- public ErrorCode process() { ! logger.info("Entering process in pool job controller = " + super.getJobName()); ! PoolJobControllerConfig poolJobControllerConfig=(PoolJobControllerConfig)super.getJobControllerConfig(); ! validateControllerConfiguration(poolJobControllerConfig); ! int processorCount=poolJobControllerConfig.getPoolJobProcessorThreadCount(); ! processorCount=(processorCount<1?1:processorCount); ! /** ! * Create and initialize the pool ! */ ! JobPool pool=(JobPool)this.getInstance(poolJobControllerConfig.getPoolClassName()); ! pool.initialize(poolJobControllerConfig.getPoolConfigProperties()); ! ! CountDown countDownLock=new CountDown(processorCount+1); ! /** ! * Create, initialize and spwan the loader ! */ ! PoolJobLoader jobLoader=(PoolJobLoader)this.getInstance(poolJobControllerConfig.getPoolJobLoaderClassName()); ! String jobLoaderName=super.getJobName()+"_Loader"; ! FutureResult jobLoaderFutureResult=new FutureResult(); ! Thread jobLoaderThread=new Thread(jobLoaderFutureResult.setter(this.getCallableLoader( ! countDownLock,jobLoader,poolJobControllerConfig.getPoolJobLoaderConfigProperties(),pool)),jobLoaderName); ! jobLoaderThread.start(); ! this.jobProcessorsResultMap.put(jobLoaderName, jobLoaderFutureResult); ! this.jobProcessorsMap.put(jobLoaderName, jobLoader); ! /** ! 
* Create, initialize and spawn the processor(s). ! */ ! for(int i=0;i<processorCount;i++) ! { ! PoolJobProcessor jobProcessor=(PoolJobProcessor)this.getInstance(poolJobControllerConfig.getPoolJobProcessorClassName()); ! String jobProcessorName=super.getJobName()+"_Processor_"+i; ! FutureResult jobProcessorFutureResult=new FutureResult(); ! Thread jobProcessorThread=new Thread(jobProcessorFutureResult.setter(this.getCallableProcessor( ! countDownLock,jobProcessor,poolJobControllerConfig.getPoolJobProcessorConfigProperties(),pool)),jobProcessorName); ! jobProcessorThread.start(); ! this.jobProcessorsResultMap.put(jobProcessorName, jobProcessorFutureResult); ! this.jobProcessorsMap.put(jobProcessorName, jobProcessor); ! } ! /** ! * Go to hydernate until all threads have finsihed their task. ! */ ! ErrorCode returnCode=hybernate(countDownLock); ! logger.info("Exiting process in pool job controller = " + super.getJobName() + " with return code = " + returnCode); ! return returnCode; } + + /** *************** *** 83,87 **** public int getExpectedRecordsCount() { ! return 0; } --- 149,161 ---- public int getExpectedRecordsCount() { ! logger.trace("Entering getExpectedRecordsCount"); ! ! int expectedRecordsCount=0; ! String jobLoaderName=super.getJobName()+"_Loader"; ! PoolJobLoader jobLoader=(PoolJobLoader)this.jobProcessorsMap.get(jobLoaderName); ! expectedRecordsCount=jobLoader.getTotalJobDataCount(); ! ! logger.trace("Exiting getExpectedRecordsCount = " + expectedRecordsCount); ! return expectedRecordsCount; } *************** *** 94,98 **** public int getProcessedRecordsCount() { ! return 0; } --- 168,183 ---- public int getProcessedRecordsCount() { ! logger.trace("Entering getProcessedRecordsCount"); ! ! int processedRecordsCount=0; ! for(Iterator iterator=this.jobProcessorsMap.values().iterator();iterator.hasNext();) ! { ! Object processor=iterator.next(); ! if(processor instanceof PoolJobProcessor) ! 
processedRecordsCount=processedRecordsCount+((PoolJobProcessor)processor).getProcessedJobDataCount(); ! } ! ! logger.trace("Exiting getProcessedRecordsCount = " + processedRecordsCount); ! return processedRecordsCount; } *************** *** 104,108 **** public java.lang.String[] getProcessorIDList() { ! return null; } --- 189,201 ---- public java.lang.String[] getProcessorIDList() { ! logger.trace("Entering getProcessorIDList"); ! ! String processorIDList[]=new String[this.jobProcessorsMap.size()]; ! int i=0; ! for(Iterator iterator=this.jobProcessorsMap.keySet().iterator();iterator.hasNext();i++) ! processorIDList[i]=(String)iterator.next(); ! ! logger.trace("Exiting getProcessorIDList"); ! return processorIDList; } *************** *** 117,121 **** public ProcessorState getProcessorState(String processorID) { ! return null; } --- 210,233 ---- public ProcessorState getProcessorState(String processorID) { ! logger.trace("Exiting getProcessorState = " + processorID); ! ProcessorState state=null; ! Object processor=this.jobProcessorsMap.get(processorID); ! if(processor!=null) ! { ! if(processor instanceof PoolJobLoader) ! state=new ProcessorState(processorID, "Pool Job Loader", ((PoolJobLoader)processor).getLoaderState()); ! else if(processor instanceof PoolJobProcessor) ! state=new ProcessorState(processorID, "Pool Job Processor", ((PoolJobProcessor)processor).getProcessorState()); ! else ! { ! logger.error("What else it could be? " + processor.getClass().getName()); ! state=null; ! } ! } ! else ! state=null; ! ! logger.trace("Exiting getProcessorState = " + processorID + " state = " + state); ! return state; } *************** *** 127,131 **** public ProcessorStatus getProcessorStatus(String processorID) { ! return null; } --- 239,262 ---- public ProcessorStatus getProcessorStatus(String processorID) { ! logger.trace("Exiting getProcessorStatus = " + processorID); ! ProcessorStatus status=null; ! Object processor=this.jobProcessorsMap.get(processorID); ! 
if(processor!=null) ! { ! if(processor instanceof PoolJobLoader) ! status=((PoolJobLoader)processor).getLoaderStatus(); ! else if(processor instanceof PoolJobProcessor) ! status=((PoolJobProcessor)processor).getProcessorStatus(); ! else ! { ! logger.error("What else it could be? " + processor.getClass().getName()); ! status=null; ! } ! } ! else ! status=null; ! ! logger.trace("Exiting getProcessorStatus = " + processorID + " status = " + status); ! return status; } *************** *** 137,141 **** public boolean stop(String processorID) { ! return true; } --- 268,290 ---- public boolean stop(String processorID) { ! logger.trace("Entering stop = " + processorID); ! boolean stopped=true; ! Object processor=this.jobProcessorsMap.get(processorID); ! if(processor!=null) ! { ! if(processor instanceof PoolJobLoader) ! stopped=((PoolJobLoader)processor).stop(); ! else if(processor instanceof PoolJobProcessor) ! stopped=((PoolJobProcessor)processor).stop(); ! else ! { ! logger.error("What else it could be? " + processor.getClass().getName()); ! stopped=false; ! } ! } ! else ! stopped=false; ! logger.trace("Exiting stop = " + processorID + " status = " + stopped); ! return stopped; } *************** *** 147,151 **** public boolean suspend(String processorID) { ! return true; } --- 296,318 ---- public boolean suspend(String processorID) { ! logger.trace("Entering suspend = " + processorID); ! boolean suspended=true; ! Object processor=this.jobProcessorsMap.get(processorID); ! if(processor!=null) ! { ! if(processor instanceof PoolJobLoader) ! suspended=((PoolJobLoader)processor).suspend(); ! else if(processor instanceof PoolJobProcessor) ! suspended=((PoolJobProcessor)processor).suspend(); ! else ! { ! logger.error("What else it could be? " + processor.getClass().getName()); ! suspended=false; ! } ! } ! else ! suspended=false; ! logger.trace("Exiting suspend = " + processorID + " status = " + suspended); ! 
return suspended; } *************** *** 157,161 **** public boolean resume(String processorID) { ! return true; } } --- 324,534 ---- public boolean resume(String processorID) { ! logger.trace("Entering resume = " + processorID); ! boolean resumed=true; ! Object processor=this.jobProcessorsMap.get(processorID); ! if(processor!=null) ! { ! if(processor instanceof PoolJobLoader) ! resumed=((PoolJobLoader)processor).resume(); ! else if(processor instanceof PoolJobProcessor) ! resumed=((PoolJobProcessor)processor).resume(); ! else ! { ! logger.error("What else it could be? " + processor.getClass().getName()); ! resumed=false; ! } ! } ! else ! resumed=false; ! logger.trace("Exiting resume = " + processorID + " status = " + resumed); ! return resumed; ! } ! ! /** ! * <p> ! * Main thread will call this method and wait until all the processor(s) and loader have ! * finished their processing. Once they are finished, all the return codes ! * will be analyzed and one error code will be returned for this job. ! * </p ! * @param countDownLock CountDown lock being used by loader and all the processor(s). ! * ! * @return Returns the error code of the job. ! */ ! private ErrorCode hybernate(CountDown countDownLock) ! { ! logger.trace("Entering hybernate"); ! ErrorCode returnCode=ErrorCode.JOB_COMPLETED_SUCCESSFULLY; ! try ! { ! logger.debug("Going to wait until loader and all the processor(s) is gonna finish."); ! countDownLock.acquire(); ! logger.info("Loader and all processor(s) have finished their task."); ! } ! catch(InterruptedException exception) ! { ! exception.printStackTrace(); ! logger.error("Exception while waiting for loader and all the processor(s) = " + exception.getMessage(), exception); ! return ErrorCode.POOL_JOB_PROCESSOR_EXCEPTION; ! } ! ! for(Iterator iterator=this.jobProcessorsResultMap.values().iterator();iterator.hasNext();) ! { ! ErrorCode threadReturnCode=(ErrorCode)((FutureResult)iterator.next()).peek(); ! 
if(threadReturnCode!=ErrorCode.JOB_COMPLETED_SUCCESSFULLY) ! { ! returnCode=threadReturnCode; ! break; ! } ! } ! logger.trace("Exiting hybernate = " + returnCode); ! return returnCode; ! } ! ! /** ! * Wraps the job loader with Callable interface and return the callable ! * interface. ! * ! * @param countDownLock Count down to be released at the end of the processing. ! * @param jobLoader Job Loader that needs to be invoked. ! * @param configProps Properties defined for this loader in job configuration. ! * @param pool Job pool where the loader needs to load the jobs. ! * ! * @return Returns the callabel interface wrapped around the job loader. ! */ ! private Callable getCallableLoader(final CountDown countDownLock,final PoolJobLoader jobLoader, final Map configProps, final JobPool pool) ! { ! logger.trace("Entering getCallableLoader"); ! Callable callable=new Callable(){ ! public Object call() ! { ! ErrorCode returnCode=null; ! try ! { ! logger.trace("Going to call the loadPool method"); ! returnCode=jobLoader.loadPool(configProps,pool); ! logger.debug("Done calling the loadPool method"); ! } ! catch(Throwable exception) ! { ! exception.printStackTrace(); ! logger.error("Exception while loading the job data into the pool = " + exception.getMessage(), exception); ! returnCode=ErrorCode.POOL_JOB_LOADER_EXCEPTION; ! } ! countDownLock.release(); ! logger.info(Thread.currentThread().getName() + " is exiting with the error code = " + returnCode); ! return returnCode; ! } ! }; ! logger.trace("Exiting getCallableLoader"); ! return callable; } + + /** + * Wraps the job processor with Callable interface and return the callable + * interface. + * + * @param countDownLock Count down to be released at the end of the processing. + * @param jobProcessor Job processor that needs to be invoked. + * @param configProps Properties defined for this loader in job configuration. + * @param pool Job pool where the processor needs to pull the jobs. 
+ * + * @return Returns the callabel interface wrapped around the job processor. + */ + private Callable getCallableProcessor(final CountDown countDownLock,final PoolJobProcessor jobProcessor, final Map configProps, final JobPool pool) + { + logger.trace("Entering getCallableProcessor"); + Callable callable=new Callable(){ + public Object call() + { + ErrorCode returnCode=null; + try + { + logger.trace("Going to call the processPool method"); + returnCode=jobProcessor.processPool(configProps,pool); + logger.debug("Done calling the processPool method"); + } + catch(Throwable exception) + { + exception.printStackTrace(); + logger.error("Exception while processing the job data from the pool = " + exception.getMessage(), exception); + returnCode=ErrorCode.POOL_JOB_PROCESSOR_EXCEPTION; + } + countDownLock.release(); + logger.info(Thread.currentThread().getName() + " is exiting with the error code = " + returnCode); + return returnCode; + } + }; + logger.trace("Exiting getCallableProcessor"); + return callable; + } + + /** + * Instantiates and returns the instance of the required class. + * + * @param className Class name of the required instance. + * + * @return Returns the instance of the required class. + * + * @throws ConfigurationException If the class could not be instantiated. + */ + private Object getInstance(String className) + { + try + { + return Class.forName(className).newInstance(); + } + catch(Exception exception) + { + exception.printStackTrace(); + logger.fatal(exception.getMessage(),exception); + throw new ConfigurationException(ConfigurationException.JOB_CONTROLLER_CONFIG, exception.getMessage()); + } + } + + /** + * Validates the pool job controller configuration by instantiating and verifying + * all the required loader, processor and pool classes. + * + * @param controllerConfig Controller configuration defined in job configuration. + * + * @throws ConfigurationException If any one required instances cannot be instantiated.
+ */ + private void validateControllerConfiguration(PoolJobControllerConfig controllerConfig) + { + Object jobPool=this.getInstance(controllerConfig.getPoolClassName()); + if(jobPool instanceof JobPool) + { + logger.debug("Job pool is configured properly = " + controllerConfig.getPoolClassName()); + } + else + { + logger.fatal("Configured job pool class name " + controllerConfig.getPoolClassName() + " cannot be associated to JobPool"); + throw new ConfigurationException(ConfigurationException.JOB_CONTROLLER_CONFIG, + "Configured job pool class name " + controllerConfig.getPoolClassName() + " cannot be associated to JobPool"); + } + + Object jobLoader=this.getInstance(controllerConfig.getPoolJobLoaderClassName()); + if(jobLoader instanceof PoolJobLoader) + { + logger.debug("Job loader is configured properly = " + controllerConfig.getPoolJobLoaderClassName()); + } + else + { + logger.fatal("Configured job loader class name " + controllerConfig.getPoolJobLoaderClassName() + " cannot be associated to PoolJobLoader"); + throw new ConfigurationException(ConfigurationException.JOB_CONTROLLER_CONFIG, + "Configured job loader class name " + controllerConfig.getPoolJobLoaderClassName() + " cannot be associated to PoolJobLoader"); + } + + Object jobProcessor=this.getInstance(controllerConfig.getPoolJobProcessorClassName()); + if(jobProcessor instanceof PoolJobProcessor) + { + logger.debug("Job Processor is configured properly = " + controllerConfig.getPoolJobProcessorClassName()); + } + else + { + logger.fatal("Configured job processor class name " + controllerConfig.getPoolJobProcessorClassName() + " cannot be associated to PoolJobProcessor"); + throw new ConfigurationException(ConfigurationException.JOB_CONTROLLER_CONFIG, + "Configured job processor class name " + controllerConfig.getPoolJobProcessorClassName() + " cannot be associated to PoolJobProcessor"); + } + } } |