[Batchserver-cvs] batchserver/src/org/jmonks/batch/framework/controller/pool AbstractPoolJobLoader
Brought to you by:
suresh_pragada
From: Suresh <sur...@us...> - 2006-09-15 20:06:52
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batch/framework/controller/pool
In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv19925
Added Files: AbstractPoolJobLoader.java AbstractPoolJobProcessor.java CollectionJobPool.java JobPool.java PoolJobController.java PoolJobLoader.java PoolJobProcessor.java
Log Message: no message
--- NEW FILE: PoolJobLoader.java ---
package org.jmonks.batch.framework.controller.pool;

import org.jmonks.batch.framework.ErrorCode;
import org.jmonks.batch.framework.JobContext;
import org.jmonks.batch.framework.management.ProcessorStatus;

/**
 * <p>
 * PoolJobLoader loads the job data into the pool to be processed by
 * PoolJobProcessor(s). Along with the methods to load job data into the
 * pool, it exposes some other methods used by the management and
 * monitoring clients.
 * </p>
 *
 * @author Suresh Pragada
 * @version 1.0
 * @since 1.0
 */
public interface PoolJobLoader
{
    /**
     * <p>
     * Loads the job data that needs to be processed into the job pool.
     * When all the job data has been loaded, loads <i>null</i> into the pool
     * to signal the processor(s) that loading of all the jobs has been done.
     * Configuration defined for this loader in the job configuration can be accessed
     * using the job context reference.
     * <br>
     * Example loading 100 integer objects into the pool.
     * <pre>
     *  public class MyPoolJobLoader implements PoolJobLoader
     *  {
     *      public ErrorCode loadPool(JobContext jobContext, JobPool pool)
     *      {
     *          for(int i=0;i&lt;100;i++)
     *              pool.loadJobData(new Integer(i));
     *          pool.loadJobData(null);
     *
     *          return ErrorCode.JOB_COMPLETED_SUCCESSFULLY;
     *      }
     *  }
     * </pre>
     * </p>
     *
     * @param jobContext Context of the job being run.
     * @param pool Job Pool reference.
     *
     * @return Returns the final status of the loader.
     */
    public ErrorCode loadPool(JobContext jobContext,JobPool pool);

    /**
     * Suspends the loader.
     *
     * @return Returns true if loader is suspended, false otherwise.
     */
    public boolean suspend();

    /**
     * Resumes the loader.
     *
     * @return Returns true if loader is resumed, false otherwise.
     */
    public boolean resume();

    /**
     * Stops the loader.
     *
     * @return Returns true if loader is stopped, false otherwise.
     */
    public boolean stop();

    /**
     * Gets the total number of records this loader is going to load.
     *
     * @return Returns the number of records this loader is going to load.
     */
    public long getTotalJobDataCount();

    /**
     * Gets the loader state as an object which can be understood by the monitoring
     * client. Usually, this would be used for display purposes.
     *
     * @return Returns the displayable object representing the loader state.
     */
    public Object getLoaderState();

    /**
     * Gets the loader status.
     *
     * @return Returns the loader status.
     */
    public ProcessorStatus getLoaderStatus();
}
--- NEW FILE: JobPool.java ---
package org.jmonks.batch.framework.controller.pool;

import org.jmonks.batch.framework.JobContext;

/**
 * <p>
 * JobPool pools all the job data being loaded by the loader and serves this data
 * when job processors request it for processing. This interface enables the loader,
 * processor and controller to interact with the pool implementation.
 * </p>
 *
 * @author Suresh Pragada
 * @version 1.0
 * @since 1.0
 */
public interface JobPool
{
    /**
     * Controller calls this method to initialize the job pool. This will be called
     * before the pool reference is passed to the job loader and job processor.
     * All the configuration defined for the job pool can be retrieved using
     * the job context reference.
     *
     * @param jobContext Context of the job being run.
     */
    public void initialize(JobContext jobContext);

    /**
     * Gets the next available job data (piece of information) to be processed.
     * If none is available yet, the call waits until the next job data arrives.
     * If the loader is done with loading the jobs, it returns null. At this time,
     * the processor should quit its processing.
     *
     * @return Returns the next available job data to be processed,
     *         null if no job data is available.
     */
    public Object getNextJobData();

    /**
     * Job Loader will use this API to load the job data into the pool.
     * If the job loader is done with loading all the data to be processed,
     * it should load null indicating that loading of all the job data has been
     * done.
     *
     * @param jobData Job data that needs to be processed.
     *
     * @return Returns true if the job data is loaded into the pool, false otherwise.
     */
    public boolean loadJobData(Object jobData);

    /**
     * Controller calls this method after job processing has been done.
     */
    public void cleanup();

    /**
     * Returns the number of job data objects being loaded into the pool.
     *
     * @return Returns the number of job data objects being loaded into the pool.
     */
    public long getLoadedJobDataCount();
}
--- NEW FILE: PoolJobProcessor.java ---
package org.jmonks.batch.framework.controller.pool;

import org.jmonks.batch.framework.ErrorCode;
import org.jmonks.batch.framework.JobContext;
import org.jmonks.batch.framework.management.ProcessorStatus;

/**
 * <p>
 * PoolJobProcessor gets the job data to be processed from the job pool and
 * processes it. Along with the processing methods, it exposes some methods used
 * by management and monitoring clients.
 * </p>
 *
 * @author Suresh pragada
 * @version 1.0
 * @since 1.0
 */
public interface PoolJobProcessor
{
    /**
     * <p>
     * Processes the job data available in the job pool until the job loader is done
     * loading all the job data. The JobContext reference provides access to
     * many different resources in the framework. The following example shows
     * a sample implementation of the processPool method.
     * <br><br>
     * <pre>
     *  public class MyPoolJobProcessor implements PoolJobProcessor
     *  {
     *      public ErrorCode processPool(JobContext jobContext, JobPool pool)
     *      {
     *          Object jobData=null;
     *          while((jobData=pool.getNextJobData())!=null)
     *          {
     *              // Perform the business logic on jobData
     *          }
     *
     *          return ErrorCode.JOB_COMPLETED_SUCCESSFULLY;
     *      }
     *  }
     * </pre>
     * </p>
     *
     * @param jobContext Context the job is being run.
     * @param pool Reference to Job Pool.
     *
     * @return Returns the error code.
     */
    public ErrorCode processPool(JobContext jobContext, JobPool pool);

    /**
     * Suspends the pool job processor.
     *
     * @return Returns true if processor is suspended, false otherwise.
     */
    public boolean suspend();

    /**
     * Resumes the pool job processor.
     *
     * @return Returns true if processor is resumed, false otherwise.
     */
    public boolean resume();

    /**
     * Stops the processor.
     *
     * @return Returns true if processor is stopped, false otherwise.
     */
    public boolean stop();

    /**
     * Gets the processor state to be displayed or analyzed for monitoring purposes.
     *
     * @return Returns an object understandable/displayable by the monitoring client.
     */
    public Object getProcessorState();

    /**
     * Gets the processor status being used by the management clients.
     *
     * @return Returns the status of the processor.
     */
    public ProcessorStatus getProcessorStatus();

    /**
     * Returns the number of job data objects this particular job processor
     * has finished.
     *
     * @return Returns the number of job data objects processed.
     */
    public long getProcessedJobDataCount();
}
--- NEW FILE: AbstractPoolJobProcessor.java ---
package org.jmonks.batch.framework.controller.pool;

import EDU.oswego.cs.dl.util.concurrent.Mutex;
import org.apache.log4j.Logger;
import org.jmonks.batch.framework.ErrorCode;
import org.jmonks.batch.framework.JobContext;
import org.jmonks.batch.framework.management.ProcessorStatus;

/**
 * <p>
 * Abstract pool job processor implements some of the responsibilities defined by the
 * pool job processor and leaves the application specific functionality implementation
 * to the final processor. The following example class shows how to make use of the
 * AbstractPoolJobProcessor.
 * <br><br>
 * <pre>
 *  public class MyPoolJobProcessor extends AbstractPoolJobProcessor
 *  {
 *      private Connection connection=null;
 *
 *      public void initialize(JobContext jobContext)
 *      {
 *          // Perform the initialization for this instance of job processor.
 *          // Good place to get references to any resources to be used
 *          // for processing all the job data.
* connection=ConnectionManager.getDBConnection(); * } * * public ErrorCode process(Object jobData) * { * // Perform the business logic on the incoming jobData. * connection.performBusinessLogic(jobData); * * return ErrorCode.JOB_COMPLETED_SUCCESSFULLY; * } * * public void cleanup() * { * // Do some cleanup after all the jobData has been processed. * connection.close(); * } * } * </pre> * </p> * * @author Suresh Pragada * @version 1.0 * @since 1.0 */ public abstract class AbstractPoolJobProcessor implements PoolJobProcessor { /** * Counts the number of job data objects processed. */ private long processedJobDataCount=0; /** * Signal to stop the processor. */ private boolean stopSignal=false; /** * Singal to suspend the processor. */ private boolean suspendSignal=false; /** * Mutex lock to suspend the processor. */ private Mutex suspendLock=new Mutex(); /** * Holds the processor status. */ protected ProcessorStatus processorStatus=ProcessorStatus.INSTANTIATED; /** * Holds the job data currently being processed. */ private Object processingJobData=null; private static Logger logger=Logger.getLogger(AbstractPoolJobProcessor.class); /** * <p> * Initializes the processor implementation by calling the <i>initialize</i> * method by passing job context reference, gets the job data from the pool * and passes that information to the processor implementation for * processing and cleans up the processor implementation by calling the <i>cleanup</i> * method. * </p> * * @param jobContext Context the job is being run. * @param pool Job pool reference where job data needs to be pulled. * * @return Returns the status code of this processor. */ public ErrorCode processPool(JobContext jobContext, JobPool pool) { logger.trace("Entering processPool"); ErrorCode returnCode=ErrorCode.JOB_COMPLETED_SUCCESSFULLY; /** * Calling the initializer on final processor implementation. 
*/ try { this.processorStatus=ProcessorStatus.INITIALIZING; initialize(jobContext); this.processorStatus=ProcessorStatus.RUNNING; } catch(Throwable exception) { exception.printStackTrace(); logger.error("Exception while calling initialize on processor " + exception.getMessage(), exception); } /** * Get the job data from pool and pass that on to the final processor implementation. */ Object jobData=null; while((jobData=pool.getNextJobData())!=null) { try { this.processingJobData=jobData; this.processedJobDataCount++; ErrorCode errorCode=this.process(jobData); returnCode=errorCode; } catch(Throwable exception) { exception.printStackTrace(); logger.error("Exception while processing the job data " + exception.getMessage(),exception); returnCode=ErrorCode.POOL_JOB_PROCESSOR_EXCEPTION; } if(this.stopSignal) { logger.info("Received the stop signal.. Preparing to stop"); break; } if(this.suspendSignal) { try { logger.info("Received suspend signal... suspending the process"); this.processorStatus=ProcessorStatus.SUSPENDED; this.suspendLock.acquire(); this.processorStatus=ProcessorStatus.RUNNING; logger.info("Resuming the process"); } catch(InterruptedException exception) { exception.printStackTrace(); logger.error("Got exception while suspending..." + exception.getMessage() + " process is going to be resumed", exception); } } } try { this.processorStatus=ProcessorStatus.CLEANUP; cleanup(); this.processorStatus=ProcessorStatus.FINISHED; } catch(Throwable exception) { exception.printStackTrace(); logger.error("Exception while calling cleanup on processor " + exception.getMessage(), exception); } logger.trace("Exiting processPool"); return ErrorCode.JOB_COMPLETED_SUCCESSFULLY; } /** * Suspends the processor. * * @return Returns true if the processor is suspended, false otherwise. */ public boolean suspend() { logger.trace("Entering suspend"); this.suspendSignal=true; logger.trace("Entering suspend"); return true; } /** * Resumes the processor. 
*
     * Clears both the stop and the suspend signal and then releases the
     * suspend lock, which is what the processing loop acquires while suspended.
     * The signals are cleared before the release.
     *
     * @return Returns true; this implementation never reports a failure.
     */
    public boolean resume()
    {
        logger.trace("Entering resume");
        this.stopSignal=false;
        this.suspendSignal=false;
        this.suspendLock.release();
        logger.trace("Exiting resume");
        return true;
    }

    /**
     * Stops the processor. Raises the stop signal and, if a suspend had been
     * requested, clears the suspend signal and releases the suspend lock so
     * that the processing loop can continue and see the stop signal.
     *
     * @return Returns true; this implementation never reports a failure.
     */
    public boolean stop()
    {
        logger.trace("Entering stop");
        this.stopSignal=true;
        if(this.suspendSignal)
        {
            logger.info("Processor is in suspend status... resuming the processor");
            this.suspendSignal=false;
            this.suspendLock.release();
        }
        logger.trace("Exiting stop");
        return true;
    }

    /**
     * Returns the job data object held in the processingJobData field.
     *
     * @return Returns the job data held by this processor, null if none has been set.
     *
     * @see org.jmonks.batch.framework.controller.pool.PoolJobProcessor#getProcessorState()
     */
    public Object getProcessorState()
    {
        return this.processingJobData;
    }

    /**
     * Returns the current value of the processorStatus field.
     *
     * @return Returns the status of this processor.
     *
     * @see org.jmonks.batch.framework.controller.pool.PoolJobProcessor#getProcessorStatus()
     */
    public ProcessorStatus getProcessorStatus()
    {
        return this.processorStatus;
    }

    /**
     * Returns the current value of the processed job data counter.
     *
     * @return Returns the number of job data objects counted so far.
     *
     * @see org.jmonks.batch.framework.controller.pool.PoolJobProcessor#getProcessedJobDataCount()
     */
    public long getProcessedJobDataCount()
    {
        return processedJobDataCount;
    }

    /**
     * Chance to initialize itself using the information provided through
     * job context. This will be called only once for each processor and before
     * it starts processing job data using the process method.
     *
     * @param jobContext Context the job is being run.
     */
    public abstract void initialize(JobContext jobContext);

    /**
     * Execute the business logic on the given jobData and return the
     * appropriate error code. The format and type of jobData depend
     * on the job loader configured for the same job. Usually, there will be
     * an understanding between the loader and processor on the type of jobData
     * being loaded into the pool.
     *
     * @param jobData Data to be processed.
     *
     * @return Returns the status of the processing of this jobData.
     */
    public abstract ErrorCode process(Object jobData);

    /**
     * Chance to do any cleanup at the end of the processing.
Called once per * each processor at the end of processing of all the jobs. */ public abstract void cleanup(); } --- NEW FILE: AbstractPoolJobLoader.java --- /* * AbstractPoolJobLoader.java * * Created on May 16, 2006, 4:49 PM * * To change this template, choose Tools | Options and locate the template under * the Source Creation and Management node. Right-click the template and choose * Open. You can then make changes to the template in the Source Editor. */ package org.jmonks.batch.framework.controller.pool; import EDU.oswego.cs.dl.util.concurrent.Mutex; import org.apache.log4j.Logger; import org.jmonks.batch.framework.ErrorCode; import org.jmonks.batch.framework.JobContext; import org.jmonks.batch.framework.management.ProcessorStatus; /** * <p> * AbstractPoolJobLoader implements all the management and monitoring methods * and abstracts the user from the job pool. This allows the loader implementation * to concentrate on the business logic. * </p> * * @author Suresh Pragada * @version 1.0 * @since 1.0 */ public abstract class AbstractPoolJobLoader implements PoolJobLoader { /** * Holds the status of the loader. */ protected ProcessorStatus loaderStatus=ProcessorStatus.INSTANTIATED; /** * Holds the pool reference passed by controller. */ private JobPool pool=null; /** * Holds the current job data that will be used for the monitoring purposes. */ private Object currentJobData=null; /** * Signal the loader to the stop the loading. */ private boolean stopSignal=false; /** * Signal the loader to suspend the loading. */ private boolean suspendSignal=false; /** * Mutex lock to suspend and resume the loader. */ private Mutex suspendLock=new Mutex(); private static Logger logger=Logger.getLogger(AbstractPoolJobLoader.class); /** * Abstracts the job pool details from the final loader by defining * other set of methods for the final loader and implements the management * and monitoring related methods. * * @param jobContext Context of the job being run. 
* @param pool Job Pool reference. * * @return Returns the final status of loading. */ public final ErrorCode loadPool(JobContext jobContext,JobPool pool) { logger.trace("Entering loadPool"); this.pool=pool; ErrorCode returnCode=ErrorCode.JOB_COMPLETED_SUCCESSFULLY; try { this.loaderStatus=ProcessorStatus.RUNNING; returnCode=this.loadPool(jobContext); this.loaderStatus=ProcessorStatus.FINISHED; } catch(Throwable exception) { exception.printStackTrace(); logger.error("Exception while calling loaders loadPool = " + exception.getMessage(), exception); returnCode=ErrorCode.POOL_JOB_LOADER_EXCEPTION; } logger.trace("Exiting loadPool"); return returnCode; } /** * Loads the given job data into the job pool.This method is for the * final loaders to load the job data into the pool. * * @param jobData Job data object that needs to be processed. * * @return Returns true if the job data is loaded into the pool, false otherwise. */ protected final boolean loadJobData(Object jobData) { boolean loaded=false; if(jobData!=null) this.currentJobData=jobData; else logger.debug("Loading the null to signal the end of the pool for the processors(s)"); loaded=this.pool.loadJobData(jobData); if(this.suspendSignal) { try { this.loaderStatus=ProcessorStatus.SUSPENDED; this.suspendLock.acquire(); this.loaderStatus=ProcessorStatus.RUNNING; } catch(InterruptedException exception) { exception.printStackTrace(); logger.error("Exception while suspending the loader " + exception.getMessage(),exception); this.loaderStatus=ProcessorStatus.RUNNING; } } return loaded; } /** * Tells whether the loading of the jobs needs to be stopped or not. * Loader implementation should check for this flag before loading the jobs * into the pool. * * @return Returns true if loader to be stopped, false continue loading the jobs. */ protected final boolean stopLoading() { return this.stopSignal; } /** * Resumes the loading of the jobs. * * @return Returns true if loader is resumed, false otherwise. 
*/ public boolean resume() { logger.trace("Entering resume"); this.stopSignal=false; this.suspendSignal=false; this.suspendLock.release(); logger.trace("Exiting resume"); return true; } /** * Stops the loading of the jobs into the pool. * * @return Returns true if loader is stopped, false otherwise. */ public boolean stop() { logger.trace("Entering stop"); this.stopSignal=true; if(this.suspendSignal) { logger.info("Loader is in suspend status... resuming the loader"); this.suspendSignal=false; this.suspendLock.release(); } logger.trace("Exiting stop"); return true; } /** * Suspends loading of the jobs into the pool. */ public boolean suspend() { logger.trace("Entering suspend"); this.suspendSignal=true; logger.trace("Exiting suspend"); return true; } /** * Returns the status of the loader. * * @return Returns the status of the loader. */ public ProcessorStatus getLoaderStatus() { return this.loaderStatus; } /** * Returns the processing state of the loader. * * @return Returns the job data that this loader is loading. */ public Object getLoaderState() { return currentJobData.toString(); } /** * <p> * Load the job data into the pool that needs to be processed by job processor(s). * Implementers can take the help of the <i>loadJobData(Object)</i> method defined * here to load the jobs into the pool. * <br> * <pre> * public class MyPoolJobLoader extends AbstractPoolJobLoader * { * public ErrorCode loadPool(JobContext jobContext) * { * for(int i=0;i<100;i++) * { * loadJobData(new Integer(i)); * if(super.stopLoading()) * { * doCleanup(); * super.loaderStatus=ProcessorStatus.STOPPED; * break; * } * } * loadJobData(null); * return ErrorCode.JOB_COMPLETED_SUCCESSFULLY; * } * } * </pre> * </p> * * @param jobContext Context of the job being run. * * @return Returns the final status of the loader. */ public abstract ErrorCode loadPool(JobContext jobContext); /** * Returns the number of job data objects that this loader is going to load. 
* * @return Return the number of job data object its going to load. */ public abstract long getTotalJobDataCount(); } --- NEW FILE: PoolJobController.java --- package org.jmonks.batch.framework.controller.pool; import EDU.oswego.cs.dl.util.concurrent.Callable; import EDU.oswego.cs.dl.util.concurrent.CountDown; import EDU.oswego.cs.dl.util.concurrent.FutureResult; import java.util.Calendar; import java.util.Hashtable; import java.util.Iterator; import java.util.Map; import org.apache.log4j.Logger; import org.jmonks.batch.framework.JobStatistics; import org.jmonks.batch.framework.ErrorCode; import org.jmonks.batch.framework.JobContext; import org.jmonks.batch.framework.config.ConfigurationException; import org.jmonks.batch.framework.config.PoolJobControllerConfig; import org.jmonks.batch.framework.controller.JobController; import org.jmonks.batch.framework.management.ProcessorState; import org.jmonks.batch.framework.management.ProcessorStatus; /** * <p> * PoolJobController provides the job architecture based on the pool concept, * where a loader is load all the information to be processed into the pool and * processors(>=1) retrieves the information from pool and process them. * It provides the abstract loader and processor classes to be overriden by job * implementation and comes with different implementation of pools. * <br><br> * To write any job using this controller, developer needs to write a loader class * extends AbstractPoolJobLoader, which loads the data needs to be processed into * the pool and write a processor class extending the AbstractPoolJobProcessor, * which process the data from the pool. Once implementation is done, * this job should be configured either using the XML configuration or DB configuration. 
* * <br><br> * <i>XML Configuration is as follows</i> <br><br> * <pre> * <job-config job-name="process_file_abc"> * <job-controller controller-class-name="org.jmonks.batch.framework.controller.pool.PoolJobController"> * <pool-job-loader pool-job-loader-class-name="com.mycompany.batch.processfileabc.AbcJobLoader"> * <property key="pool-job-loader-config1">pool-job-loader-value1</property> * </pool-job-loader> * <pool-job-processor pool-job-processor-class-name="com.mycompany.batch.processfileabc.AbcJobProcessor" thread-count="5"> * <property key="pool-job-processor-config1">pool-job-processor-value1</property> * </pool-job-processor> * <job-pool job-pool-class-name="org.jmonks.batch.framework.controller.pool.DefaultJobPool"> * <property key="pool-size">50000</property> * </job-pool> * <property key="pool-controller-config1">pool-controller-value1</property> * </job-controller> * <job-logging-config> * <logging-property-file>com.mycompany.batch.processfileabc.Logging</logging-property-file> * </job-logging-config> * </job-config> * </pre> * <br><br> * <i>DB Configuration is as follows</i> * <table border="1"> * <tr><td><b>TableName.ColumnName</b></td><td><b>Value</b></td></tr> * <tr><td>job_config.job_name</td><td>process_file_abc</td></tr> * <tr><td>job_config.job_status</td><td>1</td></tr> * <tr><td>job_config.job_controller_class_name</td><td>org.jmonks.batch.framework.controller.pool.PoolJobController</td></tr> * <tr><td>job_config.job_controller_props</td><td>pool-controller-config1=pool-controller-value1:pool-controller-config1=poo2-controller-value2</td></tr> * <tr><td>pool_job_controller_config.job_name</td><td>process_file_abc</td></tr> * <tr><td>pool_job_controller_config.pool_job_loader_class_name</td><td>com.mycompany.batch.processfileabc.AbcJobLoader</td></tr> * <tr><td>pool_job_controller_config.pool_job_loader_props</td><td>pool-job-loader-key1=loader-value1</td></tr> * 
<tr><td>pool_job_controller_config.pool_job_processor_class_name</td><td>com.mycompany.batch.processfileabc.AbcJobProcessor</td></tr> * <tr><td>pool_job_controller_config.pool_job_processor_props</td><td>pool-job-processor-key1=processor-value1</td></tr> * <tr><td>pool_job_controller_config.pool_job_processor_thread_cnt</td><td>5</td></tr> * <tr><td>pool_job_controller_config.job_pool_class_name</td><td>org.jmonks.batch.framework.controller.pool.CollectionJobPool</td></tr> * <tr><td>pool_job_controller_config.job_pool_props</td><td>job-pool-size=5000</td></tr> * </table> * </p> * * @author Suresh Pragada * @version 1.0 * @since 1.0 */ public class PoolJobController extends JobController { /** * Map holds all the pool job processors and loader being used for the given job * as values and name of the threads as keys. */ private Map jobProcessorsMap=new Hashtable(); /** * Map holds all the pool job processors and loader returned error codes as values and * name of the threads as keys. */ private Map jobProcessorsResultMap=new Hashtable(); /** * Job pool reference being used by the loader and processor. */ private JobPool pool=null; /** * Holds the statistics of the basic job controller. */ private JobStatistics jobStatistics=null; private static Logger logger=Logger.getLogger(PoolJobController.class); /** * Constructor enables the instantiation of the pool job controller instance. */ public PoolJobController() { } /** * <p> * Executes the job by creating the pool, loader and processor(s) based * on the configuration and have them work accordingly to process the job. * </p> * * @return Returns the processing error code for the job. 
*/ public ErrorCode process() { logger.info("Entering process in pool job controller = " + super.jobContext.getJobName()); PoolJobControllerConfig poolJobControllerConfig=(PoolJobControllerConfig)super.jobContext.getJobConfig().getJobControllerConfig(); validateControllerConfiguration(poolJobControllerConfig); int processorCount=poolJobControllerConfig.getPoolJobProcessorThreadCount(); processorCount=(processorCount<1?1:processorCount); /** * Create and initialize the pool */ this.pool=(JobPool)this.getInstance(poolJobControllerConfig.getPoolClassName()); pool.initialize(super.jobContext); CountDown countDownLock=new CountDown(processorCount+1); /** * Create, initialize and spwan the loader */ PoolJobLoader jobLoader=(PoolJobLoader)this.getInstance(poolJobControllerConfig.getPoolJobLoaderClassName()); String jobLoaderName=super.jobContext.getJobName()+"_Loader"; FutureResult jobLoaderFutureResult=new FutureResult(); Thread jobLoaderThread=new Thread(jobLoaderFutureResult.setter(this.getCallableLoader( countDownLock,jobLoader,super.jobContext,pool)),jobLoaderName); jobLoaderThread.start(); this.jobProcessorsResultMap.put(jobLoaderName, jobLoaderFutureResult); this.jobProcessorsMap.put(jobLoaderName, jobLoader); this.jobStatistics=new JobStatistics(super.jobContext.getJobName()); this.jobStatistics.setStartTime(Calendar.getInstance().getTime()); /** * Create, initialize and spawn the processor(s). 
*/ for(int i=0;i<processorCount;i++) { PoolJobProcessor jobProcessor=(PoolJobProcessor)this.getInstance(poolJobControllerConfig.getPoolJobProcessorClassName()); String jobProcessorName=super.jobContext.getJobName()+"_Processor_"+i; FutureResult jobProcessorFutureResult=new FutureResult(); Thread jobProcessorThread=new Thread(jobProcessorFutureResult.setter(this.getCallableProcessor( countDownLock,jobProcessor,super.jobContext,pool)),jobProcessorName); jobProcessorThread.start(); this.jobProcessorsResultMap.put(jobProcessorName, jobProcessorFutureResult); this.jobProcessorsMap.put(jobProcessorName, jobProcessor); } /** * Go to hydernate until all threads have finsihed their task. */ ErrorCode returnCode=hybernate(countDownLock); logger.info("Exiting process in pool job controller = " + super.jobContext.getJobName() + " with return code = " + returnCode); return returnCode; } /** * Returns the total number of records this job going to process. * This information will be obtained from the pool loader. * * @return Returns the number of records this job goint to process. */ public long getExpectedRecordsCount() { logger.trace("Entering getExpectedRecordsCount"); long expectedRecordsCount=0; String jobLoaderName=super.jobContext.getJobName()+"_Loader"; PoolJobLoader jobLoader=(PoolJobLoader)this.jobProcessorsMap.get(jobLoaderName); expectedRecordsCount=jobLoader.getTotalJobDataCount(); logger.trace("Exiting getExpectedRecordsCount = " + expectedRecordsCount); return expectedRecordsCount; } /** * Number of records got processed so far. * This information will be obtained from the pool. * * @return Returns the number of records processed so far. 
*/ public long getProcessedRecordsCount() { logger.trace("Entering getProcessedRecordsCount"); long processedRecordsCount=0; for(Iterator iterator=this.jobProcessorsMap.values().iterator();iterator.hasNext();) { Object processor=iterator.next(); if(processor instanceof PoolJobProcessor) processedRecordsCount=processedRecordsCount+((PoolJobProcessor)processor).getProcessedJobDataCount(); } logger.trace("Exiting getProcessedRecordsCount = " + processedRecordsCount); return processedRecordsCount; } /** * Returns the IDs assigned to all the processors, loader as a string array. * * @return Returns the string array consist of all the processor(s) and loader IDs. */ public java.lang.String[] getProcessorIDList() { logger.trace("Entering getProcessorIDList"); String processorIDList[]=new String[this.jobProcessorsMap.size()]; int i=0; for(Iterator iterator=this.jobProcessorsMap.keySet().iterator();iterator.hasNext();i++) processorIDList[i]=(String)iterator.next(); logger.trace("Exiting getProcessorIDList"); return processorIDList; } /** * Returns the current state of the processor identified by the given processor * ID as the ProcessorState object. * * @param processorID processor ID identifies the processor or loader. * * @return Retuns the current state of the required processor. */ public ProcessorState getProcessorState(String processorID) { logger.trace("Exiting getProcessorState = " + processorID); ProcessorState state=null; Object processor=this.jobProcessorsMap.get(processorID); if(processor!=null) { if(processor instanceof PoolJobLoader) state=new ProcessorState(processorID, "Pool Job Loader", ((PoolJobLoader)processor).getLoaderState()); else if(processor instanceof PoolJobProcessor) state=new ProcessorState(processorID, "Pool Job Processor", ((PoolJobProcessor)processor).getProcessorState()); else { logger.error("What else it could be? 
" + processor.getClass().getName()); state=null; } } else state=null; logger.trace("Exiting getProcessorState = " + processorID + " state = " + state); return state; } /** * Returns the status of the processor identified by the given processor ID. * * @return Returns the status of the required processor. */ public ProcessorStatus getProcessorStatus(String processorID) { logger.trace("Exiting getProcessorStatus = " + processorID); ProcessorStatus status=null; Object processor=this.jobProcessorsMap.get(processorID); if(processor!=null) { if(processor instanceof PoolJobLoader) status=((PoolJobLoader)processor).getLoaderStatus(); else if(processor instanceof PoolJobProcessor) status=((PoolJobProcessor)processor).getProcessorStatus(); else { logger.error("What else it could be? " + processor.getClass().getName()); status=null; } } else status=null; logger.trace("Exiting getProcessorStatus = " + processorID + " status = " + status); return status; } /** * Stops the processor identified by the given processor ID. * * @return Returns true, if processor could be stopped, false otherwise. */ public boolean stop(String processorID) { logger.trace("Entering stop = " + processorID); boolean stopped=true; Object processor=this.jobProcessorsMap.get(processorID); if(processor!=null) { if(processor instanceof PoolJobLoader) stopped=((PoolJobLoader)processor).stop(); else if(processor instanceof PoolJobProcessor) stopped=((PoolJobProcessor)processor).stop(); else { logger.error("What else it could be? " + processor.getClass().getName()); stopped=false; } } else stopped=false; logger.trace("Exiting stop = " + processorID + " status = " + stopped); return stopped; } /** * Suspends the processor identified by the given processor ID. * * @return Returns true if it could suspend the processor, false otherwise. 
*/ public boolean suspend(String processorID) { logger.trace("Entering suspend = " + processorID); boolean suspended=true; Object processor=this.jobProcessorsMap.get(processorID); if(processor!=null) { if(processor instanceof PoolJobLoader) suspended=((PoolJobLoader)processor).suspend(); else if(processor instanceof PoolJobProcessor) suspended=((PoolJobProcessor)processor).suspend(); else { logger.error("What else it could be? " + processor.getClass().getName()); suspended=false; } } else suspended=false; logger.trace("Exiting suspend = " + processorID + " status = " + suspended); return suspended; } /** * Resumes the processor identified by given processor ID. * * @return Returns true if processor is resumed, false otherwise. */ public boolean resume(String processorID) { logger.trace("Entering resume = " + processorID); boolean resumed=true; Object processor=this.jobProcessorsMap.get(processorID); if(processor!=null) { if(processor instanceof PoolJobLoader) resumed=((PoolJobLoader)processor).resume(); else if(processor instanceof PoolJobProcessor) resumed=((PoolJobProcessor)processor).resume(); else { logger.error("What else it could be? " + processor.getClass().getName()); resumed=false; } } else resumed=false; logger.trace("Exiting resume = " + processorID + " status = " + resumed); return resumed; } /** * <p> * Main thread will call this method and wait until all the processor(s) and loader have * finished their processing. Once they are finished, all the return codes * will be analyzed and one error code will be returned for this job. * </p * @param countDownLock CountDown lock being used by loader and all the processor(s). * * @return Returns the error code of the job. 
*/ private ErrorCode hybernate(CountDown countDownLock) { logger.trace("Entering hybernate"); ErrorCode returnCode=ErrorCode.JOB_COMPLETED_SUCCESSFULLY; try { logger.debug("Going to wait until loader and all the processor(s) is gonna finish."); countDownLock.acquire(); logger.info("Loader and all processor(s) have finished their task."); /** As per the contract cleanup the pool. */ this.pool.cleanup(); } catch(InterruptedException exception) { exception.printStackTrace(); logger.error("Exception while waiting for loader and all the processor(s) = " + exception.getMessage(), exception); return ErrorCode.POOL_JOB_PROCESSOR_EXCEPTION; } this.jobStatistics.setEndTime(Calendar.getInstance().getTime()); this.jobStatistics.setMaxMemeoryUsage(Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()); this.jobStatistics.setRecordsProcessed(this.getProcessedRecordsCount()); for(Iterator iterator=this.jobProcessorsResultMap.values().iterator();iterator.hasNext();) { ErrorCode threadReturnCode=(ErrorCode)((FutureResult)iterator.next()).peek(); if(!ErrorCode.JOB_COMPLETED_SUCCESSFULLY.equals(threadReturnCode)) { returnCode=threadReturnCode; break; } } this.jobStatistics.setExitCode(returnCode); logger.trace("Exiting hybernate = " + returnCode); return returnCode; } /** * Wraps the job loader with Callable interface and return the callable * interface. * * @param countDownLock Count down to be released at the end of the processing. * @param jobLoader Job Loader that needs to be invoked. * @param jobContext Context of the job being run. * @param pool Job pool where the loader needs to load the jobs. * * @return Returns the callabel interface wrapped around the job loader. 
*/ private Callable getCallableLoader(final CountDown countDownLock,final PoolJobLoader jobLoader, final JobContext jobContext, final JobPool pool) { logger.trace("Entering getCallableLoader"); Callable callable=new Callable(){ public Object call() { ErrorCode returnCode=null; try { logger.trace("Going to call the loadPool method"); returnCode=jobLoader.loadPool(jobContext,pool); logger.debug("Done calling the loadPool method"); } catch(Throwable exception) { exception.printStackTrace(); logger.error("Exception while loading the job data into the pool = " + exception.getMessage(), exception); returnCode=ErrorCode.POOL_JOB_LOADER_EXCEPTION; } countDownLock.release(); logger.info(Thread.currentThread().getName() + " is exiting with the error code = " + returnCode); return returnCode; } }; logger.trace("Exiting getCallableLoader"); return callable; } /** * Wraps the job processor with Callable interface and return the callable * interface. * * @param countDownLock Count down to be released at the end of the processing. * @param jobProcessor Job processor that needs to be invoked. * @param jobContext Context of the job being run. * @param pool Job pool where the processor needs to pull the jobs. * * @return Returns the callabel interface wrapped around the job processor. 
*/ private Callable getCallableProcessor(final CountDown countDownLock,final PoolJobProcessor jobProcessor, final JobContext jobContext, final JobPool pool) { logger.trace("Entering getCallableProcessor"); Callable callable=new Callable(){ public Object call() { ErrorCode returnCode=null; try { logger.trace("Going to call the processPool method"); returnCode=jobProcessor.processPool(jobContext,pool); logger.debug("Done calling the processPool method"); } catch(Throwable exception) { exception.printStackTrace(); logger.error("Exception while processing the job data from the pool = " + exception.getMessage(), exception); returnCode=ErrorCode.POOL_JOB_PROCESSOR_EXCEPTION; } countDownLock.release(); logger.info(Thread.currentThread().getName() + " is exiting with the error code = " + returnCode); return returnCode; } }; logger.trace("Exiting getCallableProcessor"); return callable; } /** * Instantiates and returns the instance of the required class. * * @param className Class name of the required instance. * * @return Retrurns the instance of the required class. * * @throws ConfigurationException If class could not be instantiated. */ private Object getInstance(String className) { try { return Class.forName(className).newInstance(); } catch(Exception exception) { exception.printStackTrace(); logger.fatal(exception.getMessage(),exception); throw new ConfigurationException(ConfigurationException.JOB_CONTROLLER_CONFIG, exception.getMessage()); } } /** * Validates the pool job controller configuration by instantiating and verifying * all the required loader, processor and pool classes. * * @param controllerConfig Controller configuration defined in job configuration. * * @throws ConfigurationException If any one required instances cannot be instantiated. 
*/ private void validateControllerConfiguration(PoolJobControllerConfig controllerConfig) { Object jobPool=this.getInstance(controllerConfig.getPoolClassName()); if(jobPool instanceof JobPool) { logger.debug("Job pool is configured properly = " + controllerConfig.getPoolClassName()); } else { logger.fatal("Configured job pool class name " + controllerConfig.getPoolClassName() + " cannot be associated to JobPool"); throw new ConfigurationException(ConfigurationException.JOB_CONTROLLER_CONFIG, "Configured job pool class name " + controllerConfig.getPoolClassName() + " cannot be associated to JobPool"); } Object jobLoader=this.getInstance(controllerConfig.getPoolJobLoaderClassName()); if(jobLoader instanceof PoolJobLoader) { logger.debug("Job loader is configured properly = " + controllerConfig.getPoolJobLoaderClassName()); } else { logger.fatal("Configured job loader class name " + controllerConfig.getPoolJobLoaderClassName() + " cannot be associated to PoolJobLoader"); throw new ConfigurationException(ConfigurationException.JOB_CONTROLLER_CONFIG, "Configured job loader class name " + controllerConfig.getPoolJobLoaderClassName() + " cannot be associated to PoolJobLoader"); } Object jobProcessor=this.getInstance(controllerConfig.getPoolJobProcessorClassName()); if(jobProcessor instanceof PoolJobLoader) { logger.debug("Job Processor is configured properly = " + controllerConfig.getPoolJobProcessorClassName()); } else { logger.fatal("Configured job processor class name " + controllerConfig.getPoolJobProcessorClassName() + " cannot be associated to PoolJobProcessor"); throw new ConfigurationException(ConfigurationException.JOB_CONTROLLER_CONFIG, "Configured job processor class name " + controllerConfig.getPoolJobProcessorClassName() + " cannot be associated to PoolJobProcessor"); } } /** * @see org.jmonks.batch.framework.controller.JobController#getJobStatistics() */ public JobStatistics getJobStatistics() { if(this.jobStatistics.getEndTime()!=null) return 
this.jobStatistics; else return null; } } --- NEW FILE: CollectionJobPool.java --- package org.jmonks.batch.framework.controller.pool; import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer; import java.util.Hashtable; import java.util.Map; import org.apache.log4j.Logger; import org.jmonks.batch.framework.JobContext; import org.jmonks.batch.framework.config.PoolJobControllerConfig; /** * <p> * Provides the implementation of job pool backed by a bounded buffer. * This job pool implementation can be configured for any given job using the * following configuration in job configuration.<br> * <pre> * <job-pool job-pool-class-name="org.jmonks.batch.framework.controller.pool.CollectionJobPool"> * <property key="job-pool-size">50000</property> * </job-pool> * </pre> * </p> * * @author Suresh Pragada * @version 1.0 * @since 1.0 */ public class CollectionJobPool implements JobPool { /** * Default collection pool size. This will be used when the pool size is not * configured or there is a problem in obtaining the pool size. */ public static final int DEFAULT_COLLECTION_POOL_SIZE = 1000; /** * Name of the configuration property that carries the pool size. */ protected static final String POOL_SIZE_PROPERTY_NAME = "job-pool-size"; /** * Map holding the configuration defined for the job pool. There might be properties * needed for purposes other than initialization. */ protected Map configProps=null; /** * Bounded buffer holding the job data objects. */ protected BoundedBuffer pool=null; /** * Holds the number of job data objects that have been loaded by the job loader. */ protected long loadedJobsCount=0; /** * Object to be pushed to the bounded buffer to denote the end of the pool. */ private static final Object END_OF_POOL = new Object(); /** * Logger used by this job pool. */ private static Logger logger=Logger.getLogger(CollectionJobPool.class); /** * Default constructor for instantiation purposes.
*/ public CollectionJobPool() { } /** * @see org.jmonks.batch.framework.controller.pool.JobPool#loadJobData(Object) */ public boolean loadJobData(Object jobData) { boolean loaded=false; while(!loaded) { try { if(jobData!=null) { this.pool.put(jobData); /** END_OF_POOL will be loaded by getNextJobData API to wake up other processors */ if(jobData!=END_OF_POOL) this.loadedJobsCount++; } else { /** * null from loader indicates that he has done loading with the job data. * Push END_OF_POOL to the pool to let processors know that loader is done loading the jobs. */ this.pool.put(END_OF_POOL); } loaded=true; } catch(InterruptedException exception) { exception.printStackTrace(); logger.info("Got exception while loading the job data = " + jobData, exception); } } return loaded; } /** * @see org.jmonks.batch.framework.controller.pool.JobPool#getNextJobData */ public Object getNextJobData() { Object jobData=null; try { jobData=this.pool.take(); /** * If retrieved job data is END_OF_POOL object return null to caller as per * method contract and put EN_OF_POOL back to the pool to wake up other processors. */ if(jobData==END_OF_POOL) { this.pool.put(END_OF_POOL); jobData=null; } } catch(InterruptedException exception) { exception.printStackTrace(); logger.info("Got exception while getting the job data from pool", exception); } return jobData; } /** * Initializes the collection job pool using the configuration defined * in the job configuration. It gets the pool size from the defined configuration, * by looking for the property "job-pool-size", if it couldnt find it uses the default pool size "1000" * and initializes the collection to be used as the pool. * * @param jobContext Context of the job being run. 
*/
    public void initialize(JobContext jobContext)
    {
        PoolJobControllerConfig poolJobControllerConfig=(PoolJobControllerConfig)jobContext.getJobConfig().getJobControllerConfig();
        /* Keep a private copy of the pool configuration properties. */
        this.configProps=new Hashtable(poolJobControllerConfig.getPoolConfigProperties());
        /*
         * Try to get the pool size from configuration... If unable to get use default size.
         */
        int poolSize=CollectionJobPool.DEFAULT_COLLECTION_POOL_SIZE;
        String poolSizePropertyValue=(String)this.configProps.get(CollectionJobPool.POOL_SIZE_PROPERTY_NAME);
        if(poolSizePropertyValue!=null)
        {
            logger.info("Received the pool size " + poolSizePropertyValue + " from configuration");
            try
            {
                /* Surrounding whitespace in the configured value is tolerated. */
                int configuredSize=Integer.parseInt(poolSizePropertyValue.trim());
                if(configuredSize>=1)
                    poolSize=configuredSize;
                else
                    logger.info("Configured pool size " + configuredSize + " is less than 1... Using the default pool size");
            }
            catch(NumberFormatException exception)
            {
                logger.info("Exception while obtaining the pool size from configuration... Using the default pool size", exception);
            }
        }
        else
        {
            logger.info("pool size has not been configured.. using the default pool size");
        }
        this.pool=new BoundedBuffer(poolSize);
    }

    /**
     * Performs the cleanup of the pool. This implementation has nothing to
     * release and only logs that the cleanup has been done.
     */
    public void cleanup()
    {
        /* Nothing to do the cleanup. */
        logger.debug("Cleanup has been done");
    }

    /**
     * Returns the number of job data objects counted by loadJobData so far.
     *
     * @return Returns the number of loaded job data objects.
     *
     * @see org.jmonks.batch.framework.controller.pool.JobPool#getLoadedJobDataCount
     */
    public long getLoadedJobDataCount()
    {
        return this.loadedJobsCount;
    }
} |