|
From: <tr...@us...> - 2003-07-16 22:45:30
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline
In directory sc8-pr-cvs1:/tmp/cvs-serv4858/modules/core/src/com/babeldoc/core/pipeline
Modified Files:
IPipelineStageFactory.java PipelineStageFactory.java
Log Message:
New pipeline stage factory threading model has been implemented.
Index: IPipelineStageFactory.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/IPipelineStageFactory.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -C2 -d -r1.8 -r1.9
*** IPipelineStageFactory.java 27 Jun 2003 02:19:58 -0000 1.8
--- IPipelineStageFactory.java 16 Jul 2003 22:45:27 -0000 1.9
***************
*** 70,74 ****
import java.util.Collection;
- import java.util.HashMap;
import java.util.Map;
--- 70,73 ----
***************
*** 90,104 ****
public String[] getAllPipelineStageNames() throws PipelineException;
/**
! * Must this pipeline stage be ignored
! *
! * @param stageName stage to determine ignorance
! *
! * @return true - track the document
*
! * @throws com.babeldoc.core.pipeline.PipelineException
*/
!
! // public boolean isStageIgnored(String stageName) throws PipelineException;
/**
--- 89,99 ----
public String[] getAllPipelineStageNames() throws PipelineException;
+
/**
! * Get the resolver.
*
! * @return a reference to the pipeline stage resolver
*/
! public IPipelineStageResolver getResolver();
/**
***************
*** 158,173 ****
public PipelineStageConnection[] getPipelineStageConnections(
String sourceStage, String sinkStage) throws PipelineException;
-
- /**
- * Returns the pipeline stage type for this particular pipeline
- *
- * @param stageName
- *
- * @return
- *
- * @throws com.babeldoc.core.pipeline.PipelineException
- */
- public PipelineStageType getPipelineStageType(String stageName)
- throws PipelineException;
/**
--- 153,156 ----
Index: PipelineStageFactory.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/PipelineStageFactory.java,v
retrieving revision 1.15
retrieving revision 1.16
diff -C2 -d -r1.15 -r1.16
*** PipelineStageFactory.java 16 Jul 2003 06:04:26 -0000 1.15
--- PipelineStageFactory.java 16 Jul 2003 22:45:27 -0000 1.16
***************
*** 69,81 ****
import com.babeldoc.core.LogService;
import com.babeldoc.core.Named;
import com.babeldoc.core.journal.IJournalTicket;
- import com.babeldoc.core.journal.JournalException;
- import com.babeldoc.core.journal.JournalFactory;
import java.util.Collection;
- import java.util.HashMap;
import java.util.Map;
- import java.util.Collections;
- import java.util.Vector;
--- 69,79 ----
import com.babeldoc.core.LogService;
import com.babeldoc.core.Named;
+ import com.babeldoc.core.GeneralException;
+ import com.babeldoc.core.pipeline.processor.IPipelineStageProcessor;
+ import com.babeldoc.core.pipeline.processor.PipelineStageProcessorFactory;
import com.babeldoc.core.journal.IJournalTicket;
import java.util.Collection;
import java.util.Map;
***************
*** 98,109 ****
private Map options;
! /** Hold the constructed stages */
! // this is put in a ThreadLocal, since with Threads the same stage can be called by several Threads at once
! private ThreadLocal localStages = new ThreadLocal() {
! protected synchronized Object initialValue() {
! getLog().logDebug(I18n.get("019005", "new"));
! return new HashMap();
! }
! };
/**
--- 96,101 ----
private Map options;
! /** reference to the pipeline stage processor */
! private IPipelineStageProcessor processor;
/**
***************
*** 173,177 ****
--- 165,178 ----
public void setOptions(Map options)
throws PipelineException {
+
this.options = options;
+
+ // Setup the processor
+ try {
+ this.processor = PipelineStageProcessorFactory.getProcessor(this,
+ options.get(IPipelineStageProcessor.PROCESSOR));
+ } catch (GeneralException e) {
+ throw new PipelineException("", e);
+ }
}
***************
*** 186,229 ****
/**
- * Get and instantiate the named pipeline stage.
- *
- * @param stageName the unique name of the stage
- *
- * @return a reference to the pipeline stage
- *
- * @throws PipelineException DOCUMENT ME!
- */
- public IPipelineStage getPipelineStage(String stageName)
- throws PipelineException {
- Map stages = (Map)localStages.get();
-
- if (!stages.containsKey(stageName)) {
- getLog().logDebug(I18n.get("019005", stageName));
-
- try {
- PipelineStageType pipelineStageType = getPipelineStageType(stageName);
- Class stage = pipelineStageType.getTypeClass();
-
- if (stage != null) {
- IPipelineStage pstage = (IPipelineStage) (stage.newInstance());
- pstage.setName(stageName);
- pstage.setResolver(getResolver());
- pstage.setPipelineName(this.getName());
- stages.put(stageName, pstage);
- } else {
- throw new PipelineException(I18n.get("019010",
- pipelineStageType.getTypeName()));
- }
- } catch (InstantiationException ie) {
- throw new PipelineException(ie.getMessage(), ie);
- } catch (IllegalAccessException iae) {
- throw new PipelineException(iae.getMessage(), iae);
- }
- }
-
- return (IPipelineStage) stages.get(stageName);
- }
-
- /**
* Get all the pipieline stage connections between stages
*
--- 187,190 ----
***************
*** 241,258 ****
/**
- * Returns the pipeline stage type for this particular pipeline
- *
- * @param stageName
- *
- * @return
- *
- * @throws PipelineException
- */
- public PipelineStageType getPipelineStageType(String stageName)
- throws PipelineException {
- return this.getResolver().getPipelineStageType(stageName);
- }
-
- /**
* Set the resolver.
*
--- 202,205 ----
***************
*** 266,270 ****
* Get the resolver.
*
! * @return DOCUMENT ME!
*/
public IPipelineStageResolver getResolver() {
--- 213,217 ----
* Get the resolver.
*
! * @return a reference to the pipeline stage resolver
*/
public IPipelineStageResolver getResolver() {
***************
*** 281,285 ****
public boolean isStageTracked(String stageName) {
try {
! return this.getPipelineStage(stageName).isTracked();
} catch (Exception e) {
}
--- 228,232 ----
public boolean isStageTracked(String stageName) {
try {
! return getProcessor().getPipelineStage(stageName).isTracked();
} catch (Exception e) {
}
***************
*** 302,471 ****
public void process(String name, PipelineDocument document,
IJournalTicket ticket, Collection finalResults) throws PipelineException {
! if ((name != null) && !name.equals("null") && (document != null)) {
! getLog().logInfo(I18n.get("019008", name));
!
! // Get the pipeline stage for this name
! IPipelineStage pstage = this.getPipelineStage(name);
!
! if (pstage != null) {
! try {
! pstage.initialize();
! } catch (Exception e) {
! throw new PipelineException("Exception while initializing", e);
! }
! pstage.setDocument(document);
! pstage.setTicket(ticket);
! trackDocument(pstage);
!
! processPipelineStage(pstage, finalResults);
! } else {
! getLog().logError(I18n.get("019008", name), null);
! }
! } else {
! if (finalResults != null) {
! finalResults.add(new PipelineStageResult(null, document, ticket));
! }
! }
! }
!
! /**
! * Process the pipeline stage and then defer to eithe the multithreaded or single threaded
! * implementation - this needs to be extracted.
! *
! * @param pstage The pipeline stage to process
! * @param finalResults collection of documents after all processing is completed.
! * @throws PipelineException
! */
! protected void processPipelineStage(IPipelineStage pstage, Collection finalResults)
! throws PipelineException {
!
! // Process the pipeline stage, get the results of the processing
! PipelineStageResult [] psResults = pstage.processStage();
!
! if(psResults!=null && psResults.length>0) {
! if (pstage.getThreaded()) {
! int maxThreads = pstage.getMaxThreads();
! if (maxThreads <= 0) {
! maxThreads = 2; // default to 2 if <= 0
! }
! processPipelineStageResultsParallel(psResults, finalResults, maxThreads);
! }
! else {
! processPipelineStageResults(psResults, finalResults);
! }
! }
! }
!
! /**
! * Process the pipeline stage finalResults. The gets called from process and
! * handles the finalResults from the processing.
! *
! * @param psResults The results of running the current stage on this document
! * @param finalResults Collection of documents after all processing done (after null)
! *
! * @throws PipelineException
! */
! private void processPipelineStageResults(PipelineStageResult[] psResults,
! Collection finalResults) throws PipelineException {
!
! int numResults = psResults.length;
! for (int i = 0; i < numResults; ++i) {
! processPipelineStageResult(psResults[i], finalResults);
! }
! }
!
! /**
! * Process a single PipelineStageResult.
! *
! * @param psResult The pipeline stage result to process
! * @param finalResults the collection of processed documents.
! * @throws PipelineException
! */
! protected void processPipelineStageResult(PipelineStageResult psResult, Collection finalResults)
! throws PipelineException {
! String name = psResult.getNamePipelineStage();
! PipelineDocument document = psResult.getDocument();
! IJournalTicket ticket = psResult.getTicket();
!
! this.process(name, document, ticket, finalResults);
! }
!
! /**
! * Execute the finalResults using threads in parallel. This uses the maxThreads argument
! * to specify the maximum number of threads to spawn. There is a slight impedance mismatch here.
! * The processPipelineStageResults methods (parallel and non-parallel) differ only in the
! * max threads argument. This indicates the number of threads to spawn to handle the results.
! * Now, if we could consider this an attribute of this factory (or even of some threadpool) this
! * we can remove it from the interface - and we can conflate the interface.
! *
! * @param psResults results from the current pipeline stage process
! * @param finalResults result of the current pipeline stage process
! * @param maxThreads max number of threads to spawn to process the finalResults
! */
! private void processPipelineStageResultsParallel(final PipelineStageResult[] psResults,
! Collection finalResults,
! final int maxThreads )
! throws PipelineException {
! // ensure synchronized access to finalResults Collection if threads are spawned
! final Collection syncResults = Collections.synchronizedCollection(finalResults);
! final int numResults = psResults.length;
! final Vector exceptions = new Vector();
! Vector threads = new Vector();
! for (int i = 0; i < maxThreads; i++) {
! final int index = i;
! Runnable r = new Runnable() {
! public void run() {
! for (int j = 0; j < numResults; j++) {
! if ((j % maxThreads) == index) {
! try {
! processPipelineStageResult(psResults[j], syncResults);
! }
! catch (PipelineException p) {
! exceptions.add(p);
! }
! }
! }
! }
! };
! Thread t = new Thread(r);
! threads.add(t);
! t.start();
! }
!
! // wait for any threads to finish before returning
! for (int i = 0; i < threads.size(); i++) {
! try {
! Thread t = (Thread)threads.get(i);
! t.join();
! }
! catch (InterruptedException ie) {
! }
! }
! // see if any of the threads threw any exceptions, and if so, re-throw the first one
! // TODO: throw an exception encapsulating all the exceptions that were thrown?
! // This might be harder than it appears unless we extend the PipelineException to
! // Add this capability
! if (exceptions.size() > 0) {
! throw (PipelineException)exceptions.get(0);
! }
! }
!
! /**
! * Track the document on the pipeline stage
! *
! * @param pstage
! *
! * @throws PipelineException DOCUMENT ME!
! */
! private void trackDocument(IPipelineStage pstage) throws PipelineException {
! if (pstage.isTracked()) {
! try {
! JournalFactory.getJournal().updateDocument(pstage.getTicket(),
! pstage.getDocument(), getName() + "." + pstage.getName());
! } catch (JournalException e) {
! getLog().logError(e);
! throw new PipelineException(e.getMessage(), e);
! }
! }
}
--- 249,253 ----
public void process(String name, PipelineDocument document,
IJournalTicket ticket, Collection finalResults) throws PipelineException {
! getProcessor().process(name, document, ticket, finalResults);
}
***************
*** 481,484 ****
--- 263,270 ----
return log;
+ }
+
+ public IPipelineStageProcessor getProcessor() {
+ return processor;
}
}
|