|
From: <tr...@us...> - 2003-07-16 05:17:42
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline
In directory sc8-pr-cvs1:/tmp/cvs-serv18992/core/src/com/babeldoc/core/pipeline
Modified Files:
IPipelineStage.java PipelineStageFactory.java
Log Message:
refactorings and meanderings. Serious looking at the PipelineStageFactory to arrive a simplest yet most adequate interface to facilitate parallel execution of multiple pipeline stage results.
Index: IPipelineStage.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/IPipelineStage.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -C2 -d -r1.11 -r1.12
*** IPipelineStage.java 15 Jul 2003 15:07:53 -0000 1.11
--- IPipelineStage.java 16 Jul 2003 05:17:39 -0000 1.12
***************
*** 69,72 ****
--- 69,73 ----
import com.babeldoc.core.journal.IJournalTicket;
import com.babeldoc.core.option.IConfigurable;
+ import org.apache.avalon.framework.activity.Initializable;
***************
*** 79,83 ****
* @version 1.0
*/
! public interface IPipelineStage extends INamed, IConfigurable {
/**
* set this pipelinestage's document
--- 80,86 ----
* @version 1.0
*/
! public interface IPipelineStage
! extends INamed, IConfigurable, Initializable {
!
/**
* set this pipelinestage's document
***************
*** 146,155 ****
public int getMaxThreads();
- /**
- * Initialization of stage. It is performed when new document is to be
- * processed
- */
- public void initialize();
-
/**
* pipeline stage processing core. This is the heart of the process for the
--- 149,152 ----
Index: PipelineStageFactory.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/PipelineStageFactory.java,v
retrieving revision 1.13
retrieving revision 1.14
diff -C2 -d -r1.13 -r1.14
*** PipelineStageFactory.java 15 Jul 2003 15:07:53 -0000 1.13
--- PipelineStageFactory.java 16 Jul 2003 05:17:39 -0000 1.14
***************
*** 288,331 ****
/**
! * Process tickets - this is the entry point for pipeline information. If
* the name of the next stage is not null, then process normally, otherwise
! * add them to the results collection if the results collection is not null.
*
* @param name of the next stage
* @param document to process
* @param ticket the tracker ticket
! * @param results the resulting processed documents
*
* @throws PipelineException DOCUMENT ME!
*/
public void process(String name, PipelineDocument document,
! IJournalTicket ticket, Collection results) throws PipelineException {
if ((name != null) && !name.equals("null") && (document != null)) {
getLog().logInfo(I18n.get("019008", name));
! // Get the pipeline stage
IPipelineStage pstage = this.getPipelineStage(name);
if (pstage != null) {
! pstage.initialize();
pstage.setDocument(document);
pstage.setTicket(ticket);
trackDocument(pstage);
! if (pstage.getThreaded()) {
! int maxThreads = pstage.getMaxThreads();
! if (maxThreads <= 0) {
! maxThreads = 2; // default to 2 if <= 0
! }
! processPipelineStageResults(pstage.processStage(), results, maxThreads);
! }
! else {
! processPipelineStageResults(pstage.processStage(), results);
! }
} else {
getLog().logError(I18n.get("019008", name), null);
}
} else {
! if (results != null) {
! results.add(new PipelineStageResult(null, document, ticket));
}
}
--- 288,327 ----
/**
! * Process documents - this is the entry point for pipeline processing. If
* the name of the next stage is not null, then process normally, otherwise
! * add them to the finalResults collection if the finalResults collection is not null.
*
* @param name of the next stage
* @param document to process
* @param ticket the tracker ticket
! * @param finalResults the resulting processed documents after all processing is completed
*
* @throws PipelineException DOCUMENT ME!
*/
public void process(String name, PipelineDocument document,
! IJournalTicket ticket, Collection finalResults) throws PipelineException {
if ((name != null) && !name.equals("null") && (document != null)) {
getLog().logInfo(I18n.get("019008", name));
! // Get the pipeline stage for this name
IPipelineStage pstage = this.getPipelineStage(name);
if (pstage != null) {
! try {
! pstage.initialize();
! } catch (Exception e) {
! throw new PipelineException("Exception while initializing", e);
! }
pstage.setDocument(document);
pstage.setTicket(ticket);
trackDocument(pstage);
!
! processPipelineStage(pstage, finalResults);
} else {
getLog().logError(I18n.get("019008", name), null);
}
} else {
! if (finalResults != null) {
! finalResults.add(new PipelineStageResult(null, document, ticket));
}
}
***************
*** 333,436 ****
/**
! * TODO: DOCUMENT ME!
*
! * @return DOCUMENT ME!
*/
! protected com.babeldoc.core.LogService getLog() {
! if (log == null) {
! log = LogService.getInstance(this.getClass().getName());
! }
! return log;
}
/**
! * Process the pipeline stage results. The gets called from process and
! * handles the results from the processing.
*
! * @param psResults
! * @param results DOCUMENT ME!
*
! * @throws PipelineException DOCUMENT ME!
*/
private void processPipelineStageResults(PipelineStageResult[] psResults,
! Collection results) throws PipelineException {
! String name;
! PipelineDocument document;
! IJournalTicket ticket;
!
! // Only process if we get a valid result
! if ((psResults != null) && (psResults.length > 0)) {
! int numResults = psResults.length;
! for (int i = 0; i < numResults; ++i) {
! name = psResults[i].getNamePipelineStage();
! document = psResults[i].getDocument();
! ticket = psResults[i].getTicket();
! this.process(name, document, ticket, results);
! }
}
}
! private void processPipelineStageResults(final PipelineStageResult[] psResults,
! Collection results, final int maxThreads) throws PipelineException {
! // ensure synchronized access to results Collection if threads are spawned
! final Collection syncResults = Collections.synchronizedCollection(results);
!
! // Only process if we get a valid result
! if ((psResults != null) && (psResults.length > 0)) {
! final int numResults = psResults.length;
!
! // Kick of maxThreads Threads, each of which processes some of the results.
! // For example (maxThreads = 4, numResults = 11
! // Thread Processes These Results
! // 0 0, 4, 8
! // 1 1, 5, 9
! // 2 2, 6, 10
! // 3 3, 7
! Vector threads = new Vector();
! final Vector exceptions = new Vector();
! for (int i = 0; i < maxThreads; i++) {
! final int index = i;
! Runnable r = new Runnable() {
! public void run() {
! for (int j = 0; j < numResults; j++) {
! if ((j % maxThreads) == index) {
! String name = psResults[j].getNamePipelineStage();
! PipelineDocument document = psResults[j].getDocument();
! IJournalTicket ticket = psResults[j].getTicket();
! try {
! process(name, document, ticket, syncResults);
! }
! catch (PipelineException p) {
! exceptions.add(p);
! }
! }
! }
! }
! };
! Thread t = new Thread(r);
! threads.add(t);
! t.start();
! }
!
! // wait for any threads to finish before returning
! for (int i = 0; i < threads.size(); i++) {
! try {
! Thread t = (Thread)threads.get(i);
! t.join();
! }
! catch (InterruptedException ie) {
! }
! }
!
! // see if any of the threads threw any exceptions, and if so, re-throw the first one
! // TODO: throw an exception encapsulating all the exceptions that were thrown?
! if (exceptions.size() > 0) {
! throw (PipelineException)exceptions.get(0);
! }
! }
}
!
/**
* Track the document on the pipeline stage
--- 329,453 ----
/**
! * Process the pipeline stage and then defer to eithe the multithreaded or single threaded
! * implementation - this needs to be extracted.
*
! * @param pstage The pipeline stage to process
! * @param finalResults collection of documents after all processing is completed.
! * @throws PipelineException
*/
! protected void processPipelineStage(IPipelineStage pstage, Collection finalResults)
! throws PipelineException {
! // Process the pipeline stage, get the results of the processing
! PipelineStageResult [] psResults = pstage.processStage();
!
! if(psResults!=null && psResults.length>0) {
! if (pstage.getThreaded()) {
! int maxThreads = pstage.getMaxThreads();
! if (maxThreads <= 0) {
! maxThreads = 2; // default to 2 if <= 0
! }
! processPipelineStageResultsParallel(psResults, finalResults, maxThreads);
! }
! else {
! processPipelineStageResults(psResults, finalResults);
! }
! }
}
/**
! * Process the pipeline stage finalResults. The gets called from process and
! * handles the finalResults from the processing.
*
! * @param psResults The results of running the current stage on this document
! * @param finalResults Collection of documents after all processing done (after null)
*
! * @throws PipelineException
*/
private void processPipelineStageResults(PipelineStageResult[] psResults,
! Collection finalResults) throws PipelineException {
! int numResults = psResults.length;
! for (int i = 0; i < numResults; ++i) {
! processPipelineStageResult(psResults[i], finalResults);
}
}
! /**
! * Process a single PipelineStageResult.
! *
! * @param psResult The pipeline stage result to process
! * @param finalResults the collection of processed documents.
! * @throws PipelineException
! */
! protected void processPipelineStageResult(PipelineStageResult psResult, Collection finalResults)
! throws PipelineException {
! String name = psResult.getNamePipelineStage();
! PipelineDocument document = psResult.getDocument();
! IJournalTicket ticket = psResult.getTicket();
!
! this.process(name, document, ticket, finalResults);
}
!
! /**
! * Execute the finalResults using threads in parallel. This uses the maxThreads argument
! * to specify the maximum number of threads to spawn. There is a slight impedance mismatch here.
! * The processPipelineStageResults methods (parallel and non-parallel) differ only in the
! * max threads argument. This indicates the number of threads to spawn to handle the results.
! * Now, if we could consider this an attribute of this factory (or even of some threadpool) this
! * we can remove it from the interface - and we can conflate the interface.
! *
! * @param psResults results from the current pipeline stage process
! * @param finalResults result of the current pipeline stage process
! * @param maxThreads max number of threads to spawn to process the finalResults
! */
! private void processPipelineStageResultsParallel(final PipelineStageResult[] psResults,
! Collection finalResults,
! final int maxThreads )
! throws PipelineException {
! // ensure synchronized access to finalResults Collection if threads are spawned
! final Collection syncResults = Collections.synchronizedCollection(finalResults);
! final int numResults = psResults.length;
! final Vector exceptions = new Vector();
! Vector threads = new Vector();
! for (int i = 0; i < maxThreads; i++) {
! final int index = i;
! Runnable r = new Runnable() {
! public void run() {
! for (int j = 0; j < numResults; j++) {
! if ((j % maxThreads) == index) {
! try {
! processPipelineStageResult(psResults[j], syncResults);
! }
! catch (PipelineException p) {
! exceptions.add(p);
! }
! }
! }
! }
! };
! Thread t = new Thread(r);
! threads.add(t);
! t.start();
! }
!
! // wait for any threads to finish before returning
! for (int i = 0; i < threads.size(); i++) {
! try {
! Thread t = (Thread)threads.get(i);
! t.join();
! }
! catch (InterruptedException ie) {
! }
! }
! // see if any of the threads threw any exceptions, and if so, re-throw the first one
! // TODO: throw an exception encapsulating all the exceptions that were thrown?
! // This might be harder than it appears unless we extend the PipelineException to
! // Add this capability
! if (exceptions.size() > 0) {
! throw (PipelineException)exceptions.get(0);
! }
! }
!
/**
* Track the document on the pipeline stage
***************
*** 450,453 ****
--- 467,483 ----
}
}
+ }
+
+ /**
+ * TODO: DOCUMENT ME!
+ *
+ * @return DOCUMENT ME!
+ */
+ protected com.babeldoc.core.LogService getLog() {
+ if (log == null) {
+ log = LogService.getInstance(this.getClass().getName());
+ }
+
+ return log;
}
}
|