Thread: [Batchserver-cvs] batchserver/src/org/jmonks/batchserver/framework/controller/basic BasicJobControll
Brought to you by:
suresh_pragada
From: Suresh <sur...@us...> - 2006-05-14 01:24:24
|
Update of /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/basic In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv32376 Modified Files: BasicJobController.java BasicJobProcessor.java Log Message: no message Index: BasicJobProcessor.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/basic/BasicJobProcessor.java,v retrieving revision 1.7 retrieving revision 1.8 diff -C2 -d -r1.7 -r1.8 *** BasicJobProcessor.java 5 May 2006 21:06:57 -0000 1.7 --- BasicJobProcessor.java 14 May 2006 01:24:18 -0000 1.8 *************** *** 3,6 **** --- 3,7 ---- import org.apache.log4j.Logger; import org.jmonks.batchserver.framework.common.ErrorCode; + import org.jmonks.batchserver.framework.management.ProcessorStatus; /** *************** *** 24,27 **** --- 25,32 ---- */ private boolean threadRegistered=false; + /** + * Indicates the status of this processor. + */ + protected ProcessorStatus processorStatus=ProcessorStatus.INSTANTIATED; private static Logger logger=Logger.getLogger(BasicJobProcessor.class); *************** *** 64,69 **** public boolean suspend() { ! this.processorThread.suspend(); ! return true; } /** --- 69,80 ---- public boolean suspend() { ! if(this.processorStatus!=ProcessorStatus.SUSPENDED) ! { ! this.processorThread.suspend(); ! this.processorStatus=ProcessorStatus.SUSPENDED; ! return true; ! } ! else ! return false; } /** *************** *** 80,85 **** public boolean resume() { ! this.processorThread.resume(); ! return true; } /** --- 91,102 ---- public boolean resume() { ! if(this.processorStatus==ProcessorStatus.SUSPENDED) ! { ! this.processorThread.resume(); ! this.processorStatus=ProcessorStatus.RUNNING; ! return true; ! } ! else ! return false; } /** *************** *** 96,101 **** public boolean stop() { ! this.processorThread.stop(); ! return true; } --- 113,125 ---- public boolean stop() { ! if(this.processorStatus!=ProcessorStatus.STOPPED || ! this.processorStatus!=ProcessorStatus.FINISHED) ! { ! this.processorThread.stop(); ! this.processorStatus=ProcessorStatus.STOPPED; ! return true; ! } ! else ! return false; } *************** *** 134,146 **** /** ! * This is for monitoring applications to know the exact state of this particular ! * (if there are multiple job processors) job processor. This returns object * which can be used for the purpose of display. * ! * @return Returns the exact state this processor. */ public abstract Object getProcessorState(); /** * This is for monitoring applications to know how many records this particular * (if there are multiple job processors) job processor is going to process. --- 158,185 ---- /** ! * This is for monitoring applications to know what the information (record) ! * this job processor is processing. Implementers should provide the ! * granular level of information that this processor is processing at this time ! * for better monitoring of this job. This returns object * which can be used for the purpose of display. * ! * @return Returns the information (record) this processor is processing. */ public abstract Object getProcessorState(); /** + * Returns the status of the processor as a ProcessorStatus object, which gives + * the information like whether the processor is running, suspended, resumed + * or stopped. Defalt implementation of this method returns the status based + * on the default implementation of suspend, resume and stop methods. + * + * @return Returns the processor status. + */ + public ProcessorStatus getProcessorStatus() + { + return this.processorStatus; + } + + /** * This is for monitoring applications to know how many records this particular * (if there are multiple job processors) job processor is going to process. Index: BasicJobController.java =================================================================== RCS file: /cvsroot/batchserver/batchserver/src/org/jmonks/batchserver/framework/controller/basic/BasicJobController.java,v retrieving revision 1.8 retrieving revision 1.9 diff -C2 -d -r1.8 -r1.9 *** BasicJobController.java 5 May 2006 21:06:57 -0000 1.8 --- BasicJobController.java 14 May 2006 01:24:18 -0000 1.9 *************** *** 14,19 **** import org.jmonks.batchserver.framework.config.ConfigurationException; import org.jmonks.batchserver.framework.controller.JobController; ! import org.jmonks.batchserver.framework.management.JobStatus; ! import org.jmonks.batchserver.framework.management.ThreadState; /** --- 14,19 ---- import org.jmonks.batchserver.framework.config.ConfigurationException; import org.jmonks.batchserver.framework.controller.JobController; ! import org.jmonks.batchserver.framework.management.ProcessorStatus; ! import org.jmonks.batchserver.framework.management.ProcessorState; /** *************** *** 40,47 **** */ private Map jobProcessorsResultMap=new Hashtable(); - /** - * Represents the status of the job. - */ - private JobStatus jobStatus=null; private static Logger logger=Logger.getLogger(BasicJobController.class); --- 40,43 ---- *************** *** 67,71 **** { logger.info("Entering process in basic job controller = " + super.getJobName()); - this.jobStatus=JobStatus.RUNNING; BasicJobControllerConfig basicJobControllerConfig=(BasicJobControllerConfig)super.getJobControllerConfig(); int threadCount=basicJobControllerConfig.getBasicJobProcessThreadCount(); --- 63,66 ---- *************** *** 151,319 **** * Returns the IDs of all the processors as string array. * ! * @return Returns the string array consist of all the processor thread IDs. */ ! public String[] getThreadIDList() { ! logger.trace("Entering getThreadIDList"); ! String threadIDList[]=new String[this.jobProcessorsMap.size()]; int i=0; for(Iterator iterator=this.jobProcessorsMap.keySet().iterator();iterator.hasNext();i++) ! threadIDList[i]=(String)iterator.next(); ! logger.trace("Exiting getThreadIDList"); ! return threadIDList; } /** ! * Returns the state of the requested thread as a ThreadState object. */ ! public ThreadState getThreadState(String threadID) { ! ThreadState state=null; ! if(threadID!=null && this.jobProcessorsMap.containsKey(threadID)) ! { ! BasicJobProcessor jobProcessor=(BasicJobProcessor)this.jobProcessorsMap.get(threadID); ! state=new ThreadState(threadID, "Basic Job Processor", jobProcessor.getProcessorState()); ! } else - { state=null; ! } ! logger.trace("Exiting getThreadState = " + state); return state; } /** ! * <p> ! * Returns the status of the job. Possible values are... <br> ! * <table> ! * <tr><td><b>Status</b></td></tr> ! * <tr><td>RUNNING</td></tr> ! * <tr><td>SUSPENDED</td></tr> ! * </p> */ ! public JobStatus getJobStatus() { ! return this.jobStatus; } /** ! * Stops the job by stopping all the job processors. */ ! public boolean stop() { ! logger.trace("Entering stop"); boolean stopped=true; ! ! for(Iterator iterator=this.jobProcessorsMap.values().iterator();iterator.hasNext();) ! { ! BasicJobProcessor jobProcessor=(BasicJobProcessor)iterator.next(); ! boolean processorStopped=jobProcessor.suspend(); ! /** ! * What should we do in case of job processor not being stopped. ! */ ! } ! ! if(stopped) ! this.jobStatus=JobStatus.STOPPED; ! logger.trace("Exiting stop = " + stopped); return stopped; } /** ! * Suspends the job by suspending all the job processors. If it couldnt suspend any one of the job processor, ! * it will rollback all the earlier suspends. * ! * @return Returns true if it could suspend the job, false otherwise. */ ! public boolean suspend() { ! logger.trace("Entering suspend"); boolean suspended=true; ! ! if(this.jobStatus==JobStatus.RUNNING) ! { ! List suspendedProcessorList=new ArrayList(); ! for(Iterator iterator=this.jobProcessorsMap.values().iterator();iterator.hasNext();) ! { ! BasicJobProcessor jobProcessor=(BasicJobProcessor)iterator.next(); ! boolean processorSuspended=jobProcessor.suspend(); ! if(processorSuspended) ! { ! suspendedProcessorList.add(jobProcessor); ! } ! else ! { ! /** ! * Add to the suspended list... in case to be rolled back rollbck everyone in the suspended list. ! */ ! logger.error("Unable to suspend one of the processor.. So rolling back the changes"); ! suspended=false; ! for(Iterator subIterator=suspendedProcessorList.iterator();subIterator.hasNext();) ! { ! ((BasicJobProcessor)subIterator.next()).resume(); ! } ! break; ! } ! } ! ! if(suspended) ! this.jobStatus=JobStatus.SUSPENDED; ! } else - { suspended=false; ! logger.info("Job current status is not RUNNING to suspend"); ! } ! logger.trace("Exiting suspend = " + suspended); return suspended; } /** ! * Resumes of the job processing by resuming all the job processors. * ! * @return Returns true if job can be resumed, false otherwise. */ ! public boolean resume() { ! logger.trace("Entering resume"); boolean resumed=true; ! ! if(this.jobStatus==JobStatus.SUSPENDED) ! { ! List resumedProcessorList=new ArrayList(); ! for(Iterator iterator=this.jobProcessorsMap.values().iterator();iterator.hasNext();) ! { ! BasicJobProcessor jobProcessor=(BasicJobProcessor)iterator.next(); ! boolean processorResumed=jobProcessor.suspend(); ! if(processorResumed) ! { ! resumedProcessorList.add(jobProcessor); ! } ! else ! { ! /** ! * Add to the resumed list... in case to be rolled back rollbck everyone in the resumed list. ! */ ! logger.error("Unable to resume one of the processor.. So rolling back the changes"); ! resumed=false; ! for(Iterator subIterator=resumedProcessorList.iterator();subIterator.hasNext();) ! { ! ((BasicJobProcessor)subIterator.next()).suspend(); ! } ! break; ! } ! } ! ! if(resumed) ! this.jobStatus=JobStatus.RUNNING; ! } else - { resumed=false; ! logger.error("Job is not in SUSPENDED status to resume"); ! } ! logger.trace("Exiting resume = " + resumed); return resumed; } --- 146,251 ---- * Returns the IDs of all the processors as string array. * ! * @return Returns the string array consist of all the processor IDs. */ ! public String[] getProcessorIDList() { ! logger.trace("Entering getProcessorIDList"); ! String processorIDList[]=new String[this.jobProcessorsMap.size()]; int i=0; for(Iterator iterator=this.jobProcessorsMap.keySet().iterator();iterator.hasNext();i++) ! processorIDList[i]=(String)iterator.next(); ! logger.trace("Exiting getProcessorIDList"); ! return processorIDList; } /** ! * Returns the state of the requested processor as a ProcessorState object. ! * ! * @return Returns the processor state. */ ! public ProcessorState getProcessorState(String processorID) { ! logger.trace("Exiting getProcessorState = " + processorID); ! ProcessorState state=null; ! BasicJobProcessor jobProcessor=(BasicJobProcessor)this.jobProcessorsMap.get(processorID); ! if(jobProcessor!=null) ! state=new ProcessorState(processorID, "Basic Job Processor", jobProcessor.getProcessorState()); else state=null; ! logger.trace("Exiting getProcessorState = " + processorID + " state = " + state); return state; } /** ! * Returns the status of the processor identified by the given processor ID. ! * ! * @return Returns the ProcessorStatus object represents the status. */ ! public ProcessorStatus getProcessorStatus(String processorID) { ! logger.trace("Exiting getProcessorStatus = " + processorID); ! ProcessorStatus status=null; ! BasicJobProcessor jobProcessor=(BasicJobProcessor)this.jobProcessorsMap.get(processorID); ! if(jobProcessor!=null) ! status=jobProcessor.getProcessorStatus(); ! else ! status=null; ! logger.trace("Exiting getProcessorStatus = " + processorID + " state = " + status); ! return status; } /** ! * Stops the processor identified by the given processor ID. ! * ! * @return Returns true, if processor could be stopped, false otherwise. */ ! public boolean stop(String processorID) { ! logger.trace("Entering stop = " + processorID); boolean stopped=true; ! BasicJobProcessor jobProcessor=(BasicJobProcessor)this.jobProcessorsMap.get(processorID); ! if(jobProcessor!=null) ! stopped=jobProcessor.stop(); ! else ! stopped=false; ! logger.trace("Exiting stop = " + processorID + " status = " + stopped); return stopped; } /** ! * Suspends the processor identified by the given processor ID. * ! * @return Returns true if it could suspend the processor, false otherwise. */ ! public boolean suspend(String processorID) { ! logger.trace("Entering suspend = " + processorID); boolean suspended=true; ! BasicJobProcessor jobProcessor=(BasicJobProcessor)this.jobProcessorsMap.get(processorID); ! if(jobProcessor!=null) ! suspended=jobProcessor.suspend(); else suspended=false; ! logger.trace("Exiting suspend = " + processorID + " suspended = " + suspended); return suspended; } /** ! * Resumes the processor identified by given processor ID. * ! * @return Returns true if processor is resumed, false otherwise. */ ! public boolean resume(String processorID) { ! logger.trace("Entering resume = " + processorID); boolean resumed=true; ! BasicJobProcessor jobProcessor=(BasicJobProcessor)this.jobProcessorsMap.get(processorID); ! if(jobProcessor!=null) ! resumed=jobProcessor.resume(); else resumed=false; ! logger.trace("Exiting resume = " + processorID + " resumed = " + resumed); return resumed; } |