From: <jon...@us...> - 2003-07-15 15:07:59
Update of /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline
In directory sc8-pr-cvs1:/tmp/cvs-serv8139/pipeline
Modified Files:
IPipelineStage.java PipelineStage.java
PipelineStageFactory.java
Log Message:
Add threading of nextStages via the 'threaded' and 'maxThreads' attributes.
The 'splitAttributes' attribute copies the parent document's attributes onto
the documents produced by a split.
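For illustration, here is a minimal, self-contained sketch of how the new options
are interpreted. Only the option names ('threaded', 'maxThreads', 'splitAttributes')
and the fallback to 2 threads come from the diffs below; the Map-based option lookup
is a hypothetical stand-in for PipelineStage's real option handling, not babeldoc code.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch only: mimics how the new options are read.
    public class ThreadedOptionsSketch {
        public static void main(String[] args) {
            Map options = new HashMap();
            options.put("threaded", "true");
            options.put("maxThreads", "4");
            options.put("splitAttributes", "true");

            // PipelineStage.getThreaded(): anything other than "true" means not threaded
            boolean threaded = "true".equalsIgnoreCase((String) options.get("threaded"));

            // PipelineStage.getMaxThreads(): 0 when the option is absent
            int maxThreads = options.containsKey("maxThreads")
                    ? Integer.parseInt((String) options.get("maxThreads"))
                    : 0;

            // PipelineStageFactory falls back to 2 threads when maxThreads <= 0
            if (threaded && maxThreads <= 0) {
                maxThreads = 2;
            }

            System.out.println("threaded=" + threaded + ", maxThreads=" + maxThreads);
        }
    }
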
Index: IPipelineStage.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/IPipelineStage.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -C2 -d -r1.10 -r1.11
*** IPipelineStage.java 27 Jun 2003 02:19:58 -0000 1.10
--- IPipelineStage.java 15 Jul 2003 15:07:53 -0000 1.11
***************
*** 137,140 ****
--- 137,150 ----
/**
+ * Return true if nextStages are threaded
+ **/
+ public boolean getThreaded();
+
+ /**
+ * Return the maxThreads for the nextStages
+ **/
+ public int getMaxThreads();
+
+ /**
* Initialization of stage. It is performed when new document is to be
* processed
Index: PipelineStage.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/PipelineStage.java,v
retrieving revision 1.14
retrieving revision 1.15
diff -C2 -d -r1.14 -r1.15
*** PipelineStage.java 27 Jun 2003 02:19:58 -0000 1.14
--- PipelineStage.java 15 Jul 2003 15:07:53 -0000 1.15
***************
*** 72,75 ****
--- 72,76 ----
import com.babeldoc.core.service.ServiceException;
import com.babeldoc.core.service.ServiceFactory;
+ import com.babeldoc.core.pipeline.stage.DomifyPipelineStage;
import org.apache.commons.lang.builder.ToStringBuilder;
***************
*** 77,80 ****
--- 78,82 ----
import java.util.ArrayList;
import java.util.Iterator;
+ import java.util.Set;
***************
*** 113,116 ****
--- 115,127 ----
public static final String ERROR_HANDLER = "errorHandler";
+ /** constant: split documents inherit attributes */
+ public static final String SPLIT_ATTRIBUTES = "splitAttributes";
+
+ /** constant: nextStages are threaded */
+ public static final String THREADED = "threaded";
+
+ /** constant: maximum number of threads for nextStages */
+ public static final String MAX_THREADS = "maxThreads";
+
/** Store the configuration information */
private IConfigInfo info;
***************
*** 213,216 ****
--- 224,246 ----
/**
+ * Return true if nextStages are threaded
+ **/
+ public boolean getThreaded() {
+ return "true".equalsIgnoreCase(this.getOptions(THREADED));
+ }
+
+ /**
+ * Return the maxThreads for the nextStages
+ **/
+ public int getMaxThreads() {
+ if (this.hasOption(MAX_THREADS)) {
+ return Integer.parseInt(this.getOptions(MAX_THREADS));
+ }
+ else {
+ return 0;
+ }
+ }
+
+ /**
* Get the journal
*
***************
*** 266,270 ****
ConfigOption config = ((PipelineStageInfo) getInfo()).getOptionInPath(keys);
! if (config != null) {
ArrayList array = new ArrayList();
--- 296,300 ----
ConfigOption config = ((PipelineStageInfo) getInfo()).getOptionInPath(keys);
! if ((config != null) && (config.getSuboptionNames() != null)) {
ArrayList array = new ArrayList();
***************
*** 273,277 ****
String oname = (String) iterator.next();
ConfigOption subopt = config.getSuboption(oname);
- ;
String ovalue = (String) subopt.getValue();
--- 303,306 ----
***************
*** 662,665 ****
--- 691,706 ----
PipelineDocument newDoc = new PipelineDocument(results[i].getBytes());
newDoc.setMimeType(mimeType);
+ if ("true".equalsIgnoreCase(getOptions(SPLIT_ATTRIBUTES))) {
+ // copy the attributes from the old document to the new document, minus NAME, MIME_TYPE, and DOM_KEY
+ PipelineDocument oldDoc = getDocument();
+ Set oldKeys = oldDoc.keys();
+ for (Iterator j = oldKeys.iterator(); j.hasNext(); ) {
+ String key = (String)j.next();
+ if (!PipelineDocument.NAME.equals(key) && !PipelineDocument.MIME_TYPE.equals(key) && !DomifyPipelineStage.DOM_KEY.equals(key)) {
+ Object value = oldDoc.get(key);
+ newDoc.put(key, value);
+ }
+ }
+ }
psresults[i] = new PipelineStageResult(name, newDoc, newTicket);
}
***************
*** 689,692 ****
--- 730,745 ----
IJournalTicket newTicket = journal.forkTicket(ticket);
PipelineDocument newDoc = new PipelineDocument(results[i].getBytes());
+ if ("true".equalsIgnoreCase(getOptions(SPLIT_ATTRIBUTES))) {
+ // copy the attributes from the old document to the new document, minus NAME, MIME_TYPE, and DOM_KEY
+ PipelineDocument oldDoc = getDocument();
+ Set oldKeys = oldDoc.keys();
+ for (Iterator j = oldKeys.iterator(); j.hasNext(); ) {
+ String key = (String)j.next();
+ if (!PipelineDocument.NAME.equals(key) && !PipelineDocument.MIME_TYPE.equals(key) && !DomifyPipelineStage.DOM_KEY.equals(key)) {
+ Object value = oldDoc.get(key);
+ newDoc.put(key, value);
+ }
+ }
+ }
psresults[i] = new PipelineStageResult(name, newDoc, newTicket);
}
Index: PipelineStageFactory.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/PipelineStageFactory.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -C2 -d -r1.12 -r1.13
*** PipelineStageFactory.java 27 Jun 2003 02:19:58 -0000 1.12
--- PipelineStageFactory.java 15 Jul 2003 15:07:53 -0000 1.13
***************
*** 76,79 ****
--- 76,81 ----
import java.util.HashMap;
import java.util.Map;
+ import java.util.Collections;
+ import java.util.Vector;
***************
*** 97,101 ****
/** Hold the constructed stages */
! private Map stages;
/**
--- 99,109 ----
/** Hold the constructed stages */
! // kept in a ThreadLocal, since with threading the same stage can be called by several threads at once
! private ThreadLocal localStages = new ThreadLocal() {
! protected synchronized Object initialValue() {
! getLog().logDebug(I18n.get("019005", "new"));
! return new HashMap();
! }
! };
/**
***************
*** 187,194 ****
public IPipelineStage getPipelineStage(String stageName)
throws PipelineException {
! if (stages == null) {
! getLog().logDebug(I18n.get("019005", "new"));
! stages = new HashMap();
! }
if (!stages.containsKey(stageName)) {
--- 195,199 ----
public IPipelineStage getPipelineStage(String stageName)
throws PipelineException {
! Map stages = (Map)localStages.get();
if (!stages.containsKey(stageName)) {
***************
*** 307,311 ****
pstage.setTicket(ticket);
trackDocument(pstage);
! processPipelineStageResults(pstage.processStage(), results);
} else {
getLog().logError(I18n.get("019008", name), null);
--- 312,325 ----
pstage.setTicket(ticket);
trackDocument(pstage);
! if (pstage.getThreaded()) {
! int maxThreads = pstage.getMaxThreads();
! if (maxThreads <= 0) {
! maxThreads = 2; // default to 2 if <= 0
! }
! processPipelineStageResults(pstage.processStage(), results, maxThreads);
! }
! else {
! processPipelineStageResults(pstage.processStage(), results);
! }
} else {
getLog().logError(I18n.get("019008", name), null);
***************
*** 359,362 ****
--- 373,436 ----
}
+ private void processPipelineStageResults(final PipelineStageResult[] psResults,
+ Collection results, final int maxThreads) throws PipelineException {
+ // ensure synchronized access to results Collection if threads are spawned
+ final Collection syncResults = Collections.synchronizedCollection(results);
+
+ // Only process if we get a valid result
+ if ((psResults != null) && (psResults.length > 0)) {
+ final int numResults = psResults.length;
+
+ // Kick off maxThreads threads, each of which processes a subset of the results.
+ // For example, with maxThreads = 4 and numResults = 11:
+ // Thread Processes These Results
+ // 0 0, 4, 8
+ // 1 1, 5, 9
+ // 2 2, 6, 10
+ // 3 3, 7
+ Vector threads = new Vector();
+ final Vector exceptions = new Vector();
+ for (int i = 0; i < maxThreads; i++) {
+ final int index = i;
+ Runnable r = new Runnable() {
+ public void run() {
+ for (int j = 0; j < numResults; j++) {
+ if ((j % maxThreads) == index) {
+ String name = psResults[j].getNamePipelineStage();
+ PipelineDocument document = psResults[j].getDocument();
+ IJournalTicket ticket = psResults[j].getTicket();
+ try {
+ process(name, document, ticket, syncResults);
+ }
+ catch (PipelineException p) {
+ exceptions.add(p);
+ }
+ }
+ }
+ }
+ };
+ Thread t = new Thread(r);
+ threads.add(t);
+ t.start();
+ }
+
+ // wait for all spawned threads to finish before returning
+ for (int i = 0; i < threads.size(); i++) {
+ try {
+ Thread t = (Thread)threads.get(i);
+ t.join();
+ }
+ catch (InterruptedException ie) {
+ }
+ }
+
+ // see if any of the threads threw any exceptions, and if so, re-throw the first one
+ // TODO: throw an exception encapsulating all the exceptions that were thrown?
+ if (exceptions.size() > 0) {
+ throw (PipelineException)exceptions.get(0);
+ }
+ }
+ }
+
/**
* Track the document on the pipeline stage