[Batchserver-cvs] batchserver/src/org/jmonks/batchserver/framework/controller/basic BasicJobController.java BasicJobProcessor.java
Brought to you by:
suresh_pragada
From: Suresh <sur...@us...> - 2006-05-05 21:07:06
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/basic In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv24933/org/jmonks/batchserver/framework/controller/basic Modified Files: BasicJobController.java BasicJobProcessor.java Log Message: no message Index: BasicJobProcessor.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/basic/BasicJobProcessor.java,v retrieving revision 1.6 retrieving revision 1.7 diff -C2 -d -r1.6 -r1.7 *** BasicJobProcessor.java 4 May 2006 22:26:26 -0000 1.6 --- BasicJobProcessor.java 5 May 2006 21:06:57 -0000 1.7 *************** *** 1,4 **** --- 1,5 ---- package org.jmonks.batchserver.framework.controller.basic; import java.util.Map; + import org.apache.log4j.Logger; import org.jmonks.batchserver.framework.common.ErrorCode; *************** *** 15,18 **** --- 16,55 ---- { /** + * Holds the thread references this processor has been spawned. + * This reference will be used to suspend, resume and stop the processor. + */ + protected Thread processorThread=null; + /** + * Indicates that thread has been registered or not. + */ + private boolean threadRegistered=false; + + private static Logger logger=Logger.getLogger(BasicJobProcessor.class); + + /** + * Gets the thread reference this processor is going to be executed on. + * If processorThread has already initialized, it ignores this request. + * + * @return Returns true if thread has been registered with the processor, false otherwise. 
+ */ + final boolean registerThread() + { + logger.trace("Entering registerThread"); + boolean registered=true; + if(!threadRegistered) + { + this.processorThread=Thread.currentThread(); + logger.debug("Registering the thread = " + this.processorThread.getName()); + this.threadRegistered=true; + } + else + { + registered=false; + } + logger.trace("Exiting registerThread"); + return registered; + } + + /** * <p> * Basic job controller calls this method to suspend the job processor, when it receives *************** *** 27,30 **** --- 64,68 ---- public boolean suspend() { + this.processorThread.suspend(); return true; } *************** *** 42,45 **** --- 80,84 ---- public boolean resume() { + this.processorThread.resume(); return true; } *************** *** 57,60 **** --- 96,100 ---- public boolean stop() { + this.processorThread.stop(); return true; } *************** *** 95,100 **** /** * This is for monitoring applications to know the exact state of this particular ! * (if there are multiple job processors) job processor. This can return any object ! * which can be converted to display. * * @return Returns the exact state this processor. --- 135,140 ---- /** * This is for monitoring applications to know the exact state of this particular ! * (if there are multiple job processors) job processor. This returns object ! * which can be used for the purpose of display. * * @return Returns the exact state this processor. *************** *** 108,112 **** * @return Returns the number of records/jobs this processor is going to process. */ ! public abstract int getTotalCount(); /** --- 148,152 ---- * @return Returns the number of records/jobs this processor is going to process. */ ! public abstract int getTotalRecordsCount(); /** *************** *** 116,119 **** * @return Returns the number of records/jobs this processor has finished processing. */ ! 
public abstract int getProcessedCount(); } --- 156,159 ---- * @return Returns the number of records/jobs this processor has finished processing. */ ! public abstract int getProcessedRecordsCount(); } Index: BasicJobController.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/basic/BasicJobController.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** BasicJobController.java 4 May 2006 22:26:26 -0000 1.7 --- BasicJobController.java 5 May 2006 21:06:57 -0000 1.8 *************** *** 3,9 **** --- 3,11 ---- import EDU.oswego.cs.dl.util.concurrent.CountDown; import EDU.oswego.cs.dl.util.concurrent.FutureResult; + import java.util.ArrayList; import java.util.HashMap; import java.util.Hashtable; import java.util.Iterator; + import java.util.List; import java.util.Map; import org.apache.log4j.Logger; *************** *** 38,41 **** --- 40,47 ---- */ private Map jobProcessorsResultMap=new Hashtable(); + /** + * Represents the status of the job. + */ + private JobStatus jobStatus=null; private static Logger logger=Logger.getLogger(BasicJobController.class); *************** *** 61,64 **** --- 67,71 ---- { logger.info("Entering process in basic job controller = " + super.getJobName()); + this.jobStatus=JobStatus.RUNNING; BasicJobControllerConfig basicJobControllerConfig=(BasicJobControllerConfig)super.getJobControllerConfig(); int threadCount=basicJobControllerConfig.getBasicJobProcessThreadCount(); *************** *** 102,106 **** /** ! * Returns the total number of records. * * @return Returns the total number of records the job going to process. --- 109,114 ---- /** ! * Returns the total number of records this job going to process. This will be ! * the sum of the expected count from all the job processors. * * @return Returns the total number of records the job going to process. 
*************** *** 108,168 **** public int getExpectedRecordsCount() { ! return 0; } /** ! * Returns the number of records processed so far. */ public int getProcessedRecordsCount() { ! return 0; } /** ! * Returns the thread count. */ public String[] getThreadIDList() { ! return null; } /** ! * Returns the state of the thread as a ThreadState object. */ public ThreadState getThreadState(String threadID) { ! return null; } /** ! * Returns the job status. */ public JobStatus getJobStatus() { ! return null; } /** ! * Stops the job. */ public boolean stop() { ! return true; } /** ! * Suspends the job. */ public boolean suspend() { ! return true; } /** ! * Resume the job */ public boolean resume() { ! return true; } --- 116,320 ---- public int getExpectedRecordsCount() { ! logger.trace("Entering getExpectedRecordsCount"); ! int expectedRecordsCount=0; ! ! for(Iterator iterator=this.jobProcessorsMap.values().iterator();iterator.hasNext();) ! { ! BasicJobProcessor jobProcessor=(BasicJobProcessor)iterator.next(); ! expectedRecordsCount=expectedRecordsCount+jobProcessor.getTotalRecordsCount(); ! } ! logger.trace("Entering getExpectedRecordsCount = " + expectedRecordsCount); ! return expectedRecordsCount; } /** ! * Returns the number of records processed so far. This will be ! * the sum of the processed count from all the job processors. ! * ! * @return Returns the count of processed records. */ public int getProcessedRecordsCount() { ! logger.trace("Entering getProcessedRecordsCount"); ! int processedRecordsCount=0; ! ! for(Iterator iterator=this.jobProcessorsMap.values().iterator();iterator.hasNext();) ! { ! BasicJobProcessor jobProcessor=(BasicJobProcessor)iterator.next(); ! processedRecordsCount=processedRecordsCount+jobProcessor.getProcessedRecordsCount(); ! } ! logger.trace("Entering getProcessedRecordsCount = " + processedRecordsCount); ! return processedRecordsCount; } /** ! * Returns the IDs of all the processors as string array. ! * ! 
* @return Returns the string array consist of all the processor thread IDs. */ public String[] getThreadIDList() { ! logger.trace("Entering getThreadIDList"); ! ! String threadIDList[]=new String[this.jobProcessorsMap.size()]; ! int i=0; ! for(Iterator iterator=this.jobProcessorsMap.keySet().iterator();iterator.hasNext();i++) ! threadIDList[i]=(String)iterator.next(); ! ! logger.trace("Exiting getThreadIDList"); ! return threadIDList; } /** ! * Returns the state of the requested thread as a ThreadState object. */ public ThreadState getThreadState(String threadID) { ! ThreadState state=null; ! if(threadID!=null && this.jobProcessorsMap.containsKey(threadID)) ! { ! BasicJobProcessor jobProcessor=(BasicJobProcessor)this.jobProcessorsMap.get(threadID); ! state=new ThreadState(threadID, "Basic Job Processor", jobProcessor.getProcessorState()); ! } ! else ! { ! state=null; ! } ! logger.trace("Exiting getThreadState = " + state); ! return state; } /** ! * <p> ! * Returns the status of the job. Possible values are... <br> ! * <table> ! * <tr><td><b>Status</b></td></tr> ! * <tr><td>RUNNING</td></tr> ! * <tr><td>SUSPENDED</td></tr> ! * </p> */ public JobStatus getJobStatus() { ! return this.jobStatus; } /** ! * Stops the job by stopping all the job processors. */ public boolean stop() { ! logger.trace("Entering stop"); ! boolean stopped=true; ! ! for(Iterator iterator=this.jobProcessorsMap.values().iterator();iterator.hasNext();) ! { ! BasicJobProcessor jobProcessor=(BasicJobProcessor)iterator.next(); ! boolean processorStopped=jobProcessor.suspend(); ! /** ! * What should we do in case of job processor not being stopped. ! */ ! } ! ! if(stopped) ! this.jobStatus=JobStatus.STOPPED; ! logger.trace("Exiting stop = " + stopped); ! return stopped; } /** ! * Suspends the job by suspending all the job processors. If it couldnt suspend any one of the job processor, ! * it will rollback all the earlier suspends. ! * ! 
* @return Returns true if it could suspend the job, false otherwise. */ public boolean suspend() { ! logger.trace("Entering suspend"); ! boolean suspended=true; ! ! if(this.jobStatus==JobStatus.RUNNING) ! { ! List suspendedProcessorList=new ArrayList(); ! for(Iterator iterator=this.jobProcessorsMap.values().iterator();iterator.hasNext();) ! { ! BasicJobProcessor jobProcessor=(BasicJobProcessor)iterator.next(); ! boolean processorSuspended=jobProcessor.suspend(); ! if(processorSuspended) ! { ! suspendedProcessorList.add(jobProcessor); ! } ! else ! { ! /** ! * Add to the suspended list... in case to be rolled back rollbck everyone in the suspended list. ! */ ! logger.error("Unable to suspend one of the processor.. So rolling back the changes"); ! suspended=false; ! for(Iterator subIterator=suspendedProcessorList.iterator();subIterator.hasNext();) ! { ! ((BasicJobProcessor)subIterator.next()).resume(); ! } ! break; ! } ! } ! ! if(suspended) ! this.jobStatus=JobStatus.SUSPENDED; ! } ! else ! { ! suspended=false; ! logger.info("Job current status is not RUNNING to suspend"); ! } ! logger.trace("Exiting suspend = " + suspended); ! return suspended; } /** ! * Resumes of the job processing by resuming all the job processors. ! * ! * @return Returns true if job can be resumed, false otherwise. */ public boolean resume() { ! logger.trace("Entering resume"); ! boolean resumed=true; ! ! if(this.jobStatus==JobStatus.SUSPENDED) ! { ! List resumedProcessorList=new ArrayList(); ! for(Iterator iterator=this.jobProcessorsMap.values().iterator();iterator.hasNext();) ! { ! BasicJobProcessor jobProcessor=(BasicJobProcessor)iterator.next(); ! boolean processorResumed=jobProcessor.suspend(); ! if(processorResumed) ! { ! resumedProcessorList.add(jobProcessor); ! } ! else ! { ! /** ! * Add to the resumed list... in case to be rolled back rollbck everyone in the resumed list. ! */ ! logger.error("Unable to resume one of the processor.. So rolling back the changes"); ! resumed=false; ! 
for(Iterator subIterator=resumedProcessorList.iterator();subIterator.hasNext();) ! { ! ((BasicJobProcessor)subIterator.next()).suspend(); ! } ! break; ! } ! } ! ! if(resumed) ! this.jobStatus=JobStatus.RUNNING; ! } ! else ! { ! resumed=false; ! logger.error("Job is not in SUSPENDED status to resume"); ! } ! logger.trace("Exiting resume = " + resumed); ! return resumed; } *************** *** 228,231 **** --- 380,385 ---- try { + boolean registered=jobProcessor.registerThread(); + logger.debug("Status of registering thread with the processor = " + registered); logger.trace("Going to call the process method"); returnCode=jobProcessor.process(); |