[Batchserver-cvs] batchserver/src/org/jmonks/batchserver/framework/controller/pool AbstractPoolJobLo
Brought to you by:
suresh_pragada
From: Suresh <sur...@us...> - 2006-05-14 01:31:36
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv2130 Modified Files: AbstractPoolJobLoader.java CollectionJobPool.java JobPool.java PoolJobController.java PoolJobLoader.java PoolJobProcessor.java Log Message: no message Index: PoolJobLoader.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/PoolJobLoader.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** PoolJobLoader.java 4 Mar 2006 04:42:01 -0000 1.2 --- PoolJobLoader.java 14 May 2006 01:31:32 -0000 1.3 *************** *** 1,27 **** package org.jmonks.batchserver.framework.controller.pool; import java.util.Map; /** ! * Loader is the one which loads the job data into the pool. This inteface defines the responsibilites of the loader and some of these responsibilities will be implemented by AbstractPoolJobLoader. * ! * @author : Suresh Pragada ! * @version 1.0 */ public interface PoolJobLoader { public void cleanup(); ! ! public int loadPool(); ! public boolean suspend(); ! public boolean resume(); ! public boolean stop(); ! public int getTotalRecords(); ! public Object getLoaderState(); ! ! public void initialize(Map configProps, org.jmonks.batchserver.framework.controller.pool.JobPool pool); } --- 1,69 ---- package org.jmonks.batchserver.framework.controller.pool; import java.util.Map; + import org.jmonks.batchserver.framework.management.ProcessorStatus; /** ! * PoolJobLoader loads the job data into the pool to be processed by ! * PoolJobProcessor(s). * ! * @author Suresh Pragada ! * @version 1.0 ! * @since 1.0 */ public interface PoolJobLoader { + /** + * Chance for the loader to initialize itself with the properties defined + * in the job configuration and gets the reference to the job pool. + * + * @param configProps Configuration defined for the loader. + * @param pool Job Pool reference. + */ + public void initialize(Map configProps, JobPool pool); + /** + * Chance to do some cleanup before the job being shutdown. + */ public void cleanup(); ! /** ! * Loads the job data to the job pool. When finished loading of all the job ! * data, loads the <i>NULL</i> into the pool. ! */ ! public void loadJobData(); ! /** ! * Suspends the loader. ! * ! * @return Returns true if loader is suspended, false otherwise. ! */ public boolean suspend(); ! /** ! * Resumes the loader. ! * ! * @return Returns true if loader is resumed, false otherwise. ! */ public boolean resume(); ! /** ! * Stops the loader. ! * ! * @return Returns true if loader is stopped, false otherwise. ! */ public boolean stop(); ! /** ! * Gets the total records this loader is going to load. ! * ! * @return Returns the number of records this loader is going to load. ! */ public int getTotalRecords(); ! /** ! * Gets the loader state as object which can be understan by the monitoring ! * client. Usually, this would be used for display purposes. ! * ! * @return Returns the displayable object representing the loader state. ! */ public Object getLoaderState(); ! /** ! * Gets the loader status. ! * ! * @return Returns the loader status. ! */ ! public ProcessorStatus getLoaderStatus(); } Index: JobPool.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/JobPool.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** JobPool.java 4 Mar 2006 04:42:01 -0000 1.2 --- JobPool.java 14 May 2006 01:31:32 -0000 1.3 *************** *** 3,25 **** /** ! * <p> ! * Pool provides the interface to load the job information into it and retrieves the job information from the pool. ! * </p> * ! * @author : Suresh Pragada ! * @version 1.0 */ ! public interface JobPool { ! public boolean loadJobData(Object jobData); ! ! public Object getNextJobData(); ! ! public void initialize(Map configProps); ! public void cleanup(); ! public int getLoadedCount(); ! ! public void getProcessedCount(); } --- 3,59 ---- /** ! * <p> ! * JobPool pools all the job data being loaded by the loader and serves this data ! * when job processors requests for the processing. This interface enables the loader, ! * processor and controller to interact with the pool implementation. ! * </p> * ! * @author Suresh Pragada ! * @version 1.0 ! * @since 1.0 */ ! public interface JobPool { ! /** ! * Controller calls this method with the properties defined in the configuration ! * before reference is being passed to the job loader and job processor. ! * ! * @param configProps Properties defined in the configuration for the job pool. ! */ ! public void initialize(Map configProps); ! /** ! * Gets the next available job data(piece of information) to be processed. ! * If it is not available, it will be waited until it gets the next job data. ! * If loader is done with loading the jobs, it returns null. At this time, ! * processor should quit its processing. ! * ! * @return Returns the next available job data to be process, ! * null if no job data is available. ! */ ! public Object getNextJobData(); ! /** ! * Job Loader will use this API to load the job data into the pool. ! * If job loader is done with the loading all the data to be processed, ! * it should load null indicating that loading of all the job data has been ! * done. ! * ! * @param jobData Job data needs to be processed. ! */ ! public boolean loadJobData(Object jobData); ! /** ! * Controller calls this method after job processing has been done. ! */ public void cleanup(); ! /** ! * Returns the number of job data objects being loaded into the pool. ! * ! * @return Returns the number of job data objects being loaded into the pool. ! */ public int getLoadedCount(); ! /** ! * Returns the number of job data objects being processed from the pool. ! * ! * @return Returns the number of job data objects have been processed so far. ! */ ! public int getProcessedCount(); } Index: PoolJobProcessor.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/PoolJobProcessor.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** PoolJobProcessor.java 4 Mar 2006 04:42:01 -0000 1.2 --- PoolJobProcessor.java 14 May 2006 01:31:32 -0000 1.3 *************** *** 2,26 **** import java.util.Map; /** ! * Processor is the one which processes the job data. This interface defines the responsiblities of the job processor. * ! * @author : Suresh pragada ! * @version 1.0 */ public interface PoolJobProcessor { public boolean suspend(); ! public boolean resume(); ! public boolean stop(); ! public void cleanup(); ! ! public int processPool(); ! public Object getProcessorState(); ! ! public void initialize(Map configProps, org.jmonks.batchserver.framework.controller.pool.JobPool pool); } --- 2,65 ---- import java.util.Map; + import org.jmonks.batchserver.framework.management.ProcessorStatus; /** ! * PoolJobProcessor gets the job data to be processed from the job pool and ! * processes it. * ! * @author Suresh pragada ! * @version 1.0 ! * @since 1.0 */ public interface PoolJobProcessor { + /** + * Chance to initialize itself using the information configured for this job + * processor in job configuration. + * + * @param configProps Configuration defined for the job processor in job configuration. + * @param pool Reference to Job Pool. + */ + public void initialize(Map configProps, JobPool pool); + /** + * Suspends the pool job processor. + * + * @return Returns true if processor is suspended, false otherwise. + */ public boolean suspend(); ! /** ! * Resumes the pool job processor. ! * ! * @return Returns true if processor is resumed, false otherwise. ! */ public boolean resume(); ! /** ! * Stops the processor. ! * ! * @return Returns true if processor is stopped, false otherwise. ! */ public boolean stop(); ! /** ! * Chance to do any cleanup at the end of the processing. ! */ public void cleanup(); ! /** ! * Process the job data getting from the job pool and quits when job pool ! * out of the job data(returns null). ! * ! * @return Returns the number of job data instances being procssed. ! */ ! public int process(); ! /** ! * Gets the processor to be displyed or anaylyzed for the monitoring purposes. ! * ! * @return Returns an object understanble/displayable by monitoring client. ! */ public Object getProcessorState(); ! /** ! * Gets the processor status being used by the management clients. ! * ! * @return Returns the status of the job. ! */ ! public ProcessorStatus getProcessorStatus(); } Index: AbstractPoolJobLoader.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/AbstractPoolJobLoader.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** AbstractPoolJobLoader.java 4 Mar 2006 04:42:01 -0000 1.2 --- AbstractPoolJobLoader.java 14 May 2006 01:31:32 -0000 1.3 *************** *** 19,23 **** } ! public abstract int loadJobData(); public abstract void initialize(Map configProps); --- 19,23 ---- } ! public abstract void load(); public abstract void initialize(Map configProps); Index: PoolJobController.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/PoolJobController.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** PoolJobController.java 3 May 2006 22:11:18 -0000 1.6 --- PoolJobController.java 14 May 2006 01:31:32 -0000 1.7 *************** *** 1,169 **** package org.jmonks.batchserver.framework.controller.pool; import org.jmonks.batchserver.framework.common.ErrorCode; - import org.jmonks.batchserver.framework.config.JobControllerConfig; import org.jmonks.batchserver.framework.controller.JobController; ! ! ! /** * <p> ! * PoolJobController provides the job architecture based on the pool concept, where a loader is load all the information to be processed into the pool and processors(>=1) retrieves this information from pool and process them. As this subclasses the abstract JobController, it implements all the management and monitoring methods. It provides the abstract loader and processor classes to be overriden by job implementation and comes with different implementation of pools. * <br><br> ! * To write any job using this controller, developer needs to write a loader class extends AbstractPoolJobLoader, which loads the data needs to be processed into the pool and write a processor class extending the AbstractPoolJobProcessor, which process the data from the pool. ! * Once implementation is done, this job should be configured either using the XML configuration or DB configuration. * * <br><br> ! * XML Configuration is as follows <br><br> * <pre> ! * <job-config job-name="process_file_abc"> ! * <job-controller controller-class-name="org.jmonks.batchserver.framework.controller.pool.PoolJobController" ! * controller-config-class-name="org.jmonks.batchserver.framework.config.PoolControllerConfig"> ! * <job-loader pool-job-loader-class-name="com.mycompany.batch.processfileabc.AbcJobLoader"> ! * <property key="loader-info1">loader-value1</property> ! * </job-loader> ! * <job-processor pool-job-processor-class-name="com.mycompany.batch.processfileabc.AbcJobLoader"> ! * <thread-count>1</thread-count> ! * <property key="processor-info1">processor-value1</property> ! * </job-processor> ! * <job-pool job-pool-class-name="org.jmonks.batchserver.framework.controller.pool.DefaultJobPool"> ! * <property key="pool-size">50000</property> ! * </job-pool> ! * <property key="controller-info1">controller-value1</property> ! * </job-controller> ! * <job-logging-config> ! * <logging-property-file>com.mycompany.batch.processfileabc.Logging</logging-property-file> ! * </job-logging-config> ! * </job-config> * </pre> * </p> ! * @author : Suresh Pragada ! * @version 1.0 */ public class PoolJobController extends JobController { ! ! private PoolJobProcessor mJobProcessor; ! ! private JobPool mJobPool; ! ! private PoolJobLoader mPoolJobLoader; ! public PoolJobController(JobControllerConfig config) { - } public ErrorCode process() - { return null; - } /** ! * Returns the total number of records this job gonna process. This information will be obtained from the pool loader. * ! * @return Returns the number of records this job gonna process. */ public int getExpectedRecordsCount() - - - { return 0; - - - } /** ! * Number of records this job processed so far. This information will be obtained from the pool. * ! * @return Returns the number of records processed so far. */ public int getProcessedRecordsCount() - - { return 0; - - } /** ! * Returns the number of threads creates to process this job. This includes each thread for loader and pool and the specified number of threads for the processor. ! * ! * @return Returns the number of threads controller created to process this job. */ ! public java.lang.String[] getThreadIDList() ! ! { return null; - - } /** ! * Returns the current state of the thread. * ! * @param threadNo Thread number. ! * @return Retuns the current state of the thread. */ ! public org.jmonks.batchserver.framework.management.ThreadState getThreadState(String threadID) ! ! { return null; - - } /** ! * Returns the status of the job. * ! * @return Returns the status of the job. */ ! public org.jmonks.batchserver.framework.management.JobStatus getJobStatus() ! ! { return null; - - } /** ! * This calls the stop method on the loader processor and persists the loader, pool and processor, if restart flag is true and return with appropriate return code. */ ! public boolean stop() ! { return true; - } /** ! * This calls the suspend on the loader and processor threads. ! * ! * @return Returns true, if it could suspend the processes, false, otherwise. */ ! public boolean suspend() ! { return true; - } /** ! * This would resume the job. ! * ! * @return Returns true, if it could resume the job, false otherwise. */ ! public boolean resume() ! { return true; - } } --- 1,161 ---- package org.jmonks.batchserver.framework.controller.pool; import org.jmonks.batchserver.framework.common.ErrorCode; import org.jmonks.batchserver.framework.controller.JobController; ! import org.jmonks.batchserver.framework.management.ProcessorState; ! import org.jmonks.batchserver.framework.management.ProcessorStatus; /** * <p> ! * PoolJobController provides the job architecture based on the pool concept, ! * where a loader is load all the information to be processed into the pool and ! * processors(>=1) retrieves the information from pool and process them. ! * It provides the abstract loader and processor classes to be overriden by job ! * implementation and comes with different implementation of pools. * <br><br> ! * To write any job using this controller, developer needs to write a loader class ! * extends AbstractPoolJobLoader, which loads the data needs to be processed into ! * the pool and write a processor class extending the AbstractPoolJobProcessor, ! * which process the data from the pool. Once implementation is done, ! * this job should be configured either using the XML configuration or DB configuration. * * <br><br> ! * <i>XML Configuration is as follows</i> <br><br> * <pre> ! * <job-config job-name="process_file_abc"> ! * <job-controller controller-class-name="org.jmonks.batchserver.framework.controller.pool.PoolJobController" ! * controller-config-class-name="org.jmonks.batchserver.framework.config.PoolControllerConfig"> ! * <job-loader pool-job-loader-class-name="com.mycompany.batch.processfileabc.AbcJobLoader"> ! * <property key="loader-info1">loader-value1</property> ! * </job-loader> ! * <job-processor pool-job-processor-class-name="com.mycompany.batch.processfileabc.AbcJobLoader"> ! * <thread-count>1</thread-count> ! * <property key="processor-info1">processor-value1</property> ! * </job-processor> ! * <job-pool job-pool-class-name="org.jmonks.batchserver.framework.controller.pool.DefaultJobPool"> ! * <property key="pool-size">50000</property> ! * </job-pool> ! * <property key="controller-info1">controller-value1</property> ! * </job-controller> ! * <job-logging-config> ! * <logging-property-file>com.mycompany.batch.processfileabc.Logging</logging-property-file> ! * </job-logging-config> ! * </job-config> * </pre> + * <br><br> + * <i>DB Configuration is as follows</i> + * <table> + * <tr><td>Column</td><td>Value</td></tr> + * </table> * </p> ! * ! * @author Suresh Pragada ! * @version 1.0 ! * @since 1.0 */ public class PoolJobController extends JobController { ! /** ! * Constructor enables the instantiation of the pool job controller instance. ! */ ! public PoolJobController() { } + /** + * <p> + * Executes the job by creating the pool, loader and processor(s) based + * on the configuration and have them work accordingly to process the job. + * </p> + * + * @return Returns the processing error code for the job. + */ public ErrorCode process() { return null; } /** ! * Returns the total number of records this job going to process. ! * This information will be obtained from the pool loader. * ! * @return Returns the number of records this job goint to process. */ public int getExpectedRecordsCount() { return 0; } /** ! * Number of records got processed so far. ! * This information will be obtained from the pool. * ! * @return Returns the number of records processed so far. */ public int getProcessedRecordsCount() { return 0; } /** ! * Returns the IDs assigned to all the processors, loader as a string array. ! * ! * @return Returns the string array consist of all the processor(s) and loader IDs. */ ! public java.lang.String[] getProcessorIDList() { return null; } /** ! * Returns the current state of the processor identified by the given processor ! * ID as the ProcessorState object. * ! * @param processorID processor ID identifies the processor or loader. ! * ! * @return Retuns the current state of the required processor. */ ! public ProcessorState getProcessorState(String processorID) { return null; } /** ! * Returns the status of the processor identified by the given processor ID. * ! * @return Returns the status of the required processor. */ ! public ProcessorStatus getProcessorStatus(String processorID) { return null; } /** ! * Stops the processor identified by the given processor ID. ! * ! * @return Returns true, if processor could be stopped, false otherwise. */ ! public boolean stop(String processorID) { return true; } /** ! * Suspends the processor identified by the given processor ID. ! * ! * @return Returns true if it could suspend the processor, false otherwise. */ ! public boolean suspend(String processorID) { return true; } /** ! * Resumes the processor identified by given processor ID. ! * ! * @return Returns true if processor is resumed, false otherwise. */ ! public boolean resume(String processorID) { return true; } } Index: CollectionJobPool.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/CollectionJobPool.java,v retrieving revision 1.2 retrieving revision 1.3 diff -C2 -d -r1.2 -r1.3 *** CollectionJobPool.java 4 Mar 2006 04:42:01 -0000 1.2 --- CollectionJobPool.java 14 May 2006 01:31:32 -0000 1.3 *************** *** 1,49 **** package org.jmonks.batchserver.framework.controller.pool; ! ! import java.util.List; import java.util.Map; /** ! * <p> ! * This provides the pool implementaiton using java utility collections. This is the default implementatio of pool, unless developer configured an another one. ! * </p> * ! * @author : Suresh Pragada ! * @version 1.0 */ ! public abstract class CollectionJobPool implements JobPool { public CollectionJobPool() { } - public abstract List getCollection(); - public boolean loadJobData(Object jobData) { ! return true; } public Object getNextJobData() { ! return null; } public void initialize(Map configProps) { } public void cleanup() { } public int getLoadedCount() - { ! return 0; } ! public void getProcessedCount() { } } --- 1,150 ---- package org.jmonks.batchserver.framework.controller.pool; ! import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer; ! import java.util.Hashtable; import java.util.Map; + import org.apache.log4j.Logger; /** ! * <p> ! * Provides the implementation of job pool using java utility collections. ! * This job pool implementation can be configured for any given job using the ! * following configuration in job configuration.<br> ! * <pre> ! * <job-pool job-pool-class-name="org.jmonks.batchserver.framework.controller.pool.CollectionJobPool"> ! * <property key="job-pool-size">50000</property> ! * </job-pool> ! * </pre> ! * </p> * ! * @author Suresh Pragada ! * @version 1.0 ! * @since 1.0 */ ! public class CollectionJobPool implements JobPool { + /** + * Default collection pool size. This will be used when pool size is not + * configured or problem in obtaining the pool size. + */ + public static final int DEFAULT_COLLECTION_POOL_SIZE = 1000; + /** + * Property name by which pool size will be associated with. + */ + protected static final String POOL_SIZE_PROPERTY_NAME = "job-pool-size"; + /** + * Map holds the configuration defined for the job pool. There might be properies + * needed other than initialization. + */ + protected Map configProps=null; + /** + * Java utility collection holds the job data objects. + */ + protected BoundedBuffer pool=null; + /** + * Holds the number of job data objects have been loaded by job loader. + */ + protected int loadedJobsCount=0; + /** + * Holds the number of job data objects have been served to job processors. + */ + protected int processedJobsCount=0; + + private static Logger logger=Logger.getLogger(CollectionJobPool.class); + + /** + * Default constructor for the instantiation purposes. + */ public CollectionJobPool() { } public boolean loadJobData(Object jobData) { ! boolean loaded=false; ! while(!loaded) ! { ! try ! { ! this.pool.put(jobData); ! loaded=true; ! this.loadedJobsCount++; ! } ! catch(InterruptedException exception) ! { ! logger.info("Got exception while loading the job data = " + jobData, exception); ! } ! } ! return loaded; } public Object getNextJobData() { ! Object jobData=null; ! try ! { ! jobData=this.pool.take(); ! this.processedJobsCount++; ! } ! catch(InterruptedException exception) ! { ! logger.info("Got exception while getting the job data from pool", exception); ! } ! return jobData; } + /** + * Initializes the collection job pool using the configuration defined + * in the job configuration. It tries to get the pool size from the given configuration, + * by looking for the property "job-pool-size", if it couldnt find it uses the default pool size "1000" + * and initializes the collection to be used as the pool. + * + * @param configProps Configuration defined for the job pool. + */ public void initialize(Map configProps) { + this.configProps=new Hashtable(configProps); + /** + * Try to get the pool size from configuration... If unable to get use default size. + */ + int poolSize=CollectionJobPool.DEFAULT_COLLECTION_POOL_SIZE; + String poolSizePropertyValue=(String)this.configProps.get(CollectionJobPool.POOL_SIZE_PROPERTY_NAME); + if(poolSizePropertyValue!=null) + { + logger.info("Received the pool size " + poolSizePropertyValue + " from configuration"); + try + { + poolSize=Integer.parseInt(poolSizePropertyValue); + } + catch(Exception exception) + { + logger.info("Exception while obtaining the pool size from configuration... Using the default pool size"); + } + } + else + { + logger.info("pool size has not been configured.. using the default pool size"); + } + this.pool=new BoundedBuffer(poolSize); } + /** + * Removes all the entries from collection. + */ public void cleanup() { + /** + * Nothing to do the cleanup. + */ + logger.debug("Cleanup has been done"); } public int getLoadedCount() { ! return this.loadedJobsCount; } ! public int getProcessedCount() { + return this.processedJobsCount; } } |