[Batchserver-cvs] batchserver/src/org/jmonks/batchserver/framework/controller/pool AbstractPoolJob
Brought to you by:
suresh_pragada
From: Suresh <sur...@us...> - 2006-09-13 23:29:49
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool
In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv25427/framework/controller/pool

Modified Files:
    AbstractPoolJobLoader.java AbstractPoolJobProcessor.java CollectionJobPool.java JobPool.java PoolJobController.java PoolJobLoader.java PoolJobProcessor.java
Log Message:
no message

Index: PoolJobLoader.java
===================================================================
RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/PoolJobLoader.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -C2 -d -r1.8 -r1.9
*** PoolJobLoader.java 13 Sep 2006 04:49:46 -0000 1.8
--- PoolJobLoader.java 13 Sep 2006 23:29:45 -0000 1.9
***************
*** 1,5 ****
  package org.jmonks.batchserver.framework.controller.pool;
  
- import java.util.Map;
  import org.jmonks.batchserver.framework.ErrorCode;
  import org.jmonks.batchserver.framework.management.ProcessorStatus;
--- 1,5 ----
  package org.jmonks.batchserver.framework.controller.pool;
  
  import org.jmonks.batchserver.framework.ErrorCode;
+ import org.jmonks.batchserver.framework.JobContext;
  import org.jmonks.batchserver.framework.management.ProcessorStatus;
***************
*** 45,49 ****
       * @return Retrurns the final status of the loader.
       */
!     public ErrorCode loadPool(Map configProps,JobPool pool);
      /**
       * Suspends the loader.
--- 45,49 ----
       * @return Retrurns the final status of the loader.
       */
!     public ErrorCode loadPool(JobContext jobContext,JobPool pool);
      /**
       * Suspends the loader.
Index: JobPool.java
===================================================================
RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/JobPool.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -C2 -d -r1.6 -r1.7
*** JobPool.java 13 Sep 2006 04:49:46 -0000 1.6
--- JobPool.java 13 Sep 2006 23:29:45 -0000 1.7
***************
*** 1,4 ****
  package org.jmonks.batchserver.framework.controller.pool;
! import java.util.Map;
  
  /**
--- 1,4 ----
  package org.jmonks.batchserver.framework.controller.pool;
! import org.jmonks.batchserver.framework.JobContext;
  
  /**
***************
*** 21,25 ****
       * @param configProps Properties defined in the configuration for the job pool.
       */
!     public void initialize(Map configProps);
      /**
       * Gets the next available job data(piece of information) to be processed.
--- 21,25 ----
       * @param configProps Properties defined in the configuration for the job pool.
       */
!     public void initialize(JobContext jobContext);
      /**
       * Gets the next available job data(piece of information) to be processed.
Index: PoolJobProcessor.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/PoolJobProcessor.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** PoolJobProcessor.java 13 Sep 2006 04:49:46 -0000 1.7 --- PoolJobProcessor.java 13 Sep 2006 23:29:45 -0000 1.8 *************** *** 1,6 **** package org.jmonks.batchserver.framework.controller.pool; - - import java.util.Map; import org.jmonks.batchserver.framework.ErrorCode; import org.jmonks.batchserver.framework.management.ProcessorStatus; --- 1,5 ---- package org.jmonks.batchserver.framework.controller.pool; import org.jmonks.batchserver.framework.ErrorCode; + import org.jmonks.batchserver.framework.JobContext; import org.jmonks.batchserver.framework.management.ProcessorStatus; *************** *** 29,33 **** * @return Returns the error code. */ ! public ErrorCode processPool(Map configProps, JobPool pool); /** --- 28,32 ---- * @return Returns the error code. */ ! 
public ErrorCode processPool(JobContext jobContext, JobPool pool); /** Index: AbstractPoolJobProcessor.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/AbstractPoolJobProcessor.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** AbstractPoolJobProcessor.java 13 Sep 2006 04:49:46 -0000 1.6 --- AbstractPoolJobProcessor.java 13 Sep 2006 23:29:45 -0000 1.7 *************** *** 1,7 **** package org.jmonks.batchserver.framework.controller.pool; import EDU.oswego.cs.dl.util.concurrent.Mutex; - import java.util.Map; import org.apache.log4j.Logger; import org.jmonks.batchserver.framework.ErrorCode; import org.jmonks.batchserver.framework.management.ProcessorStatus; --- 1,7 ---- package org.jmonks.batchserver.framework.controller.pool; import EDU.oswego.cs.dl.util.concurrent.Mutex; import org.apache.log4j.Logger; import org.jmonks.batchserver.framework.ErrorCode; + import org.jmonks.batchserver.framework.JobContext; import org.jmonks.batchserver.framework.management.ProcessorStatus; *************** *** 62,66 **** * @return Returns the status code of this processor. */ ! public ErrorCode processPool(Map configProps, JobPool pool) { logger.trace("Entering processPool"); --- 62,66 ---- * @return Returns the status code of this processor. */ ! public ErrorCode processPool(JobContext jobContext, JobPool pool) { logger.trace("Entering processPool"); *************** *** 72,76 **** { this.processorStatus=ProcessorStatus.INITIALIZING; ! initialize(configProps); this.processorStatus=ProcessorStatus.RUNNING; } --- 72,76 ---- { this.processorStatus=ProcessorStatus.INITIALIZING; ! initialize(jobContext); this.processorStatus=ProcessorStatus.RUNNING; } *************** *** 214,218 **** * @param configProps Configuration defined for the job processor in job configuration. */ ! 
public abstract void initialize(Map configProps); /** --- 214,218 ---- * @param configProps Configuration defined for the job processor in job configuration. */ ! public abstract void initialize(JobContext jobContext); /** Index: AbstractPoolJobLoader.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/AbstractPoolJobLoader.java,v retrieving revision 1.9 retrieving revision 1.10 diff -C2 -d -r1.9 -r1.10 *** AbstractPoolJobLoader.java 13 Sep 2006 04:49:46 -0000 1.9 --- AbstractPoolJobLoader.java 13 Sep 2006 23:29:45 -0000 1.10 *************** *** 12,18 **** import EDU.oswego.cs.dl.util.concurrent.Mutex; - import java.util.Map; import org.apache.log4j.Logger; import org.jmonks.batchserver.framework.ErrorCode; import org.jmonks.batchserver.framework.management.ProcessorStatus; --- 12,18 ---- import EDU.oswego.cs.dl.util.concurrent.Mutex; import org.apache.log4j.Logger; import org.jmonks.batchserver.framework.ErrorCode; + import org.jmonks.batchserver.framework.JobContext; import org.jmonks.batchserver.framework.management.ProcessorStatus; *************** *** 67,71 **** * @return Returns the final status of loading. */ ! public final ErrorCode loadPool(Map configProps,JobPool pool) { logger.trace("Entering loadPool"); --- 67,71 ---- * @return Returns the final status of loading. */ ! public final ErrorCode loadPool(JobContext jobContext,JobPool pool) { logger.trace("Entering loadPool"); *************** *** 75,79 **** { this.loaderStatus=ProcessorStatus.RUNNING; ! returnCode=this.loadPool(configProps); this.loaderStatus=ProcessorStatus.FINISHED; } --- 75,79 ---- { this.loaderStatus=ProcessorStatus.RUNNING; ! returnCode=this.loadPool(jobContext); this.loaderStatus=ProcessorStatus.FINISHED; } *************** *** 235,239 **** * @return Returns the final status of the loader. */ ! 
public abstract ErrorCode loadPool(Map configProps); /** * Returns the number of job data objects that this loader is going to load. --- 235,239 ---- * @return Returns the final status of the loader. */ ! public abstract ErrorCode loadPool(JobContext jobContext); /** * Returns the number of job data objects that this loader is going to load. Index: PoolJobController.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/PoolJobController.java,v retrieving revision 1.14 retrieving revision 1.15 diff -C2 -d -r1.14 -r1.15 *** PoolJobController.java 13 Sep 2006 04:49:46 -0000 1.14 --- PoolJobController.java 13 Sep 2006 23:29:45 -0000 1.15 *************** *** 10,13 **** --- 10,14 ---- import org.jmonks.batchserver.framework.JobStatistics; import org.jmonks.batchserver.framework.ErrorCode; + import org.jmonks.batchserver.framework.JobContext; import org.jmonks.batchserver.framework.config.ConfigurationException; import org.jmonks.batchserver.framework.config.PoolJobControllerConfig; *************** *** 105,110 **** public ErrorCode process() { ! logger.info("Entering process in pool job controller = " + super.getJobName()); ! PoolJobControllerConfig poolJobControllerConfig=(PoolJobControllerConfig)super.getJobControllerConfig(); validateControllerConfiguration(poolJobControllerConfig); int processorCount=poolJobControllerConfig.getPoolJobProcessorThreadCount(); --- 106,111 ---- public ErrorCode process() { ! logger.info("Entering process in pool job controller = " + super.jobContext.getJobName()); ! 
PoolJobControllerConfig poolJobControllerConfig=(PoolJobControllerConfig)super.jobContext.getJobConfig().getJobControllerConfig(); validateControllerConfiguration(poolJobControllerConfig); int processorCount=poolJobControllerConfig.getPoolJobProcessorThreadCount(); *************** *** 114,118 **** */ this.pool=(JobPool)this.getInstance(poolJobControllerConfig.getPoolClassName()); ! pool.initialize(poolJobControllerConfig.getPoolConfigProperties()); CountDown countDownLock=new CountDown(processorCount+1); --- 115,119 ---- */ this.pool=(JobPool)this.getInstance(poolJobControllerConfig.getPoolClassName()); ! pool.initialize(super.jobContext); CountDown countDownLock=new CountDown(processorCount+1); *************** *** 121,132 **** */ PoolJobLoader jobLoader=(PoolJobLoader)this.getInstance(poolJobControllerConfig.getPoolJobLoaderClassName()); ! String jobLoaderName=super.getJobName()+"_Loader"; FutureResult jobLoaderFutureResult=new FutureResult(); Thread jobLoaderThread=new Thread(jobLoaderFutureResult.setter(this.getCallableLoader( ! countDownLock,jobLoader,poolJobControllerConfig.getPoolJobLoaderConfigProperties(),pool)),jobLoaderName); jobLoaderThread.start(); this.jobProcessorsResultMap.put(jobLoaderName, jobLoaderFutureResult); this.jobProcessorsMap.put(jobLoaderName, jobLoader); ! this.jobStatistics=new JobStatistics(super.getJobName()); this.jobStatistics.setStartTime(Calendar.getInstance().getTime()); /** --- 122,133 ---- */ PoolJobLoader jobLoader=(PoolJobLoader)this.getInstance(poolJobControllerConfig.getPoolJobLoaderClassName()); ! String jobLoaderName=super.jobContext.getJobName()+"_Loader"; FutureResult jobLoaderFutureResult=new FutureResult(); Thread jobLoaderThread=new Thread(jobLoaderFutureResult.setter(this.getCallableLoader( ! countDownLock,jobLoader,super.jobContext,pool)),jobLoaderName); jobLoaderThread.start(); this.jobProcessorsResultMap.put(jobLoaderName, jobLoaderFutureResult); this.jobProcessorsMap.put(jobLoaderName, jobLoader); ! 
this.jobStatistics=new JobStatistics(super.jobContext.getJobName()); this.jobStatistics.setStartTime(Calendar.getInstance().getTime()); /** *************** *** 136,143 **** { PoolJobProcessor jobProcessor=(PoolJobProcessor)this.getInstance(poolJobControllerConfig.getPoolJobProcessorClassName()); ! String jobProcessorName=super.getJobName()+"_Processor_"+i; FutureResult jobProcessorFutureResult=new FutureResult(); Thread jobProcessorThread=new Thread(jobProcessorFutureResult.setter(this.getCallableProcessor( ! countDownLock,jobProcessor,poolJobControllerConfig.getPoolJobProcessorConfigProperties(),pool)),jobProcessorName); jobProcessorThread.start(); this.jobProcessorsResultMap.put(jobProcessorName, jobProcessorFutureResult); --- 137,144 ---- { PoolJobProcessor jobProcessor=(PoolJobProcessor)this.getInstance(poolJobControllerConfig.getPoolJobProcessorClassName()); ! String jobProcessorName=super.jobContext.getJobName()+"_Processor_"+i; FutureResult jobProcessorFutureResult=new FutureResult(); Thread jobProcessorThread=new Thread(jobProcessorFutureResult.setter(this.getCallableProcessor( ! countDownLock,jobProcessor,super.jobContext,pool)),jobProcessorName); jobProcessorThread.start(); this.jobProcessorsResultMap.put(jobProcessorName, jobProcessorFutureResult); *************** *** 148,152 **** */ ErrorCode returnCode=hybernate(countDownLock); ! logger.info("Exiting process in pool job controller = " + super.getJobName() + " with return code = " + returnCode); return returnCode; } --- 149,153 ---- */ ErrorCode returnCode=hybernate(countDownLock); ! logger.info("Exiting process in pool job controller = " + super.jobContext.getJobName() + " with return code = " + returnCode); return returnCode; } *************** *** 165,169 **** long expectedRecordsCount=0; ! 
String jobLoaderName=super.getJobName()+"_Loader"; PoolJobLoader jobLoader=(PoolJobLoader)this.jobProcessorsMap.get(jobLoaderName); expectedRecordsCount=jobLoader.getTotalJobDataCount(); --- 166,170 ---- long expectedRecordsCount=0; ! String jobLoaderName=super.jobContext.getJobName()+"_Loader"; PoolJobLoader jobLoader=(PoolJobLoader)this.jobProcessorsMap.get(jobLoaderName); expectedRecordsCount=jobLoader.getTotalJobDataCount(); *************** *** 415,419 **** * @return Returns the callabel interface wrapped around the job loader. */ ! private Callable getCallableLoader(final CountDown countDownLock,final PoolJobLoader jobLoader, final Map configProps, final JobPool pool) { logger.trace("Entering getCallableLoader"); --- 416,420 ---- * @return Returns the callabel interface wrapped around the job loader. */ ! private Callable getCallableLoader(final CountDown countDownLock,final PoolJobLoader jobLoader, final JobContext jobContext, final JobPool pool) { logger.trace("Entering getCallableLoader"); *************** *** 425,429 **** { logger.trace("Going to call the loadPool method"); ! returnCode=jobLoader.loadPool(configProps,pool); logger.debug("Done calling the loadPool method"); } --- 426,430 ---- { logger.trace("Going to call the loadPool method"); ! returnCode=jobLoader.loadPool(jobContext,pool); logger.debug("Done calling the loadPool method"); } *************** *** 454,458 **** * @return Returns the callabel interface wrapped around the job processor. */ ! private Callable getCallableProcessor(final CountDown countDownLock,final PoolJobProcessor jobProcessor, final Map configProps, final JobPool pool) { logger.trace("Entering getCallableProcessor"); --- 455,459 ---- * @return Returns the callabel interface wrapped around the job processor. */ ! 
private Callable getCallableProcessor(final CountDown countDownLock,final PoolJobProcessor jobProcessor, final JobContext jobContext, final JobPool pool) { logger.trace("Entering getCallableProcessor"); *************** *** 464,468 **** { logger.trace("Going to call the processPool method"); ! returnCode=jobProcessor.processPool(configProps,pool); logger.debug("Done calling the processPool method"); } --- 465,469 ---- { logger.trace("Going to call the processPool method"); ! returnCode=jobProcessor.processPool(jobContext,pool); logger.debug("Done calling the processPool method"); } Index: CollectionJobPool.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/pool/CollectionJobPool.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** CollectionJobPool.java 13 Sep 2006 04:49:46 -0000 1.7 --- CollectionJobPool.java 13 Sep 2006 23:29:45 -0000 1.8 *************** *** 4,7 **** --- 4,9 ---- import java.util.Map; import org.apache.log4j.Logger; + import org.jmonks.batchserver.framework.JobContext; + import org.jmonks.batchserver.framework.config.PoolJobControllerConfig; /** *************** *** 128,134 **** * @param configProps Configuration defined for the job pool. */ ! public void initialize(Map configProps) { ! this.configProps=new Hashtable(configProps); /** * Try to get the pool size from configuration... If unable to get use default size. --- 130,137 ---- * @param configProps Configuration defined for the job pool. */ ! public void initialize(JobContext jobContext) { ! PoolJobControllerConfig poolJobControllerConfig=(PoolJobControllerConfig)jobContext.getJobConfig().getJobControllerConfig(); ! this.configProps=new Hashtable(poolJobControllerConfig.getPoolConfigProperties()); /** * Try to get the pool size from configuration... If unable to get use default size. |