[Batchserver-cvs] batchserver/src/org/jmonks/batch/framework/controller/pool AbstractPoolJobProces
Brought to you by:
suresh_pragada
From: Suresh <sur...@us...> - 2006-09-19 00:05:57
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batch/framework/controller/pool In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv27472/framework/controller/pool Modified Files: AbstractPoolJobProcessor.java CollectionJobPool.java PoolJobController.java Log Message: no message Index: AbstractPoolJobProcessor.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batch/framework/controller/pool/AbstractPoolJobProcessor.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** AbstractPoolJobProcessor.java 15 Sep 2006 20:06:49 -0000 1.1 --- AbstractPoolJobProcessor.java 19 Sep 2006 00:05:52 -0000 1.2 *************** *** 84,88 **** * and passes that information to the processor implementation for * processing and cleans up the processor implementation by calling the <i>cleanup</i> ! * method. * </p> * --- 84,91 ---- * and passes that information to the processor implementation for * processing and cleans up the processor implementation by calling the <i>cleanup</i> ! * method. It consolidates all the return codes from the processor implementation ! * process method and return the final error code. If it receives null as the ! * return code, continues the processing, but returns pool job processor exception ! * return code. * </p> * *************** *** 90,94 **** * @param pool Job pool reference where job data needs to be pulled. * ! * @return Returns the status code of this processor. */ public ErrorCode processPool(JobContext jobContext, JobPool pool) --- 93,97 ---- * @param pool Job pool reference where job data needs to be pulled. * ! * @return Returns the status code of this job processor. */ public ErrorCode processPool(JobContext jobContext, JobPool pool) *************** *** 121,125 **** this.processedJobDataCount++; ErrorCode errorCode=this.process(jobData); ! returnCode=errorCode; } catch(Throwable exception) --- 124,132 ---- this.processedJobDataCount++; ErrorCode errorCode=this.process(jobData); ! ! if(errorCode==null) ! returnCode=ErrorCode.POOL_JOB_PROCESSOR_EXCEPTION.appendMessage("Received the null returnCode from " + Thread.currentThread().getName() + " process."); ! else if (!errorCode.equals(ErrorCode.JOB_COMPLETED_SUCCESSFULLY)) ! returnCode=errorCode; } catch(Throwable exception) *************** *** 167,171 **** logger.trace("Exiting processPool"); ! return ErrorCode.JOB_COMPLETED_SUCCESSFULLY; } --- 174,178 ---- logger.trace("Exiting processPool"); ! return returnCode; } *************** *** 256,260 **** * @param jobData Data to be processed. * ! * @return Returns the status of the processingn of this jobData. */ public abstract ErrorCode process(Object jobData); --- 263,267 ---- * @param jobData Data to be processed. * ! * @return Returns the status of the processing of this jobData. */ public abstract ErrorCode process(Object jobData); Index: PoolJobController.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batch/framework/controller/pool/PoolJobController.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** PoolJobController.java 15 Sep 2006 20:06:49 -0000 1.1 --- PoolJobController.java 19 Sep 2006 00:05:52 -0000 1.2 *************** *** 3,6 **** --- 3,7 ---- import EDU.oswego.cs.dl.util.concurrent.CountDown; import EDU.oswego.cs.dl.util.concurrent.FutureResult; + import java.lang.reflect.InvocationTargetException; import java.util.Calendar; import java.util.Hashtable; *************** *** 141,144 **** --- 142,146 ---- this.jobStatistics=new JobStatistics(super.jobContext.getJobName()); this.jobStatistics.setStartTime(Calendar.getInstance().getTime()); + this.jobStatistics.setMaxMemeoryUsage(Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()); /** * Create, initialize and spawn the processor(s). *************** *** 385,391 **** try { ! logger.debug("Going to wait until loader and all the processor(s) is gonna finish."); countDownLock.acquire(); logger.info("Loader and all processor(s) have finished their task."); /** As per the contract cleanup the pool. */ this.pool.cleanup(); --- 387,405 ---- try { ! logger.debug("Going to wait until loader and all the processor(s) going to finish."); countDownLock.acquire(); logger.info("Loader and all processor(s) have finished their task."); + + logger.debug("Calculating the final return code from all the processors/loader results."); + for(Iterator iterator=this.jobProcessorsResultMap.values().iterator();iterator.hasNext();) + { + ErrorCode threadReturnCode=(ErrorCode)((FutureResult)iterator.next()).get(); + if(!ErrorCode.JOB_COMPLETED_SUCCESSFULLY.equals(threadReturnCode) && threadReturnCode.getCode()>returnCode.getCode()) + { + returnCode=threadReturnCode; + } + } + logger.debug("Calculated result from all the processeors/loader is " + returnCode); + /** As per the contract cleanup the pool. */ this.pool.cleanup(); *************** *** 397,414 **** return ErrorCode.POOL_JOB_PROCESSOR_EXCEPTION; } this.jobStatistics.setEndTime(Calendar.getInstance().getTime()); ! this.jobStatistics.setMaxMemeoryUsage(Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory()); this.jobStatistics.setRecordsProcessed(this.getProcessedRecordsCount()); - - for(Iterator iterator=this.jobProcessorsResultMap.values().iterator();iterator.hasNext();) - { - ErrorCode threadReturnCode=(ErrorCode)((FutureResult)iterator.next()).peek(); - if(!ErrorCode.JOB_COMPLETED_SUCCESSFULLY.equals(threadReturnCode)) - { - returnCode=threadReturnCode; - break; - } - } this.jobStatistics.setExitCode(returnCode); logger.trace("Exiting hybernate = " + returnCode); --- 411,426 ---- return ErrorCode.POOL_JOB_PROCESSOR_EXCEPTION; } + catch(InvocationTargetException exception) + { + exception.printStackTrace(); + logger.error("Exception while waiting for loader and all the processors = " + exception.getMessage(), exception); + return ErrorCode.POOL_JOB_PROCESSOR_EXCEPTION; + } this.jobStatistics.setEndTime(Calendar.getInstance().getTime()); ! this.jobStatistics.setMaxMemeoryUsage( ! (Runtime.getRuntime().totalMemory()-Runtime.getRuntime().freeMemory())-this.jobStatistics.getMaxMemoryUsage() ! ); this.jobStatistics.setRecordsProcessed(this.getProcessedRecordsCount()); this.jobStatistics.setExitCode(returnCode); logger.trace("Exiting hybernate = " + returnCode); *************** *** 438,442 **** logger.trace("Going to call the loadPool method"); returnCode=jobLoader.loadPool(jobContext,pool); ! logger.debug("Done calling the loadPool method"); } catch(Throwable exception) --- 450,454 ---- logger.trace("Going to call the loadPool method"); returnCode=jobLoader.loadPool(jobContext,pool); ! logger.debug("Done calling the loadPool method and return code = " + returnCode); } catch(Throwable exception) *************** *** 444,449 **** exception.printStackTrace(); logger.error("Exception while loading the job data into the pool = " + exception.getMessage(), exception); ! returnCode=ErrorCode.POOL_JOB_LOADER_EXCEPTION; } countDownLock.release(); logger.info(Thread.currentThread().getName() + " is exiting with the error code = " + returnCode); --- 456,462 ---- exception.printStackTrace(); logger.error("Exception while loading the job data into the pool = " + exception.getMessage(), exception); ! returnCode=ErrorCode.POOL_JOB_LOADER_EXCEPTION.appendMessage(" Message from loader = " + exception.getMessage()); } + logger.trace(Thread.currentThread().getName() + " is going to release the countdown lock."); countDownLock.release(); logger.info(Thread.currentThread().getName() + " is exiting with the error code = " + returnCode); *************** *** 477,481 **** logger.trace("Going to call the processPool method"); returnCode=jobProcessor.processPool(jobContext,pool); ! logger.debug("Done calling the processPool method"); } catch(Throwable exception) --- 490,494 ---- logger.trace("Going to call the processPool method"); returnCode=jobProcessor.processPool(jobContext,pool); ! logger.debug("Done calling the processPool method and return code = " + returnCode); } catch(Throwable exception) *************** *** 483,488 **** exception.printStackTrace(); logger.error("Exception while processing the job data from the pool = " + exception.getMessage(), exception); ! returnCode=ErrorCode.POOL_JOB_PROCESSOR_EXCEPTION; } countDownLock.release(); logger.info(Thread.currentThread().getName() + " is exiting with the error code = " + returnCode); --- 496,502 ---- exception.printStackTrace(); logger.error("Exception while processing the job data from the pool = " + exception.getMessage(), exception); ! returnCode=ErrorCode.POOL_JOB_PROCESSOR_EXCEPTION.appendMessage(" Message from processor = " + exception.getMessage()); } + logger.trace(Thread.currentThread().getName() + " is going to release the countdown lock."); countDownLock.release(); logger.info(Thread.currentThread().getName() + " is exiting with the error code = " + returnCode); *************** *** 552,556 **** Object jobProcessor=this.getInstance(controllerConfig.getPoolJobProcessorClassName()); ! if(jobProcessor instanceof PoolJobLoader) { logger.debug("Job Processor is configured properly = " + controllerConfig.getPoolJobProcessorClassName()); --- 566,570 ---- Object jobProcessor=this.getInstance(controllerConfig.getPoolJobProcessorClassName()); ! if(jobProcessor instanceof PoolJobProcessor) { logger.debug("Job Processor is configured properly = " + controllerConfig.getPoolJobProcessorClassName()); Index: CollectionJobPool.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batch/framework/controller/pool/CollectionJobPool.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** CollectionJobPool.java 15 Sep 2006 20:06:49 -0000 1.1 --- CollectionJobPool.java 19 Sep 2006 00:05:52 -0000 1.2 *************** *** 152,161 **** catch(Exception exception) { ! logger.info("Exception while obtaining the pool size from configuration... Using the default pool size"); } } else { ! logger.info("pool size has not been configured.. using the default pool size"); } this.pool=new BoundedBuffer(poolSize); --- 152,163 ---- catch(Exception exception) { ! logger.info("Exception while obtaining the pool size from configuration... Using the default pool size = " ! + CollectionJobPool.DEFAULT_COLLECTION_POOL_SIZE); } } else { ! logger.info("pool size has not been configured.. using the default pool size = " ! + CollectionJobPool.DEFAULT_COLLECTION_POOL_SIZE); } this.pool=new BoundedBuffer(poolSize); |