Thread: [Batchserver-cvs] batchserver/src/org/jmonks/batchserver/framework/controller/basic BasicJobController.java BasicJobProcessor.java
Brought to you by:
suresh_pragada
From: Suresh <sur...@us...> - 2006-05-04 22:26:29
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/basic In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv6466/org/jmonks/batchserver/framework/controller/basic Modified Files: BasicJobController.java BasicJobProcessor.java Log Message: no message Index: BasicJobProcessor.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/basic/BasicJobProcessor.java,v retrieving revision 1.5 retrieving revision 1.6 diff -C2 -d -r1.5 -r1.6 *** BasicJobProcessor.java 3 May 2006 22:11:17 -0000 1.5 --- BasicJobProcessor.java 4 May 2006 22:26:26 -0000 1.6 *************** *** 5,32 **** /** * <p> ! * This is the basic job processor. This interface lets the developers to extend this interface and implement their logic. * </p> * ! * @author : Suresh Pragada * @version 1.0 */ ! public interface BasicJobProcessor { ! public boolean suspend(); ! ! public boolean resume(); ! ! public boolean stop(); ! ! public void initialize(Map configProps); ! ! public ErrorCode process(); ! ! public void cleanup(); ! ! public Object getProcessorState(); ! public int getTotalCount(); ! public int getProcessedCount(); } --- 5,119 ---- /** * <p> ! * BasicJobProcessor lets developers implement their business logic. * </p> * ! * @author Suresh Pragada * @version 1.0 + * @since 1.0 */ ! public abstract class BasicJobProcessor { ! /** ! * <p> ! * Basic job controller calls this method to suspend the job processor, when it receives ! * a <i>suspsend</i> message from the managemet interface. Default implementation ! * will suspend the job processor abruptly as soon as it receives the message. ! * Implementors can provide their own implementation by overriding this method ! * to provide better mechanism. ! * </p> ! * ! * @return Returns true if processor has suspended, false otherwise. ! */ ! public boolean suspend() ! { ! return true; ! } ! /** ! * <p> ! 
* Basic job controller calls this method to resume the job processor, when it receives ! * a <i>resume</i> message from the managemet interface. Default implementation ! * will resume the job processor abruptly as soon as it receives the message. ! * Implementors can provide their own implementation by overriding this method ! * to provide better mechanism. ! * </p> ! * ! * @return Returns true if processor has resumed, false otherwise. ! */ ! public boolean resume() ! { ! return true; ! } ! /** ! * <p> ! * Basic job controller calls this method to stop the job processor, when it receives ! * a <i>stop</i> message from the managemet interface. Default implementation ! * will stop the job processor abruptly as soon as it receives the message. ! * Implementors can provide their own implementation by overriding this method ! * to provide better mechanism. ! * </p> ! * ! * @return Returns true if processor has stopped, false otherwise. ! */ ! public boolean stop() ! { ! return true; ! } ! /** ! * <p> ! * This method gets a chance to initialize the job processor. This will be called ! * before the <i>process</i> method being called. Properties configured for ! * job processor in job configuration will be passed as map to this method. ! * If job processor needs any resources, they can initialize them here. If ! * mulitple job processors have been configured, this method will be called ! * for each processor in the order they will be created. ! * </p> ! * ! * @param configProps Properties defined for job processor in the job configuration. ! */ ! public abstract void initialize(Map configProps); ! /** ! * <p> ! * This method let developer implement businses logic and return the appropriate ! * error code. ! * </p> ! * ! * @return Returns the appropriate error code needs to be passed to the invocation layer. ! */ ! public abstract ErrorCode process(); ! ! /** ! * <p> ! * This method gets a chance to do any cleanup if needed, after the ! * <i>process</i> method finished. 
This will be called irrespective of the ! * return status(even exceptions occured) of the <i>process</i> method. ! * </p> ! */ ! public abstract void cleanup(); ! ! /** ! * This is for monitoring applications to know the exact state of this particular ! * (if there are multiple job processors) job processor. This can return any object ! * which can be converted to display. ! * ! * @return Returns the exact state this processor. ! */ ! public abstract Object getProcessorState(); ! ! /** ! * This is for monitoring applications to know how many records this particular ! * (if there are multiple job processors) job processor is going to process. ! * ! * @return Returns the number of records/jobs this processor is going to process. ! */ ! public abstract int getTotalCount(); ! ! /** ! * This is for monitoring applications to know how many records this particular ! * (if there are multiple job processors) job processor has finished processing. ! * ! * @return Returns the number of records/jobs this processor has finished processing. ! */ ! public abstract int getProcessedCount(); } Index: BasicJobController.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/basic/BasicJobController.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** BasicJobController.java 3 May 2006 22:11:17 -0000 1.6 --- BasicJobController.java 4 May 2006 22:26:26 -0000 1.7 *************** *** 1,4 **** --- 1,6 ---- package org.jmonks.batchserver.framework.controller.basic; + import EDU.oswego.cs.dl.util.concurrent.Callable; import EDU.oswego.cs.dl.util.concurrent.CountDown; + import EDU.oswego.cs.dl.util.concurrent.FutureResult; import java.util.HashMap; import java.util.Hashtable; *************** *** 15,19 **** /** * <p> ! * This is the basic implementation of Job Controller. * </p> * --- 17,23 ---- /** * <p> ! 
* Basic Job Controller creates the configured number of basic job processor ! * instances and have them run in a seperate thread. Once all the processors have ! * finished their processing, returns the appropriate return code. * </p> * *************** *** 56,59 **** --- 60,64 ---- public ErrorCode process() { + logger.info("Entering process in basic job controller = " + super.getJobName()); BasicJobControllerConfig basicJobControllerConfig=(BasicJobControllerConfig)super.getJobControllerConfig(); int threadCount=basicJobControllerConfig.getBasicJobProcessThreadCount(); *************** *** 63,68 **** --- 68,75 ---- threadCount=threadCount>0?threadCount:0; CountDown countDownLock=new CountDown(threadCount); + logger.debug("Going to create " + threadCount + " basic job processor(s)"); for(int i=0;i<threadCount;i++) { + String threadID=super.getJobName()+"_"+(i+1); BasicJobProcessor jobProcessor=this.getBasicJobProcessor(basicJobControllerConfig.getBasicJobProcessorClassName()); /** *************** *** 72,88 **** try { jobProcessor.initialize(processorConfigProperties); } catch(Throwable exception) { exception.printStackTrace(); ! logger.error(exception.getMessage(), exception); } ! String threadID=super.getJobName()+"_"+(i+1); ! Thread thread=new Thread(this.getRunnableProcessor(countDownLock,jobProcessor), threadID); ! thread.start(); this.jobProcessorsMap.put(threadID,jobProcessor); } ! return hybernate(countDownLock); } --- 79,102 ---- try { + logger.debug("Going to initialize the " + threadID + " basic job processor"); jobProcessor.initialize(processorConfigProperties); + logger.info("Done initializing the " + threadID + " basic job processor"); } catch(Throwable exception) { exception.printStackTrace(); ! logger.error("Exception while initializing the " + threadID + " basic job processor = " + ! exception.getMessage(), exception); } ! FutureResult result=new FutureResult(); ! 
Thread processorThread=new Thread(result.setter(this.getCallableProcessor(countDownLock,jobProcessor)), threadID); ! processorThread.start(); this.jobProcessorsMap.put(threadID,jobProcessor); + this.jobProcessorsResultMap.put(threadID, result); + logger.info(threadID + " basic job processor has been kicked off."); } ! ErrorCode returnCode=hybernate(countDownLock); ! logger.info("Exiting process in basic job controller = " + super.getJobName() + " with return code = " + returnCode); ! return returnCode; } *************** *** 165,168 **** --- 179,183 ---- private BasicJobProcessor getBasicJobProcessor(String basicJobProcessorClassName) { + logger.trace("Entering getBasicJobProcessor = " + basicJobProcessorClassName); BasicJobProcessor jobProcessor=null; try *************** *** 188,191 **** --- 203,207 ---- throw new ConfigurationException(ConfigurationException.JOB_CONTROLLER_CONFIG, exception.getMessage()); } + logger.trace("Exiting getBasicJobProcessor"); return jobProcessor; } *************** *** 203,257 **** * @return Returns the runnable instance. */ ! private Runnable getRunnableProcessor(final CountDown countDownLock,final BasicJobProcessor jobProcessor) { ! Runnable runnable=new Runnable(){ ! public void run() { ! /** ! * Call the process method on basic job processor and report the er. ! */ try { ! ErrorCode errorCode=jobProcessor.process(); ! done(Thread.currentThread().getName(),errorCode); } catch(Throwable exception) { exception.printStackTrace(); ! logger.error(exception.getMessage(), exception); ! done(Thread.currentThread().getName(), ErrorCode.BASIC_JOB_PROCESSOR_EXCEPTION); } ! /** ! * Call the cleanup method on basic job processor. ! */ try { jobProcessor.cleanup(); } catch(Throwable exception) { exception.printStackTrace(); ! logger.error(exception.getMessage(), exception); } countDownLock.release(); } }; ! return runnable; } - - /** - * <p> - * Once all the processors have done their task, they will report here with the - * errorcode. 
This API will registers the errorcode with the thread name and - * wakes up the main thread if this is the last processor. - * </p> - * @param threadID Name of thread done and reporting. - * @param errorCode Processor return code. - */ - private void done(String threadID,ErrorCode errorCode) - { - this.jobProcessorsResultMap.put(threadID, errorCode); - logger.info(threadID + " has finished processing with the error code = " + errorCode); - } /** --- 219,261 ---- * @return Returns the runnable instance. */ ! private Callable getCallableProcessor(final CountDown countDownLock,final BasicJobProcessor jobProcessor) { ! logger.trace("Entering getCallableProcessor"); ! Callable callable=new Callable(){ ! public Object call() { ! ErrorCode returnCode=null; try { ! logger.trace("Going to call the process method"); ! returnCode=jobProcessor.process(); ! logger.debug("Done calling the process method"); } catch(Throwable exception) { exception.printStackTrace(); ! logger.error("Exception while processing = " + exception.getMessage(), exception); ! returnCode=ErrorCode.BASIC_JOB_PROCESSOR_EXCEPTION; } ! // Need to call the processor cleanup all the time. try { + logger.trace("Going to call cleanup method"); jobProcessor.cleanup(); + logger.debug("Done calling the cleanup method"); } catch(Throwable exception) { exception.printStackTrace(); ! logger.error("Exception while doing the cleanup = " + exception.getMessage(), exception); } countDownLock.release(); + logger.info(Thread.currentThread().getName() + " is exiting with the error code = " + returnCode); + return returnCode; } }; ! logger.trace("Exiting getCallableProcessor"); ! return callable; } /** *************** *** 267,287 **** private ErrorCode hybernate(CountDown countDownLock) { try { countDownLock.acquire(); } catch(InterruptedException exception) { exception.printStackTrace(); ! 
logger.error(exception.getMessage(), exception); } for(Iterator iterator=this.jobProcessorsResultMap.values().iterator();iterator.hasNext();) { ! ErrorCode returnCode=(ErrorCode)iterator.next(); ! if(returnCode!=ErrorCode.JOB_COMPLETED_SUCCESSFULLY) ! return returnCode; } ! return ErrorCode.JOB_COMPLETED_SUCCESSFULLY; } } --- 271,300 ---- private ErrorCode hybernate(CountDown countDownLock) { + logger.trace("Entering hybernate"); + ErrorCode returnCode=ErrorCode.JOB_COMPLETED_SUCCESSFULLY; try { + logger.debug("Going to wait until all the processors is gonna finish."); countDownLock.acquire(); + logger.info("All processors have finished their task."); } catch(InterruptedException exception) { exception.printStackTrace(); ! logger.error("Exception while waiting for all the processors = " + exception.getMessage(), exception); ! return ErrorCode.BASIC_JOB_PROCESSOR_EXCEPTION; } for(Iterator iterator=this.jobProcessorsResultMap.values().iterator();iterator.hasNext();) { ! ErrorCode threadReturnCode=(ErrorCode)((FutureResult)iterator.next()).peek(); ! if(threadReturnCode!=ErrorCode.JOB_COMPLETED_SUCCESSFULLY) ! { ! returnCode=threadReturnCode; ! break; ! } } ! logger.trace("Exiting hybernate = " + returnCode); ! return returnCode; } } |