From: <ian...@us...> - 2011-04-28 10:34:44
|
Revision: 13775 http://gate.svn.sourceforge.net/gate/?rev=13775&view=rev Author: ian_roberts Date: 2011-04-28 10:34:38 +0000 (Thu, 28 Apr 2011) Log Message: ----------- Added a true-by-default flag telling the BatchRunner to do a hard System.exit when it has no more batches to process. This is a quick fix for a problem where certain PRs and LRs (notably the OWLIM Ontology) can leave idle non-daemon background threads when the batch completes, preventing the GCP process from exiting. The PRs making up the application are all properly disposed at the end of the batch but the other PRs and/or LRs they depend on may not be. Modified Paths: -------------- gcp/trunk/src/gate/cloud/batch/BatchRunner.java Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java =================================================================== --- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2011-04-28 01:14:39 UTC (rev 13774) +++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2011-04-28 10:34:38 UTC (rev 13775) @@ -271,9 +271,10 @@ /** * A thread that runs continuously while the batch runner is active. Its role * is to monitor the running jobs, collect process results, save the report - * files for each running batch, and shutdown the batch runner when all the - * batches have completed (if requested via the - * {@link BatchRunner#shutdownWhenFinished(boolean)} method). + * files for each running batch, and shutdown the batch runner and/or Java + * process when all the batches have completed (if requested via the + * {@link BatchRunner#shutdownWhenFinished(boolean)} and + * {@link BatchRunner#exitWhenFinished(boolean)} methods). 
*/ private class JobMonitor implements Runnable { public void run() { @@ -331,9 +332,14 @@ } } // if all jobs finished and we should shutdown, then let's shutdown - if(shutdownWhenFinished && !jobsStillRunning) { - shutdown(); - finished = true; + if(!jobsStillRunning) { + if(shutdownWhenFinished) { + shutdown(); + finished = true; + } + if(exitWhenFinished) { + System.exit(0); + } } long remainingSleepTime = LOOP_WAIT - (System.currentTimeMillis() - startTime); @@ -373,6 +379,12 @@ } } + public void exitWhenFinished(boolean flag) { + synchronized(this) { + this.exitWhenFinished = flag; + } + } + /** * Stops this batch runner in an orderly fashion. */ @@ -406,6 +418,11 @@ * currently running batches have completed. */ private boolean shutdownWhenFinished = true; + /** + * A flag used to signal that the batch runner should exit the Java process + * when all currently running batches have completed. + */ + private boolean exitWhenFinished = true; /** * Starts executing the batch task specified by the provided parameter. @@ -536,6 +553,7 @@ log.info("Launching batch:\n" + aBatch); instance.runBatch(aBatch); instance.shutdownWhenFinished(true); + instance.exitWhenFinished(true); } catch(Exception e) { log.error("Error starting up batch " + batchFile, e); System.exit(1); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ian...@us...> - 2011-06-09 16:32:56
|
Revision: 13991 http://gate.svn.sourceforge.net/gate/?rev=13991&view=rev Author: ian_roberts Date: 2011-06-09 16:32:50 +0000 (Thu, 09 Jun 2011) Log Message: ----------- Tidying up imports and deleting an unused field. Modified Paths: -------------- gcp/trunk/src/gate/cloud/batch/BatchRunner.java Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java =================================================================== --- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2011-06-09 16:32:05 UTC (rev 13990) +++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2011-06-09 16:32:50 UTC (rev 13991) @@ -36,14 +36,9 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import javax.management.InstanceAlreadyExistsException; import javax.management.JMException; -import javax.management.MBeanRegistrationException; -import javax.management.MalformedObjectNameException; -import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; import javax.management.StandardMBean; -import javax.xml.stream.XMLOutputFactory; import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamWriter; @@ -56,8 +51,6 @@ */ public class BatchRunner { private static final Logger log = Logger.getLogger(BatchRunner.class); - private static final XMLOutputFactory staxFactory = XMLOutputFactory - .newInstance(); //private static final long LOOP_WAIT = 5 * 60 * 1000; private static final long LOOP_WAIT = 10 * 1000; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <joh...@us...> - 2014-07-04 18:04:16
|
Revision: 18157 http://sourceforge.net/p/gate/code/18157 Author: johann_p Date: 2014-07-04 18:04:12 +0000 (Fri, 04 Jul 2014) Log Message: ----------- Make sure the correct GATE config, user config and session files are always used, regardless of whether we use the GATE bundled with GCP or the one set by the gcp-direct.sh script. Modified Paths: -------------- gcp/trunk/src/gate/cloud/batch/BatchRunner.java Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java =================================================================== --- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2014-07-04 17:48:57 UTC (rev 18156) +++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2014-07-04 18:04:12 UTC (rev 18157) @@ -566,15 +566,20 @@ int numThreads = Integer.parseInt(args[0]); String gateDotHome = System.getProperty("gate.home.override"); - File gateHome = null; - if(gateDotHome == null) { - gateHome = new File(gcpHome,"gate-home"); - } else { + File gcpGate = new File(gcpHome,"gate-home"); + File gateHome = gcpGate; + if(gateDotHome != null) { gateHome = new File(gateDotHome); } Gate.setGateHome(gateHome); - Gate.setUserConfigFile(new File(gcpHome, "user-gate.xml")); - Gate.setUserSessionFile(new File(gcpHome, "empty.session")); + if(gateDotHome != null) { // if we use our own GATE + Gate.setSiteConfigFile(new File(gcpGate,"gate.xml")); + Gate.setUserConfigFile(new File(gcpGate, "user-gate.xml")); + Gate.setUserSessionFile(new File(gcpGate, "empty.session")); + } else { // we use the GCP GATE + Gate.setUserConfigFile(new File(gateHome, "user-gate.xml")); + Gate.setUserSessionFile(new File(gateHome, "empty.session")); + } Gate.init(); BatchRunner instance = new BatchRunner(numThreads); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <joh...@us...> - 2014-07-07 18:47:01
|
Revision: 18163 http://sourceforge.net/p/gate/code/18163 Author: johann_p Date: 2014-07-07 18:46:57 +0000 (Mon, 07 Jul 2014) Log Message: ----------- Add some information about total and used memory before and after loading the application to the log. Modified Paths: -------------- gcp/trunk/src/gate/cloud/batch/BatchRunner.java Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java =================================================================== --- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2014-07-07 18:36:36 UTC (rev 18162) +++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2014-07-07 18:46:57 UTC (rev 18163) @@ -643,7 +643,12 @@ } } }); - + + final int MB = 1024*1024; + Runtime runtime = Runtime.getRuntime(); + log.info("Initial total allocated memory: "+(runtime.totalMemory()/MB)+"M"); + log.info("Initial used memory: "+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M"); + try { // we set gate home differently depending on which invocation mode // we have: if we got invoked from gcpcli, we want to use the gate @@ -772,7 +777,8 @@ aBatch.init(); } } - + log.info("Total allocated memory: "+(runtime.totalMemory()/MB)+"M"); + log.info("Used memory: "+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M"); log.info("Launching batch:\n" + aBatch); instance.runBatch(aBatch); instance.shutdownWhenFinished(true); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <joh...@us...> - 2014-07-08 15:06:12
|
Revision: 18165 http://sourceforge.net/p/gate/code/18165 Author: johann_p Date: 2014-07-08 15:06:08 +0000 (Tue, 08 Jul 2014) Log Message: ----------- Add some information about loading and processing time to the log Modified Paths: -------------- gcp/trunk/src/gate/cloud/batch/BatchRunner.java Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java =================================================================== --- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2014-07-07 18:57:06 UTC (rev 18164) +++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2014-07-08 15:06:08 UTC (rev 18165) @@ -407,6 +407,10 @@ return; } } + long processingFinishedTime = System.currentTimeMillis(); + log.info("Processing time (seconds): "+(processingFinishedTime-loadingFinishedTime)/1000.0); + log.info("Total time (seconds): "+(processingFinishedTime-startTime)/1000.0); + } /** @@ -506,6 +510,9 @@ public Set<String> getBatchJobIDs() { return new HashSet<String>(runningJobs.keySet()); } + + static long startTime = System.currentTimeMillis(); + static long loadingFinishedTime; /** * Main entry point. This can be invoked in one of two ways: the "legacy" @@ -525,7 +532,6 @@ // The GCP-CLI way to invoke is "nthreads configfile" with no // option flags while the command line invokation always includes // requried option flags. - // Options for the command line invokation // TODO: may be useful to be able to override the default user config and // session files here? 
@@ -779,7 +785,10 @@ } log.info("Total allocated memory: "+(runtime.totalMemory()/MB)+"M"); log.info("Used memory: "+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M"); + loadingFinishedTime = System.currentTimeMillis(); + log.info("Loading time (seconds): "+(loadingFinishedTime-startTime)/1000.0); log.info("Launching batch:\n" + aBatch); + instance.runBatch(aBatch); instance.shutdownWhenFinished(true); instance.exitWhenFinished(true); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <joh...@us...> - 2014-07-08 16:49:14
|
Revision: 18166 http://sourceforge.net/p/gate/code/18166 Author: johann_p Date: 2014-07-08 16:49:07 +0000 (Tue, 08 Jul 2014) Log Message: ----------- Remove some superfluous info messages from the output when running in the new command line mode. Modified Paths: -------------- gcp/trunk/src/gate/cloud/batch/BatchRunner.java Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java =================================================================== --- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2014-07-08 15:06:08 UTC (rev 18165) +++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2014-07-08 16:49:07 UTC (rev 18166) @@ -736,7 +736,7 @@ InputHandler inputHandler = inputHandlerClass.newInstance(); inputHandler.config(configData); inputHandler.init(); - log.info("Have input handler: "+inputHandler); + // log.info("Have input handler: "+inputHandler); aBatch.setInputHandler(inputHandler); // set the output Handler String outputHandlerClassName; @@ -758,7 +758,7 @@ List<AnnotationSetDefinition> asDefs = new ArrayList<AnnotationSetDefinition>(); outHandler.setAnnSetDefinitions(asDefs); outHandler.init(); - log.info("Have output handler: "+outHandler); + // log.info("Have output handler: "+outHandler); List<OutputHandler> outHandlers = new ArrayList<OutputHandler>(); outHandlers.add(outHandler); aBatch.setOutputHandlers(outHandlers); @@ -774,12 +774,11 @@ enumerator.init(); while(enumerator.hasNext()) { DocumentID id = enumerator.next(); - log.info("Adding document: "+id); + // log.info("Adding document: "+id); docIds.add(id); } - log.info("Number of document ids: "+docIds.size()); + log.info("Number of documents found: "+docIds.size()); aBatch.setDocumentIDs(docIds.toArray(new DocumentID[docIds.size()])); - log.info("The document ids: "+aBatch.getDocumentIDs()); aBatch.init(); } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <joh...@us...> - 2014-07-08 18:31:11
|
Revision: 18167 http://sourceforge.net/p/gate/code/18167 Author: johann_p Date: 2014-07-08 18:31:07 +0000 (Tue, 08 Jul 2014) Log Message: ----------- Fix the name of the output handler class for writing standard GATE xml format Modified Paths: -------------- gcp/trunk/src/gate/cloud/batch/BatchRunner.java Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java =================================================================== --- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2014-07-08 16:49:07 UTC (rev 18166) +++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2014-07-08 18:31:07 UTC (rev 18167) @@ -742,8 +742,12 @@ String outputHandlerClassName; if(!line.hasOption('f') || line.getOptionValue('f').equals("finf")) { outputHandlerClassName = "gate.cloud.io.file.FastInfosetOutputHandler"; + } else if(line.hasOption('f') && line.getOptionValue('f').equals("xml")) { + outputHandlerClassName = "gate.cloud.io.file.GATEStandOffFileOutputHandler"; } else { - outputHandlerClassName = "gate.cloud.io.file.FileOutputHandler"; + // this should never happen, since we have checked the format + // earlier + outputHandlerClassName = null; } configData = new HashMap<String, String>(); configData.put(IOConstants.PARAM_DOCUMENT_ROOT, line.getOptionValue('o')); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <joh...@us...> - 2014-07-09 14:49:02
|
Revision: 18169 http://sourceforge.net/p/gate/code/18169 Author: johann_p Date: 2014-07-09 14:48:59 +0000 (Wed, 09 Jul 2014) Log Message: ----------- Add some more information about elapsed times and memory sizes to the log. Modified Paths: -------------- gcp/trunk/src/gate/cloud/batch/BatchRunner.java Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java =================================================================== --- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2014-07-09 01:19:40 UTC (rev 18168) +++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2014-07-09 14:48:59 UTC (rev 18169) @@ -145,7 +145,15 @@ resultQueue = new LinkedBlockingQueue<ProcessResult>(); if(totalDocs > 0) { // create the document Processor + // This will also fill the pool with the custom-duplicated copies, + // so we can find the time needed for this after creating the object processor = new PooledDocumentProcessor(executor.getCorePoolSize()); + // TODO + log.info("Duplication finished"); + log.info("Total allocated memory: "+(runtime.totalMemory()/MB)+"M"); + log.info("Used memory: "+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M"); + duplicationFinishedTime = System.currentTimeMillis(); + log.info("Duplication time (seconds): "+(duplicationFinishedTime-loadingFinishedTime)/1000.0); processor.setController(batch.getGateApplication()); processor.setExecutor(executor); processor.setInputHandler(batch.getInputHandler()); @@ -408,7 +416,13 @@ } } long processingFinishedTime = System.currentTimeMillis(); - log.info("Processing time (seconds): "+(processingFinishedTime-loadingFinishedTime)/1000.0); + log.info("Processing finished"); + log.info("Total allocated memory: "+(runtime.totalMemory()/MB)+"M"); + log.info("Used memory: "+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M"); + // if we did not need to process anything then the duplicationFinishedTime will not have + // been set and be 0. 
In that case, set it to the loadingFinishedTime + if(duplicationFinishedTime==0) duplicationFinishedTime = loadingFinishedTime; + log.info("Processing time (seconds): "+(processingFinishedTime-duplicationFinishedTime)/1000.0); log.info("Total time (seconds): "+(processingFinishedTime-startTime)/1000.0); } @@ -513,7 +527,10 @@ static long startTime = System.currentTimeMillis(); static long loadingFinishedTime; - + static long duplicationFinishedTime; + static Runtime runtime = Runtime.getRuntime(); + static final int MB = 1024*1024; + /** * Main entry point. This can be invoked in one of two ways: the "legacy" * mode which expects two parameters and a command line mode which allows @@ -650,8 +667,6 @@ } }); - final int MB = 1024*1024; - Runtime runtime = Runtime.getRuntime(); log.info("Initial total allocated memory: "+(runtime.totalMemory()/MB)+"M"); log.info("Initial used memory: "+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M"); @@ -784,8 +799,9 @@ log.info("Number of documents found: "+docIds.size()); aBatch.setDocumentIDs(docIds.toArray(new DocumentID[docIds.size()])); aBatch.init(); - } + } } + log.info("Loading finished"); log.info("Total allocated memory: "+(runtime.totalMemory()/MB)+"M"); log.info("Used memory: "+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M"); loadingFinishedTime = System.currentTimeMillis(); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <joh...@us...> - 2014-07-09 14:54:21
|
Revision: 18170 http://sourceforge.net/p/gate/code/18170 Author: johann_p Date: 2014-07-09 14:54:13 +0000 (Wed, 09 Jul 2014) Log Message: ----------- Moved the point where the duplication time is reported to what is hopefully the correct location. Modified Paths: -------------- gcp/trunk/src/gate/cloud/batch/BatchRunner.java Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java =================================================================== --- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2014-07-09 14:48:59 UTC (rev 18169) +++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2014-07-09 14:54:13 UTC (rev 18170) @@ -144,22 +144,18 @@ setState(JobState.RUNNING); resultQueue = new LinkedBlockingQueue<ProcessResult>(); if(totalDocs > 0) { - // create the document Processor - // This will also fill the pool with the custom-duplicated copies, - // so we can find the time needed for this after creating the object processor = new PooledDocumentProcessor(executor.getCorePoolSize()); - // TODO - log.info("Duplication finished"); - log.info("Total allocated memory: "+(runtime.totalMemory()/MB)+"M"); - log.info("Used memory: "+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M"); - duplicationFinishedTime = System.currentTimeMillis(); - log.info("Duplication time (seconds): "+(duplicationFinishedTime-loadingFinishedTime)/1000.0); processor.setController(batch.getGateApplication()); processor.setExecutor(executor); processor.setInputHandler(batch.getInputHandler()); processor.setOutputHandlers(batch.getOutputs()); processor.setResultQueue(resultQueue); processor.init(); + log.info("Duplication finished"); + log.info("Total allocated memory: "+(runtime.totalMemory()/MB)+"M"); + log.info("Used memory: "+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M"); + duplicationFinishedTime = System.currentTimeMillis(); + log.info("Duplication time (seconds): "+(duplicationFinishedTime-loadingFinishedTime)/1000.0); jobPusher = new Thread(new Runnable() { public void run() { for(DocumentID id
: batch.getUnprocessedDocumentIDs()) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <joh...@us...> - 2014-07-10 10:07:19
|
Revision: 18174 http://sourceforge.net/p/gate/code/18174 Author: johann_p Date: 2014-07-10 10:07:15 +0000 (Thu, 10 Jul 2014) Log Message: ----------- Make an effort to collect garbage before reporting memory sizes. Modified Paths: -------------- gcp/trunk/src/gate/cloud/batch/BatchRunner.java Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java =================================================================== --- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2014-07-10 01:19:48 UTC (rev 18173) +++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2014-07-10 10:07:15 UTC (rev 18174) @@ -152,6 +152,7 @@ processor.setResultQueue(resultQueue); processor.init(); log.info("Duplication finished"); + System.gc(); log.info("Total allocated memory: "+(runtime.totalMemory()/MB)+"M"); log.info("Used memory: "+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M"); duplicationFinishedTime = System.currentTimeMillis(); @@ -413,6 +414,7 @@ } long processingFinishedTime = System.currentTimeMillis(); log.info("Processing finished"); + System.gc(); log.info("Total allocated memory: "+(runtime.totalMemory()/MB)+"M"); log.info("Used memory: "+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M"); // if we did not need to process anything then the duplicationFinishedTime will not have @@ -663,6 +665,8 @@ } }); + System.gc(); + log.info("Processors available: "+runtime.availableProcessors()); log.info("Initial total allocated memory: "+(runtime.totalMemory()/MB)+"M"); log.info("Initial used memory: "+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M"); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ian...@us...> - 2015-05-01 15:01:45
|
Revision: 18661 http://sourceforge.net/p/gate/code/18661 Author: ian_roberts Date: 2015-05-01 15:01:43 +0000 (Fri, 01 May 2015) Log Message: ----------- Always respect -Dgate.home=... even when run via gcp-cli.jar Modified Paths: -------------- gcp/trunk/src/gate/cloud/batch/BatchRunner.java Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java =================================================================== --- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2015-05-01 15:01:14 UTC (rev 18660) +++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2015-05-01 15:01:43 UTC (rev 18661) @@ -687,31 +687,18 @@ log.info("Initial used memory: "+((runtime.totalMemory()-runtime.freeMemory())/MB)+"M"); try { - // we set gate home differently depending on which invocation mode - // we have: if we got invoked from gcpcli, we want to use the gate - // home that is in the gcp home, otherwise we want to use the one - // provided by the gate.home property or otherwise the one in gcp home File gateHome = null; - // If we use the GCP gate, this is false, otherwise this is true - boolean useOwnGate = false; File gcpGate = new File(gcpHome,"gate-home"); - if(invokedByGcpCli) { - gateHome = gcpGate; + if(System.getProperty("gate.home") != null) { + gateHome = new File(System.getProperty("gate.home")); } else { - if(System.getProperty("gate.home") != null) { - useOwnGate = true; - gateHome = new File(System.getProperty("gate.home")); - } else { - gateHome = new File(gcpHome,"gate-home"); - } + gateHome = new File(gcpHome,"gate-home"); } Gate.setGateHome(gateHome); - // depending on which gate we ended up using, we set the config/session files - if(useOwnGate) { - // if we use our own GATE, we have to set the site wide config file - // and we have to set the user config to the one in gcpHome - Gate.setSiteConfigFile(new File(gcpGate,"gate.xml")); - } + // use the site and user config files from gcp/gate-home even + // if we are using another location for the actual GATE home + // 
dir. + Gate.setSiteConfigFile(new File(gcpGate,"gate.xml")); // we always set the user config file to the one in gcp gate Gate.setUserConfigFile(new File(gcpGate, "user-gate.xml")); // we always set the session file to a non-existent file in gcpGate This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ian...@us...> - 2016-05-31 16:24:00
|
Revision: 19372 http://sourceforge.net/p/gate/code/19372 Author: ian_roberts Date: 2016-05-31 16:23:57 +0000 (Tue, 31 May 2016) Log Message: ----------- We've never made use of the fact that one BatchRunner could run more than one batch at the same time, so removed this capability to simplify the code. Modified Paths: -------------- gcp/trunk/src/gate/cloud/batch/BatchRunner.java Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java =================================================================== --- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2016-05-31 16:12:01 UTC (rev 19371) +++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2016-05-31 16:23:57 UTC (rev 19372) @@ -11,27 +11,31 @@ */ package gate.cloud.batch; +import gate.CorpusController; import gate.Gate; import gate.cloud.batch.BatchJobData.JobState; import gate.cloud.batch.ProcessResult.ReturnCode; +import gate.cloud.io.DocumentEnumerator; +import gate.cloud.io.IOConstants; +import gate.cloud.io.InputHandler; +import gate.cloud.io.OutputHandler; +import gate.cloud.io.StreamingInputHandler; import gate.cloud.util.CLibrary; import gate.cloud.util.Tools; import gate.cloud.util.XMLBatchParser; import gate.creole.ResourceInstantiationException; import gate.util.GateException; +import gate.util.persistence.PersistenceManager; import java.io.File; import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Hashtable; -import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; @@ -44,28 +48,18 @@ import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamWriter; -import org.apache.commons.io.FileUtils; -import org.apache.log4j.Logger; - -import 
com.sun.jna.Platform; -import gate.CorpusController; -import gate.cloud.io.DocumentData; -import gate.cloud.io.DocumentEnumerator; -import gate.cloud.io.IOConstants; -import gate.cloud.io.InputHandler; -import gate.cloud.io.OutputHandler; -import gate.cloud.io.StreamingInputHandler; -import gate.util.persistence.PersistenceManager; -import java.util.LinkedList; import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; +import org.apache.commons.io.FileUtils; +import org.apache.log4j.Logger; +import com.sun.jna.Platform; + /** - * This class is a Batch Runner, i.e. it manages the execution of batch jobs, - * each of them being specified by a {@link Batch} object. This class is a - * singleton, so you can only obtain an instance via the getInstance()method. + * This class is a Batch Runner, i.e. it manages the execution of a batch job, + * specified by a {@link Batch} object. */ public class BatchRunner { private static final Logger log = Logger.getLogger(BatchRunner.class); @@ -173,6 +167,18 @@ if(Thread.interrupted()) { return; } } } + // shut down the executor and wait for it to terminate + executor.shutdown(); + while(!executor.isTerminated()) { + try { + executor.awaitTermination(60L, TimeUnit.SECONDS); + } catch(InterruptedException e) { + // just re-interrupt ourselves and give up + Thread.currentThread().interrupt(); + } + } + + // now we know the batch is finished resultQueue.add(new EndOfBatchResult()); } }, "Batch \"" + getBatchId() + "\"-job-pusher"); @@ -307,65 +313,58 @@ long startTime = System.currentTimeMillis(); try { boolean jobsStillRunning = false; - // for each job - Iterator<String> jobsIter = runningJobs.keySet().iterator(); - while(jobsIter.hasNext()) { - String jobId = jobsIter.next(); - BatchHandler job = runningJobs.get(jobId); - if(job.getState() == JobState.RUNNING) { - List<ProcessResult> results = new 
ArrayList<ProcessResult>(); - int resultsCount = job.resultQueue.drainTo(results); - boolean finishedBatch = false; - try { - for(ProcessResult result : results) { - if(result.getReturnCode() == ReturnCode.END_OF_BATCH) { - finishedBatch = true; - } else { - long fileSize = result.getOriginalFileSize(); - long docLength = result.getDocumentLength(); - if(fileSize > 0) job.totalBytes += fileSize; - if(docLength > 0) job.totalChars += docLength; - - job.reportWriter.writeCharacters("\n"); - Tools.writeResultToXml(result, job.reportWriter); - switch(result.getReturnCode()){ - case SUCCESS: - job.successDocs++; - break; - case FAIL: - job.errorDocs++; - break; - } + BatchHandler job = runningJob; + if(job.getState() == JobState.RUNNING) { + List<ProcessResult> results = new ArrayList<ProcessResult>(); + int resultsCount = job.resultQueue.drainTo(results); + boolean finishedBatch = false; + try { + for(ProcessResult result : results) { + if(result.getReturnCode() == ReturnCode.END_OF_BATCH) { + finishedBatch = true; + } else { + long fileSize = result.getOriginalFileSize(); + long docLength = result.getDocumentLength(); + if(fileSize > 0) job.totalBytes += fileSize; + if(docLength > 0) job.totalChars += docLength; + + job.reportWriter.writeCharacters("\n"); + Tools.writeResultToXml(result, job.reportWriter); + switch(result.getReturnCode()){ + case SUCCESS: + job.successDocs++; + break; + case FAIL: + job.errorDocs++; + break; } } - job.reportWriter.flush(); - if(finishedBatch) { - job.setState(JobState.FINISHED); - //close the <documents> element - job.reportWriter.writeCharacters("\n"); - job.reportWriter.writeEndElement(); - //write the whole batch report element - Tools.writeBatchResultToXml(job, job.reportWriter); - job.reportWriter.close(); - // this will be null if no documents needed to be processed - if(job.processor != null) job.processor.dispose(); - } else { - jobsStillRunning = true; - } - } catch(XMLStreamException e) { - log.error("Can't write to report 
file for batch " + jobId - + ", shutting down batch", e); - job.jobPusher.interrupt(); - job.setState(JobState.ERROR); } + job.reportWriter.flush(); + if(finishedBatch) { + job.setState(JobState.FINISHED); + //close the <documents> element + job.reportWriter.writeCharacters("\n"); + job.reportWriter.writeEndElement(); + //write the whole batch report element + Tools.writeBatchResultToXml(job, job.reportWriter); + job.reportWriter.close(); + // this will be null if no documents needed to be processed + if(job.processor != null) job.processor.dispose(); + } else { + jobsStillRunning = true; + } + } catch(XMLStreamException e) { + log.error("Can't write to report file for batch " + job.getBatchId() + + ", shutting down batch", e); + job.jobPusher.interrupt(); + job.setState(JobState.ERROR); } } // if all jobs finished and we should shutdown, then let's shutdown if(!jobsStillRunning) { - if(shutdownWhenFinished) { - shutdown(); - finished = true; - } + shutdown(); + finished = true; if(exitWhenFinished) { System.exit(0); } @@ -389,9 +388,6 @@ * @param numThreads */ public BatchRunner(int numThreads) { - // private constructor to enforce singleton. - runningJobs = Collections - .synchronizedMap(new HashMap<String, BatchHandler>()); // start the executors pool // create the executor // This is similar to an Executors.newFixedThreadPool, but instead @@ -402,12 +398,6 @@ TimeUnit.MILLISECONDS, new AlwaysBlockingSynchronousQueue()); } - public void shutdownWhenFinished(boolean flag) { - synchronized(this) { - this.shutdownWhenFinished = flag; - } - } - public void exitWhenFinished(boolean flag) { synchronized(this) { this.exitWhenFinished = flag; @@ -418,16 +408,6 @@ * Stops this batch runner in an orderly fashion. 
*/ public void shutdown() { - executor.shutdown(); - while(!executor.isTerminated()) { - try { - executor.awaitTermination(60L, TimeUnit.SECONDS); - } catch(InterruptedException e) { - // just re-interrupt ourselves and give up - Thread.currentThread().interrupt(); - return; - } - } long processingFinishedTime = System.currentTimeMillis(); log.info("Processing finished"); System.gc(); @@ -438,13 +418,12 @@ if(duplicationFinishedTime==0) duplicationFinishedTime = loadingFinishedTime; log.info("Processing time (seconds): "+(processingFinishedTime-duplicationFinishedTime)/1000.0); log.info("Total time (seconds): "+(processingFinishedTime-startTime)/1000.0); - } /** * Stores data about the currently running batch jobs. */ - private Map<String, BatchHandler> runningJobs; + private BatchHandler runningJob; /** * Executor used to run the tasks. */ @@ -454,11 +433,6 @@ */ private Thread monitorThread; /** - * A flag used to signal that the batch runner should shutdown when all - * currently running batches have completed. - */ - private boolean shutdownWhenFinished = true; - /** * A flag used to signal that the batch runner should exit the Java process * when all currently running batches have completed. 
*/ @@ -481,13 +455,10 @@ synchronized(this) { // record the new batch String batchId = batch.getBatchId(); - if(runningJobs.containsKey(batchId)) { throw new IllegalArgumentException( - "A batch with the same ID (" + batchId - + ") is already in process!"); } - BatchHandler jobHandler = new BatchHandler(batch); + runningJob = new BatchHandler(batch); // register the batch with JMX try { - StandardMBean batchMBean = new StandardMBean(jobHandler, BatchJobData.class); + StandardMBean batchMBean = new StandardMBean(runningJob, BatchJobData.class); Hashtable<String, String> props = new Hashtable<String, String>(); props.put("type", "Batch"); props.put("id", ObjectName.quote(batch.getBatchId())); @@ -497,9 +468,8 @@ catch(JMException e) { log.warn("Could not register batch with platform MBean server", e); } - runningJobs.put(batchId, jobHandler); // queue the batch for execution - jobHandler.start(); + runningJob.start(); if(monitorThread == null) { // start the thread that monitors the batches, saves the reports, and // manages the automatic shutdown at the end of all jobs. @@ -511,34 +481,6 @@ } } - /** - * Checks if a batch has completed execution. - * - * @param batchId - * @return <tt>true</tt> iff the batch with the given ID has completed - * execution. - */ - public boolean isFinished(String batchId) { - return getBatchData(batchId).getState() == JobState.FINISHED; - } - - /** - * @param batchId - * @return - */ - public BatchJobData getBatchData(String batchId) { - return null; - } - - /** - * Returns a set containing the IDs for all the currently running jobs. - * - * @return a {@link Set} of {@link String}s. 
- */ - public Set<String> getBatchJobIDs() { - return new HashSet<String>(runningJobs.keySet()); - } - static long startTime = System.currentTimeMillis(); static long loadingFinishedTime; static long duplicationFinishedTime; @@ -839,7 +781,6 @@ log.info("No documents to process, exiting"); } else { instance.runBatch(aBatch); - instance.shutdownWhenFinished(true); instance.exitWhenFinished(true); } } catch(Exception e) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ian...@us...> - 2016-06-01 13:02:32
|
Revision: 19391 http://sourceforge.net/p/gate/code/19391 Author: ian_roberts Date: 2016-06-01 13:02:30 +0000 (Wed, 01 Jun 2016) Log Message: ----------- Fix to un-break streaming input handlers Modified Paths: -------------- gcp/trunk/src/gate/cloud/batch/BatchRunner.java Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java =================================================================== --- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2016-06-01 12:06:40 UTC (rev 19390) +++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2016-06-01 13:02:30 UTC (rev 19391) @@ -775,9 +775,8 @@ log.info("Loading time (seconds): "+(loadingFinishedTime-startTime)/1000.0); log.info("Launching batch:\n" + aBatch); - int size = aBatch.getUnprocessedDocumentIDs().length; // if this is run from gcp-direct and there are no unprocessed documents, do nothing - if(!invokedByGcpCli && size == 0) { + if(!invokedByGcpCli && aBatch.getUnprocessedDocumentIDs() != null && aBatch.getUnprocessedDocumentIDs().length == 0) { log.info("No documents to process, exiting"); } else { instance.runBatch(aBatch); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <joh...@us...> - 2016-10-15 18:57:12
|
Revision: 19683 http://sourceforge.net/p/gate/code/19683 Author: johann_p Date: 2016-10-15 18:57:09 +0000 (Sat, 15 Oct 2016) Log Message: ----------- Add support for extension gatexml for GATE XML format. Modified Paths: -------------- gcp/trunk/src/gate/cloud/batch/BatchRunner.java Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java =================================================================== --- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2016-10-15 13:30:06 UTC (rev 19682) +++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2016-10-15 18:57:09 UTC (rev 19683) @@ -521,7 +521,7 @@ // session files here? options.addOption("b","batchFile",true,"Batch file (required, replaces -i, -o, -x, -r, -I)"); options.addOption("i","inputDirectory",true,"Input directory (required, unless -b given)"); - options.addOption("f","outputFormat",true,"Output format, optional, one of 'xml', 'finf', 'ser', 'json', default is 'finf'"); + options.addOption("f","outputFormat",true,"Output format, optional, one of 'xml'|'gatexml', 'finf', 'ser', 'json', default is 'finf'"); options.addOption("o","outputDirectory",true,"Output directory (not output if missing)"); options.addOption("x","executePipeline",true,"Pipeline/application file to execute (required, unless -b given)"); options.addOption("r","reportFile",true,"Report file (optional, default: report.xml"); @@ -577,6 +577,7 @@ if(line.hasOption('f')) { outFormat = line.getOptionValue('f'); if(!outFormat.equals("xml") && !outFormat.equals("finf") && + !outFormat.equals("gatexml") && !outFormat.equals("ser") && !outFormat.equals("json")) { log.error("Output format (option 'f') must be either 'json', 'ser', xml' or 'finf'"); System.exit(1); @@ -742,6 +743,9 @@ } else if(outFormat.equals("xml")) { outExt = ".xml"; outputHandlerClassName = "gate.cloud.io.file.GATEStandOffFileOutputHandler"; + } else if(outFormat.equals("gatexml")) { + outExt = ".gatexml"; + outputHandlerClassName = 
"gate.cloud.io.file.GATEStandOffFileOutputHandler"; } else if(outFormat.equals("ser")) { outExt = ".ser"; outputHandlerClassName = "gate.cloud.io.file.SerializedObjectOutputHandler"; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <ian...@us...> - 2017-09-30 13:35:01
|
Revision: 20267 http://sourceforge.net/p/gate/code/20267 Author: ian_roberts Date: 2017-09-30 13:34:59 +0000 (Sat, 30 Sep 2017) Log Message: ----------- Bug fix - batches where the enumerator gives 0 documents would hang forever rather than completing. Modified Paths: -------------- gcp/trunk/src/gate/cloud/batch/BatchRunner.java Modified: gcp/trunk/src/gate/cloud/batch/BatchRunner.java =================================================================== --- gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2017-09-15 13:33:49 UTC (rev 20266) +++ gcp/trunk/src/gate/cloud/batch/BatchRunner.java 2017-09-30 13:34:59 UTC (rev 20267) @@ -197,6 +197,9 @@ } }, "Batch \"" + getBatchId() + "\"-job-pusher"); jobPusher.start(); +} else { + // no documents, so fire end of batch straight away + resultQueue.add(new EndOfBatchResult()); } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |