You can subscribe to this list here.
| 2003 |
Jan
|
Feb
(14) |
Mar
(107) |
Apr
(211) |
May
(93) |
Jun
(158) |
Jul
(159) |
Aug
(368) |
Sep
(188) |
Oct
(151) |
Nov
(115) |
Dec
(98) |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2004 |
Jan
(25) |
Feb
|
Mar
(33) |
Apr
(28) |
May
(116) |
Jun
(2) |
Jul
(117) |
Aug
(19) |
Sep
(9) |
Oct
(2) |
Nov
|
Dec
(4) |
| 2005 |
Jan
|
Feb
|
Mar
|
Apr
|
May
(1) |
Jun
|
Jul
|
Aug
|
Sep
|
Oct
(2) |
Nov
(9) |
Dec
|
| 2006 |
Jan
|
Feb
|
Mar
(22) |
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
|
| 2007 |
Jan
|
Feb
|
Mar
(6) |
Apr
|
May
|
Jun
|
Jul
|
Aug
(267) |
Sep
|
Oct
|
Nov
(6) |
Dec
(512) |
| 2008 |
Jan
(187) |
Feb
|
Mar
|
Apr
|
May
|
Jun
(3) |
Jul
(6) |
Aug
|
Sep
|
Oct
|
Nov
|
Dec
|
| 2011 |
Jan
|
Feb
|
Mar
|
Apr
|
May
|
Jun
|
Jul
|
Aug
|
Sep
|
Oct
(2) |
Nov
|
Dec
|
| 2012 |
Jan
|
Feb
|
Mar
|
Apr
(1) |
May
|
Jun
|
Jul
|
Aug
|
Sep
|
Oct
|
Nov
|
Dec
|
|
From: <tr...@us...> - 2003-07-16 22:45:30
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline
In directory sc8-pr-cvs1:/tmp/cvs-serv4858/modules/core/src/com/babeldoc/core/pipeline
Modified Files:
IPipelineStageFactory.java PipelineStageFactory.java
Log Message:
New pipeline stage factory threading model has been implemented.
Index: IPipelineStageFactory.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/IPipelineStageFactory.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -C2 -d -r1.8 -r1.9
*** IPipelineStageFactory.java 27 Jun 2003 02:19:58 -0000 1.8
--- IPipelineStageFactory.java 16 Jul 2003 22:45:27 -0000 1.9
***************
*** 70,74 ****
import java.util.Collection;
- import java.util.HashMap;
import java.util.Map;
--- 70,73 ----
***************
*** 90,104 ****
public String[] getAllPipelineStageNames() throws PipelineException;
/**
! * Must this pipeline stage be ignored
! *
! * @param stageName stage to determine ignorance
! *
! * @return true - track the document
*
! * @throws com.babeldoc.core.pipeline.PipelineException
*/
!
! // public boolean isStageIgnored(String stageName) throws PipelineException;
/**
--- 89,99 ----
public String[] getAllPipelineStageNames() throws PipelineException;
+
/**
! * Get the resolver.
*
! * @return a reference to the pipeline stage resolver
*/
! public IPipelineStageResolver getResolver();
/**
***************
*** 158,173 ****
public PipelineStageConnection[] getPipelineStageConnections(
String sourceStage, String sinkStage) throws PipelineException;
-
- /**
- * Returns the pipeline stage type for this particular pipeline
- *
- * @param stageName
- *
- * @return
- *
- * @throws com.babeldoc.core.pipeline.PipelineException
- */
- public PipelineStageType getPipelineStageType(String stageName)
- throws PipelineException;
/**
--- 153,156 ----
Index: PipelineStageFactory.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/PipelineStageFactory.java,v
retrieving revision 1.15
retrieving revision 1.16
diff -C2 -d -r1.15 -r1.16
*** PipelineStageFactory.java 16 Jul 2003 06:04:26 -0000 1.15
--- PipelineStageFactory.java 16 Jul 2003 22:45:27 -0000 1.16
***************
*** 69,81 ****
import com.babeldoc.core.LogService;
import com.babeldoc.core.Named;
import com.babeldoc.core.journal.IJournalTicket;
- import com.babeldoc.core.journal.JournalException;
- import com.babeldoc.core.journal.JournalFactory;
import java.util.Collection;
- import java.util.HashMap;
import java.util.Map;
- import java.util.Collections;
- import java.util.Vector;
--- 69,79 ----
import com.babeldoc.core.LogService;
import com.babeldoc.core.Named;
+ import com.babeldoc.core.GeneralException;
+ import com.babeldoc.core.pipeline.processor.IPipelineStageProcessor;
+ import com.babeldoc.core.pipeline.processor.PipelineStageProcessorFactory;
import com.babeldoc.core.journal.IJournalTicket;
import java.util.Collection;
import java.util.Map;
***************
*** 98,109 ****
private Map options;
! /** Hold the constructed stages */
! // this is put in a ThreadLocal, since with Threads the same stage can be called by several Threads at once
! private ThreadLocal localStages = new ThreadLocal() {
! protected synchronized Object initialValue() {
! getLog().logDebug(I18n.get("019005", "new"));
! return new HashMap();
! }
! };
/**
--- 96,101 ----
private Map options;
! /** reference to the pipeline stage processor */
! private IPipelineStageProcessor processor;
/**
***************
*** 173,177 ****
--- 165,178 ----
public void setOptions(Map options)
throws PipelineException {
+
this.options = options;
+
+ // Setup the processor
+ try {
+ this.processor = PipelineStageProcessorFactory.getProcessor(this,
+ options.get(IPipelineStageProcessor.PROCESSOR));
+ } catch (GeneralException e) {
+ throw new PipelineException("", e);
+ }
}
***************
*** 186,229 ****
/**
- * Get and instantiate the named pipeline stage.
- *
- * @param stageName the unique name of the stage
- *
- * @return a reference to the pipeline stage
- *
- * @throws PipelineException DOCUMENT ME!
- */
- public IPipelineStage getPipelineStage(String stageName)
- throws PipelineException {
- Map stages = (Map)localStages.get();
-
- if (!stages.containsKey(stageName)) {
- getLog().logDebug(I18n.get("019005", stageName));
-
- try {
- PipelineStageType pipelineStageType = getPipelineStageType(stageName);
- Class stage = pipelineStageType.getTypeClass();
-
- if (stage != null) {
- IPipelineStage pstage = (IPipelineStage) (stage.newInstance());
- pstage.setName(stageName);
- pstage.setResolver(getResolver());
- pstage.setPipelineName(this.getName());
- stages.put(stageName, pstage);
- } else {
- throw new PipelineException(I18n.get("019010",
- pipelineStageType.getTypeName()));
- }
- } catch (InstantiationException ie) {
- throw new PipelineException(ie.getMessage(), ie);
- } catch (IllegalAccessException iae) {
- throw new PipelineException(iae.getMessage(), iae);
- }
- }
-
- return (IPipelineStage) stages.get(stageName);
- }
-
- /**
* Get all the pipieline stage connections between stages
*
--- 187,190 ----
***************
*** 241,258 ****
/**
- * Returns the pipeline stage type for this particular pipeline
- *
- * @param stageName
- *
- * @return
- *
- * @throws PipelineException
- */
- public PipelineStageType getPipelineStageType(String stageName)
- throws PipelineException {
- return this.getResolver().getPipelineStageType(stageName);
- }
-
- /**
* Set the resolver.
*
--- 202,205 ----
***************
*** 266,270 ****
* Get the resolver.
*
! * @return DOCUMENT ME!
*/
public IPipelineStageResolver getResolver() {
--- 213,217 ----
* Get the resolver.
*
! * @return a reference to the pipeline stage resolver
*/
public IPipelineStageResolver getResolver() {
***************
*** 281,285 ****
public boolean isStageTracked(String stageName) {
try {
! return this.getPipelineStage(stageName).isTracked();
} catch (Exception e) {
}
--- 228,232 ----
public boolean isStageTracked(String stageName) {
try {
! return getProcessor().getPipelineStage(stageName).isTracked();
} catch (Exception e) {
}
***************
*** 302,471 ****
public void process(String name, PipelineDocument document,
IJournalTicket ticket, Collection finalResults) throws PipelineException {
! if ((name != null) && !name.equals("null") && (document != null)) {
! getLog().logInfo(I18n.get("019008", name));
!
! // Get the pipeline stage for this name
! IPipelineStage pstage = this.getPipelineStage(name);
!
! if (pstage != null) {
! try {
! pstage.initialize();
! } catch (Exception e) {
! throw new PipelineException("Exception while initializing", e);
! }
! pstage.setDocument(document);
! pstage.setTicket(ticket);
! trackDocument(pstage);
!
! processPipelineStage(pstage, finalResults);
! } else {
! getLog().logError(I18n.get("019008", name), null);
! }
! } else {
! if (finalResults != null) {
! finalResults.add(new PipelineStageResult(null, document, ticket));
! }
! }
! }
!
! /**
! * Process the pipeline stage and then defer to eithe the multithreaded or single threaded
! * implementation - this needs to be extracted.
! *
! * @param pstage The pipeline stage to process
! * @param finalResults collection of documents after all processing is completed.
! * @throws PipelineException
! */
! protected void processPipelineStage(IPipelineStage pstage, Collection finalResults)
! throws PipelineException {
!
! // Process the pipeline stage, get the results of the processing
! PipelineStageResult [] psResults = pstage.processStage();
!
! if(psResults!=null && psResults.length>0) {
! if (pstage.getThreaded()) {
! int maxThreads = pstage.getMaxThreads();
! if (maxThreads <= 0) {
! maxThreads = 2; // default to 2 if <= 0
! }
! processPipelineStageResultsParallel(psResults, finalResults, maxThreads);
! }
! else {
! processPipelineStageResults(psResults, finalResults);
! }
! }
! }
!
! /**
! * Process the pipeline stage finalResults. The gets called from process and
! * handles the finalResults from the processing.
! *
! * @param psResults The results of running the current stage on this document
! * @param finalResults Collection of documents after all processing done (after null)
! *
! * @throws PipelineException
! */
! private void processPipelineStageResults(PipelineStageResult[] psResults,
! Collection finalResults) throws PipelineException {
!
! int numResults = psResults.length;
! for (int i = 0; i < numResults; ++i) {
! processPipelineStageResult(psResults[i], finalResults);
! }
! }
!
! /**
! * Process a single PipelineStageResult.
! *
! * @param psResult The pipeline stage result to process
! * @param finalResults the collection of processed documents.
! * @throws PipelineException
! */
! protected void processPipelineStageResult(PipelineStageResult psResult, Collection finalResults)
! throws PipelineException {
! String name = psResult.getNamePipelineStage();
! PipelineDocument document = psResult.getDocument();
! IJournalTicket ticket = psResult.getTicket();
!
! this.process(name, document, ticket, finalResults);
! }
!
! /**
! * Execute the finalResults using threads in parallel. This uses the maxThreads argument
! * to specify the maximum number of threads to spawn. There is a slight impedance mismatch here.
! * The processPipelineStageResults methods (parallel and non-parallel) differ only in the
! * max threads argument. This indicates the number of threads to spawn to handle the results.
! * Now, if we could consider this an attribute of this factory (or even of some threadpool) this
! * we can remove it from the interface - and we can conflate the interface.
! *
! * @param psResults results from the current pipeline stage process
! * @param finalResults result of the current pipeline stage process
! * @param maxThreads max number of threads to spawn to process the finalResults
! */
! private void processPipelineStageResultsParallel(final PipelineStageResult[] psResults,
! Collection finalResults,
! final int maxThreads )
! throws PipelineException {
! // ensure synchronized access to finalResults Collection if threads are spawned
! final Collection syncResults = Collections.synchronizedCollection(finalResults);
! final int numResults = psResults.length;
! final Vector exceptions = new Vector();
! Vector threads = new Vector();
! for (int i = 0; i < maxThreads; i++) {
! final int index = i;
! Runnable r = new Runnable() {
! public void run() {
! for (int j = 0; j < numResults; j++) {
! if ((j % maxThreads) == index) {
! try {
! processPipelineStageResult(psResults[j], syncResults);
! }
! catch (PipelineException p) {
! exceptions.add(p);
! }
! }
! }
! }
! };
! Thread t = new Thread(r);
! threads.add(t);
! t.start();
! }
!
! // wait for any threads to finish before returning
! for (int i = 0; i < threads.size(); i++) {
! try {
! Thread t = (Thread)threads.get(i);
! t.join();
! }
! catch (InterruptedException ie) {
! }
! }
! // see if any of the threads threw any exceptions, and if so, re-throw the first one
! // TODO: throw an exception encapsulating all the exceptions that were thrown?
! // This might be harder than it appears unless we extend the PipelineException to
! // Add this capability
! if (exceptions.size() > 0) {
! throw (PipelineException)exceptions.get(0);
! }
! }
!
! /**
! * Track the document on the pipeline stage
! *
! * @param pstage
! *
! * @throws PipelineException DOCUMENT ME!
! */
! private void trackDocument(IPipelineStage pstage) throws PipelineException {
! if (pstage.isTracked()) {
! try {
! JournalFactory.getJournal().updateDocument(pstage.getTicket(),
! pstage.getDocument(), getName() + "." + pstage.getName());
! } catch (JournalException e) {
! getLog().logError(e);
! throw new PipelineException(e.getMessage(), e);
! }
! }
}
--- 249,253 ----
public void process(String name, PipelineDocument document,
IJournalTicket ticket, Collection finalResults) throws PipelineException {
! getProcessor().process(name, document, ticket, finalResults);
}
***************
*** 481,484 ****
--- 263,270 ----
return log;
+ }
+
+ public IPipelineStageProcessor getProcessor() {
+ return processor;
}
}
|
|
From: <tr...@us...> - 2003-07-16 22:45:30
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core
In directory sc8-pr-cvs1:/tmp/cvs-serv4858/modules/core/src/com/babeldoc/core
Modified Files:
TieredConfigurationHelper.java
Log Message:
New pipeline stage factory threading model has been implemented.
Index: TieredConfigurationHelper.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/TieredConfigurationHelper.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -C2 -d -r1.3 -r1.4
*** TieredConfigurationHelper.java 16 Jul 2003 05:46:01 -0000 1.3
--- TieredConfigurationHelper.java 16 Jul 2003 22:45:26 -0000 1.4
***************
*** 68,71 ****
--- 68,72 ----
import com.babeldoc.core.config.IConfig;
import com.babeldoc.core.config.ConfigService;
+ import com.Ostermiller.util.StringTokenizer;
import java.util.*;
***************
*** 119,134 ****
for(Iterator i = keys.iterator(); i.hasNext();) {
String key = (String)i.next();
! int dotIndex = key.indexOf('.');
! if(dotIndex>0) {
! String name = key.substring(0, dotIndex);
! String rest = key.substring(dotIndex+1);
! String value = config.getString(key);
! Map tconfig = (Map)configs.get(name);
! if(tconfig==null) {
! tconfig = new HashMap();
! configs.put(name, tconfig);
}
! tconfig.put(rest, value);
}
}
--- 120,143 ----
for(Iterator i = keys.iterator(); i.hasNext();) {
String key = (String)i.next();
! String value = config.getString(key);
! StringTokenizer st = new StringTokenizer(key, ".");
! int numTokens = st.countTokens();
! Map working = configs;
!
! int tokenCount = 1;
! while (st.hasMoreTokens()) {
! String token = st.nextToken();
!
! if(tokenCount < numTokens) {
! if (working.get(token) == null) {
! working.put(token, new HashMap());
! }
!
! working = (Map)working.get(token);
! } else {
! working.put(token, value);
}
! ++tokenCount;
}
}
|
|
From: <tr...@us...> - 2003-07-16 22:45:30
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/examples/threads/pipeline In directory sc8-pr-cvs1:/tmp/cvs-serv4858/modules/core/examples/threads/pipeline Added Files: config.properties pipeline.properties Log Message: New pipeline stage factory threading model has been implemented. --- NEW FILE: config.properties --- #pipeline/config configuration #Thu Jun 05 10:31:49 EDT 2003 pipeline.type=simple pipeline.configFile=pipeline/pipeline asyncpipeline.type=simple asyncpipeline.configFile=pipeline/pipeline asyncpipeline.processor.type=async asyncpipeline.processor.maxThreads=4 pooledpipeline.type=simple pooledpipeline.configFile=pipeline/pipeline pooledpipeline.processor.type=threadpool pooledpipeline.processor.poolSize=10 --- NEW FILE: pipeline.properties --- entryStage=ffconvert ffconvert.stageType=FlatToXml ffconvert.flatToXmlFile=flatfile.xml ffconvert.nextStage=splitter splitter.stageType=XpathSplitter splitter.XPath=/big-un/row splitter.nextStage=writer splitter.threaded=true splitter.maxThreads=7 writer.stageType=FileWriter writer.outputFile=out.txt writer.nextStage=null |
|
From: <tr...@us...> - 2003-07-16 22:45:30
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/examples/threads In directory sc8-pr-cvs1:/tmp/cvs-serv4858/modules/core/examples/threads Added Files: data.txt flatfile.xml Log Message: New pipeline stage factory threading model has been implemented. --- NEW FILE: data.txt --- 2002-06-30;1;AAAAAAAAA 2002-06-30;1;CCCCCCCCC 2002-06-30;1;DDDDDDDDD 2002-06-30;1;EEEEEEEEE 2002-06-30;1;FFFFFFFFF 2002-06-30;1;GGGGGGGGG 2002-06-30;1;HHHHHHHHH 2002-06-30;1;IIIIIIIII 2002-06-30;1;JJJJJJJJJ 2002-06-30;1;KKKKKKKKK 2002-06-30;1;LLLLLLLLL 2002-06-30;1;MMMMMMMMM 2002-06-30;1;NNNNNNNNN 2002-06-30;2;AAAAAAAAA 2002-06-30;2;BBBBBBBBB 2002-06-30;2;CCCCCCCCC 2002-06-30;2;DDDDDDDDD 2002-06-30;2;EEEEEEEEE 2002-06-30;2;FFFFFFFFF 2002-06-30;2;GGGGGGGGG 2002-06-30;2;HHHHHHHHH 2002-06-30;2;IIIIIIIII 2002-06-30;2;JJJJJJJJJ 2002-06-30;2;KKKKKKKKK 2002-06-30;2;LLLLLLLLL 2002-06-30;2;MMMMMMMMM --- NEW FILE: flatfile.xml --- <!-- edited with XML Spy v3.5 NT (http://www.xmlspy.com) by bruce mcdonald (Bank of America) --> <?xml-stylesheet type="text/xsl" href="C:\work\vap_rpt\nlpe1_rpt_html.xslt"?> <conversion> <header> <output-document> <root-element>big-un</root-element> <row-element>row</row-element> </output-document> <input-document> <conversion-type>line</conversion-type> <inter-skip>0</inter-skip> <top-skip>0</top-skip> <left-margin>0</left-margin> <lines-per-para>1</lines-per-para> </input-document> </header> <line-fields> <field> <field-name>date</field-name> <field-column>1</field-column> <field-width>10</field-width> </field> <field> <field-name>number</field-name> <field-column>12</field-column> <field-width>1</field-width> </field> <field> <field-name>string</field-name> <field-column>14</field-column> <field-width>10</field-width> </field> </line-fields> </conversion> |
|
From: <tr...@us...> - 2003-07-16 22:31:03
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/processor In directory sc8-pr-cvs1:/tmp/cvs-serv2808/processor Log Message: Directory /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/processor added to the repository |
|
From: <tr...@us...> - 2003-07-16 22:27:13
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/examples/threads/pipeline In directory sc8-pr-cvs1:/tmp/cvs-serv2100/threads/pipeline Log Message: Directory /cvsroot/babeldoc/babeldoc/modules/core/examples/threads/pipeline added to the repository |
|
From: <tr...@us...> - 2003-07-16 22:26:23
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/examples/threads In directory sc8-pr-cvs1:/tmp/cvs-serv1987/threads Log Message: Directory /cvsroot/babeldoc/babeldoc/modules/core/examples/threads added to the repository |
|
From: Leech, J. <jl...@vi...> - 2003-07-16 15:13:57
|
I'm going to add preemptive caching of DOM representations out of the XpathSplitter, then I'll commit it all. That way when an XpathSplitter is run on a document, it will only get parsed once, and all the sub-documents will already be parsed. I'll also look into precompiling XPath expressions. -Jonathan -----Original Message----- From: Bruce McDonald [mailto:br...@mc...] Sent: Tuesday, July 15, 2003 8:46 PM To: Leech, Jonathan; bab...@li... Subject: Re: [Babeldoc-devel] DomifyPipelineStage Jonathan. Excellent. Both of these sound GREAT! I especially like the weakhashmap - this really solves this problem elegantly. Please commit. Regards, Bruce. On Tuesday 15 July 2003 07:42 pm, Leech, Jonathan wrote: > I've got a couple of things I want to do to DomifyPipelineStage... > 1) The first is the DTD cache I mentioned before. I would like to use a > config option to control whether DTD's are cached, but its more a global > option than a pipeline-specific option... Is looking for it under > domify/config the right thing to do, or is there another preferred method? > > 2) It stores the parsed XML in the document as an attribute, which when > trying to journal the document caused me to hit java bug 4152790 - > StackOverFlowError serializing / deserializing a large graph of objects. I > got to thinking, even if I could get past the java bug, does it make sense > to journal the DOM representation? So I have implemented the DOM > representation cache as a WeakHashMap in DomifyPipelineStage instead, and > it seems to work. Any objections to commiting this? > > -Jonathan ------------------------------------------------------- This SF.net email is sponsored by: VM Ware With VMware you can run multiple operating systems on a single machine. WITHOUT REBOOTING! Mix Linux / Windows / Novell virtual machines at the same time. Free trial click here: http://www.vmware.com/wl/offer/345/0 _______________________________________________ Babeldoc-devel mailing list Bab...@li... 
https://lists.sourceforge.net/lists/listinfo/babeldoc-devel |
|
From: Leech, J. <jl...@vi...> - 2003-07-16 14:46:21
|
Bruce, I haven't looked at the refactoring yet but that sounds excellent. I had thought about using a proper ThreadPool but didn't have one and didn't think to look for one. The problem with not letting a PipelineStage specify maxThreads is that you can end up with one PipelineStage using all the available threads, and starving other stages that you want to run concurrently. Also, if you are using up a limited resource in the stage e.g. database connections, it doesn't make sense to have more threads running than connections. But once you go with a thread pool, there should also be a maxThreads there as well. -Jonathan -----Original Message----- From: Bruce McDonald [mailto:br...@mc...] Sent: Tuesday, July 15, 2003 11:29 PM To: bab...@li... Subject: [Babeldoc-devel] Late night musings on parallel execution of pipeline stage results Jonathan, All: I have refactored the PipelineStageFactory so that methods: processPipelineStageResults (non-threaded) processPipelineStageResultsParallel (threaded) are as similar as possible and all possible code overlaps are extracted to other methods. At the core, the difference between the two of them is the maxThreads argument. This argument indicates the number of threads to be spawned to handle the processing of the pipeline stage results. I contend that this is an attribute of the pipeline stage factory and should be managed by a threadpool. In fact, in direct contradiction to my prior email on this matter, an even simpler implementation would be to subclass the non-threaded implementation and override the processPipelineStageResults method to handle all the threaded complexity. thoughts? regards, Bruce. PS. My thanks to Jonathan for stinging my lazy butt out of a destructive cycle of Rise Of Nations multiplayer games :) PPS. I would seriously advise everyone to look at this code - it looks solid. That ThreadLocal thing was new to me. 
------------------------------------------------------- This SF.net email is sponsored by: VM Ware With VMware you can run multiple operating systems on a single machine. WITHOUT REBOOTING! Mix Linux / Windows / Novell virtual machines at the same time. Free trial click here: http://www.vmware.com/wl/offer/345/0 _______________________________________________ Babeldoc-devel mailing list Bab...@li... https://lists.sourceforge.net/lists/listinfo/babeldoc-devel |
|
From: <tr...@us...> - 2003-07-16 06:04:33
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/simple
In directory sc8-pr-cvs1:/tmp/cvs-serv26105/pipeline/simple
Modified Files:
SimplePipelineStageFactory.java
Log Message:
Fixed an error in SimplePipelineStageFactory where the parent method setOptions was not called in the overridden method. This caused the options to never be set on PipelineStageFactory.
Index: SimplePipelineStageFactory.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/simple/SimplePipelineStageFactory.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -C2 -d -r1.4 -r1.5
*** SimplePipelineStageFactory.java 27 Jun 2003 02:19:59 -0000 1.4
--- SimplePipelineStageFactory.java 16 Jul 2003 06:04:26 -0000 1.5
***************
*** 70,73 ****
--- 70,74 ----
import com.babeldoc.core.pipeline.IPipelineStageFactory;
import com.babeldoc.core.pipeline.PipelineStageFactory;
+ import com.babeldoc.core.pipeline.PipelineException;
import java.util.Map;
***************
*** 91,97 ****
* the setup.
*
! * @param options
*/
! public void setOptions(Map options) {
String configFile = (String) options.get(CONFIG_FILENAME);
IConfig config = ConfigService.getInstance().getConfig(configFile);
--- 92,101 ----
* the setup.
*
! * @param options map of configuration options (possibly nested)
*/
! public void setOptions(Map options)
! throws PipelineException{
! super.setOptions(options);
!
String configFile = (String) options.get(CONFIG_FILENAME);
IConfig config = ConfigService.getInstance().getConfig(configFile);
|
|
From: <tr...@us...> - 2003-07-16 06:04:31
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline
In directory sc8-pr-cvs1:/tmp/cvs-serv26105/pipeline
Modified Files:
PipelineFactory.java PipelineStageFactory.java
Log Message:
Fixed an error in SimplePipelineStageFactory where the parent method setOptions was not called in the overridden method. This caused the options to never be set on PipelineStageFactory.
Index: PipelineFactory.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/PipelineFactory.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -C2 -d -r1.9 -r1.10
*** PipelineFactory.java 16 Jul 2003 05:46:01 -0000 1.9
--- PipelineFactory.java 16 Jul 2003 06:04:26 -0000 1.10
***************
*** 340,356 ****
/**
* Represents a pairing of a pipeline and a stage within that pipeline.
! *
* <p>
* Title: Babel
* </p>
! *
* <p>
* Description: Universal Document Processor
* </p>
! *
* <p>
* Copyright: Copyright (c) 2002
* </p>
! *
* <p>
* Company:
--- 340,356 ----
/**
* Represents a pairing of a pipeline and a stage within that pipeline.
! *
* <p>
* Title: Babel
* </p>
! *
* <p>
* Description: Universal Document Processor
* </p>
! *
* <p>
* Copyright: Copyright (c) 2002
* </p>
! *
* <p>
* Company:
Index: PipelineStageFactory.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/PipelineStageFactory.java,v
retrieving revision 1.14
retrieving revision 1.15
diff -C2 -d -r1.14 -r1.15
*** PipelineStageFactory.java 16 Jul 2003 05:17:39 -0000 1.14
--- PipelineStageFactory.java 16 Jul 2003 06:04:26 -0000 1.15
***************
*** 167,175 ****
* Set the options on this pipeline stage factory
*
! * @param options hashtable options
*
* @throws PipelineException DOCUMENT ME!
*/
! public void setOptions(Map options) throws PipelineException {
this.options = options;
}
--- 167,176 ----
* Set the options on this pipeline stage factory
*
! * @param options map options
*
* @throws PipelineException DOCUMENT ME!
*/
! public void setOptions(Map options)
! throws PipelineException {
this.options = options;
}
|
|
From: <tr...@us...> - 2003-07-16 05:46:04
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core
In directory sc8-pr-cvs1:/tmp/cvs-serv23472
Modified Files:
TieredConfigurationHelper.java
Log Message:
Now use the TieredConfigurationHelper in PipelineFactory.
Index: TieredConfigurationHelper.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/TieredConfigurationHelper.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -C2 -d -r1.2 -r1.3
*** TieredConfigurationHelper.java 16 Jul 2003 05:17:38 -0000 1.2
--- TieredConfigurationHelper.java 16 Jul 2003 05:46:01 -0000 1.3
***************
*** 67,70 ****
--- 67,71 ----
import com.babeldoc.core.config.IConfig;
+ import com.babeldoc.core.config.ConfigService;
import java.util.*;
***************
*** 86,89 ****
--- 87,99 ----
public TieredConfigurationHelper(IConfig config) {
this.config = config;
+ }
+
+ /**
+ * Construct with a configuration object that has nested or tiered options.
+ *
+ * @param configName the name of the configuration object to load.
+ */
+ public TieredConfigurationHelper(final String configName) {
+ this.config = ConfigService.getInstance().getConfig(configName);
}
|
|
From: <tr...@us...> - 2003-07-16 05:46:04
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline
In directory sc8-pr-cvs1:/tmp/cvs-serv23472/pipeline
Modified Files:
PipelineFactory.java
Log Message:
Now use the TieredConfigurationHelper in PipelineFactory.
Index: PipelineFactory.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/PipelineFactory.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -C2 -d -r1.8 -r1.9
*** PipelineFactory.java 27 Jun 2003 02:19:58 -0000 1.8
--- PipelineFactory.java 16 Jul 2003 05:46:01 -0000 1.9
***************
*** 68,73 ****
import com.babeldoc.core.I18n;
import com.babeldoc.core.LogService;
import com.babeldoc.core.config.ConfigService;
- import com.babeldoc.core.config.IConfig;
import com.babeldoc.core.journal.IJournalTicket;
import com.babeldoc.core.pipeline.compiler.PipelineStageCompiler;
--- 68,73 ----
import com.babeldoc.core.I18n;
import com.babeldoc.core.LogService;
+ import com.babeldoc.core.TieredConfigurationHelper;
import com.babeldoc.core.config.ConfigService;
import com.babeldoc.core.journal.IJournalTicket;
import com.babeldoc.core.pipeline.compiler.PipelineStageCompiler;
***************
*** 106,110 ****
PipelineFactory() {
pipelines = new HashMap();
! pipelineConfig = loadConfigurationData();
}
--- 106,110 ----
PipelineFactory() {
pipelines = new HashMap();
! pipelineConfig = loadConfigurationData(); // TODO: Lazy initialize in the getPipelineConfig method
}
***************
*** 134,159 ****
*/
public static Map loadConfigurationData() {
! HashMap pipelineConfig = new HashMap();
!
! IConfig config = ConfigService.getInstance().getConfig(CONFIG_NAME);
!
! for (Iterator keys = config.keys().iterator(); keys.hasNext();) {
! String key = (String) keys.next();
! String value = config.getString(key);
! int dot = key.indexOf('.');
! String name = key.substring(0, dot);
! String option = key.substring(dot + 1);
!
! HashMap pipeConfig = (HashMap) pipelineConfig.get(name);
!
! if (pipeConfig == null) {
! pipeConfig = new HashMap();
! pipelineConfig.put(name, pipeConfig);
! }
!
! pipeConfig.put(option, value);
! }
!
! return pipelineConfig;
}
--- 134,138 ----
*/
public static Map loadConfigurationData() {
! return new TieredConfigurationHelper(CONFIG_NAME).getNamedConfigs();
}
***************
*** 167,175 ****
try {
! names = new String[pipelineConfig.size()];
int i = 0;
! for (Iterator keys = pipelineConfig.keySet().iterator(); keys.hasNext();) {
names[i++] = (String) keys.next();
}
--- 146,154 ----
try {
! names = new String[getPipelineConfig().size()];
int i = 0;
! for (Iterator keys = getPipelineConfig().keySet().iterator(); keys.hasNext();) {
names[i++] = (String) keys.next();
}
***************
*** 195,200 ****
throws PipelineException {
IPipelineStageFactory pipelineStageFactory = PipelineStageCompiler.getInstance()
! .getPipelineStageFactory(this,
! pname);
if (pipelineStageFactory != null) {
--- 174,178 ----
throws PipelineException {
IPipelineStageFactory pipelineStageFactory = PipelineStageCompiler.getInstance()
! .getPipelineStageFactory(this, pname);
if (pipelineStageFactory != null) {
***************
*** 221,228 ****
// Now get it from the factory.
! pipelineStageFactory = (IPipelineStageFactory) pipelines.get(pname);
if (pipelineStageFactory == null) {
! Map config = (Map) pipelineConfig.get(pname);
if (config == null) {
--- 199,206 ----
// Now get it from the factory.
! pipelineStageFactory = (IPipelineStageFactory) getPipelines().get(pname);
if (pipelineStageFactory == null) {
! Map config = (Map) getPipelineConfig().get(pname);
if (config == null) {
***************
*** 232,237 ****
try {
String type = (String) config.get(TYPE);
! Class factoryClass = PipelineStageFactoryType.getPipelineStageFactory(type)
! .getTypeClass();
if (factoryClass != null) {
--- 210,214 ----
try {
String type = (String) config.get(TYPE);
! Class factoryClass = PipelineStageFactoryType.getPipelineStageFactory(type).getTypeClass();
if (factoryClass != null) {
***************
*** 239,243 ****
pipelineStageFactory.setName(pname);
pipelineStageFactory.setOptions(config);
! pipelines.put(pname, pipelineStageFactory);
} else {
throw new PipelineException(I18n.get("019010", type));
--- 216,220 ----
pipelineStageFactory.setName(pname);
pipelineStageFactory.setOptions(config);
! getPipelines().put(pname, pipelineStageFactory);
} else {
throw new PipelineException(I18n.get("019010", type));
***************
*** 342,346 ****
*/
public void clearCache() {
! pipelines.clear();
LogService.getInstance().logDebug("Clearing Entire Pipeline Cache");
ConfigService.clearCache();
--- 319,323 ----
*/
public void clearCache() {
! getPipelines().clear();
LogService.getInstance().logDebug("Clearing Entire Pipeline Cache");
ConfigService.clearCache();
***************
*** 354,358 ****
*/
public void clearCache(String name) {
! pipelines.remove(name);
LogService.getInstance().logDebug("Clearing Pipeline Cache for " + name);
ConfigService.clearCache(name);
--- 331,335 ----
*/
public void clearCache(String name) {
! getPipelines().remove(name);
LogService.getInstance().logDebug("Clearing Pipeline Cache for " + name);
ConfigService.clearCache(name);
|
|
From: Stefan K. <ste...@co...> - 2003-07-16 05:44:32
|
Jonathan,
if there is no veto from anybody else, please change the name, as I do not have
the newest version here in development.
Thanks, I really appreciate working with all of you guys.
Stefan
-----Ursprüngliche Nachricht-----
Von: Leech, Jonathan [mailto:jl...@vi...]
Gesendet: Dienstag, 15. Juli 2003 23:59
An: 'Stefan Krieger'; bab...@li...
Betreff: RE: [Babeldoc-devel] Splitting Document with Attributes
Stefan,
I implemented the "splitAttributes" option in PipelineStage.java, so any
PipelineStage that can split up documents can use it. I had the same
requirement for XPathSplitter... splitAttributes defaults to false. The
code I committed looks almost identical to your patch -- One difference and
something to watch out for is to not copy the DomifyPipelineStage.DOM_KEY --
this is used to cache the DOM representation of a document, to avoid
reparsing, and if you copy this attribute, you will get weird results down
the line if you use any of the XML parsing pipelines. They will be
effectively running against the XML document before you split it up...
I like the name "copyAttributes" better than "splitAttributes". Should I
change it?
I too had problem with the config files initially, but I think I've got a
handle on it now.
-Jonathan
-----Original Message-----
From: Stefan Krieger [mailto:ste...@co...]
Sent: Tuesday, July 15, 2003 2:41 PM
To: bab...@li...
Subject: [Babeldoc-devel] Splitting Document with Attributes
Hello,
I have seen that the new RouterPipelineStage has an option whether to copy
the attributes to the new document or not.
I think this should be done generally for all PipelineStage that can split
up documents.
I have the same requirement for XPathSplitter and fixed it in the base
PipelineStage method.
What do you think about introducing a new "standard" stage option, i.e.
"copyAttributes" ?
This could be defaulted to false, so it won't break the existing behavior.
BTW: During our project we have had tremendous trouble with your
configuration appending mechanism.
I can't give you any more details, I am just happy that we have it now
running.
We ended up with copies of some "config.properties" in our config dir and
removed some from the babeldoc JARs.
We are still on version 1.0, so I am not sure if this is already fixed.
Stefan
/**
 * Builds one PipelineStageResult per entry of results. Each string becomes a
 * new PipelineDocument, paired with a ticket forked from ticket and addressed
 * to the next pipeline stage name. As patched here, every attribute of the
 * current document is also copied onto each new document.
 *
 * @param results the contents of the new documents, one string per document
 * @return an array of the same length and order as results
 * @throws PipelineException declared by the signature; not thrown directly by the code shown here
 * @throws JournalException presumably raised by the journal calls -- TODO confirm
 */
protected PipelineStageResult[] processHelper(String[] results)
throws PipelineException, JournalException {
IJournal journal = JournalFactory.getJournal();
PipelineStageResult[] psresults = new
PipelineStageResult[results.length];
// Every result produced here is routed to the same next stage.
String name = this.getNextPipelineStageName();
//Process each of the documents
for (int i = 0; i < results.length; ++i) {
// Each new document gets its own ticket forked from this stage's ticket.
IJournalTicket newTicket = journal.forkTicket(ticket);
// NOTE: getBytes() without an argument encodes using the platform default charset.
PipelineDocument newDoc = new PipelineDocument(results[i].getBytes());
// PATCH: copy attributes
// NOTE(review): the copy is unconditional and covers every key. The reply in this
// thread warns against copying DomifyPipelineStage.DOM_KEY (the cached DOM of the
// unsplit document), because later XML stages would then run against the
// pre-split document.
// getDocument() is re-read on every iteration; presumably it returns the same
// source document each time -- TODO confirm.
PipelineDocument doc = getDocument() ;
for ( Iterator it = doc.keys().iterator(); it.hasNext() ; ) {
String key = (String) it.next() ;
newDoc.put( key, doc.get( key ) ) ;
}
psresults[i] = new PipelineStageResult(name, newDoc, newTicket);
}
return psresults;
}
-------------------------------------------------------
This SF.Net email sponsored by: Parasoft
Error proof Web apps, automate testing & more.
Download & eval WebKing and get a free book.
www.parasoft.com/bulletproofapps1
_______________________________________________
Babeldoc-devel mailing list
Bab...@li...
https://lists.sourceforge.net/lists/listinfo/babeldoc-devel
|
|
From: Bruce M. <br...@mc...> - 2003-07-16 05:29:26
|
Jonathan, All: I have refactored the PipelineStageFactory so that methods: processPipelineStageResults (non-threaded) processPipelineStageResultsParallel (threaded) are as similar as possible and all possible code overlaps are extracted to other methods. At the core, the difference between the two of them is the maxThreads argument. This argument indicates the number of threads to be spawned to handle the processing of the pipeline stage results. I contend that this is an attribute of the pipeline stage factory and should be managed by a threadpool. In fact, in direct contradiction to my prior email on this matter, an even simpler implementation would be to subclass the non-threaded implementation and override the processPipelineStageResults method to handle all the threaded complexity. thoughts? regards, Bruce. PS. My thanks to Jonathan for stinging my lazy butt out of a destructive cycle of Rise Of Nations multiplayer games :) PPS. I would seriously advise everyone to look at this code - it looks solid. That ThreadLocal thing was new to me. |
|
From: <tr...@us...> - 2003-07-16 05:17:43
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/feeder
In directory sc8-pr-cvs1:/tmp/cvs-serv18992/core/src/com/babeldoc/core/pipeline/feeder
Modified Files:
MemoryQueue.java
Log Message:
refactorings and meanderings. Serious looking at the PipelineStageFactory to arrive a simplest yet most adequate interface to facilitate parallel execution of multiple pipeline stage results.
Index: MemoryQueue.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/feeder/MemoryQueue.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -C2 -d -r1.3 -r1.4
*** MemoryQueue.java 27 Jun 2003 02:19:59 -0000 1.3
--- MemoryQueue.java 16 Jul 2003 05:17:40 -0000 1.4
***************
*** 70,76 ****
/**
! * This file is the intellectual property of Bank of America Corp All rights
! * reserved. Copyright (c) 2003- User: nbkyi9n Date: Jun 5, 2003 Time:
! * 12:17:13 PM
*/
public class MemoryQueue implements IFeederQueue {
--- 70,75 ----
/**
! * Represents an in-memory feeder queue implemented in a MTQueue from the apache threadpool
! * implementation.
*/
public class MemoryQueue implements IFeederQueue {
|
|
From: <tr...@us...> - 2003-07-16 05:17:42
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline
In directory sc8-pr-cvs1:/tmp/cvs-serv18992/core/src/com/babeldoc/core/pipeline
Modified Files:
IPipelineStage.java PipelineStageFactory.java
Log Message:
refactorings and meanderings. Serious looking at the PipelineStageFactory to arrive a simplest yet most adequate interface to facilitate parallel execution of multiple pipeline stage results.
Index: IPipelineStage.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/IPipelineStage.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -C2 -d -r1.11 -r1.12
*** IPipelineStage.java 15 Jul 2003 15:07:53 -0000 1.11
--- IPipelineStage.java 16 Jul 2003 05:17:39 -0000 1.12
***************
*** 69,72 ****
--- 69,73 ----
import com.babeldoc.core.journal.IJournalTicket;
import com.babeldoc.core.option.IConfigurable;
+ import org.apache.avalon.framework.activity.Initializable;
***************
*** 79,83 ****
* @version 1.0
*/
! public interface IPipelineStage extends INamed, IConfigurable {
/**
* set this pipelinestage's document
--- 80,86 ----
* @version 1.0
*/
! public interface IPipelineStage
! extends INamed, IConfigurable, Initializable {
!
/**
* set this pipelinestage's document
***************
*** 146,155 ****
public int getMaxThreads();
- /**
- * Initialization of stage. It is performed when new document is to be
- * processed
- */
- public void initialize();
-
/**
* pipeline stage processing core. This is the heart of the process for the
--- 149,152 ----
Index: PipelineStageFactory.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/PipelineStageFactory.java,v
retrieving revision 1.13
retrieving revision 1.14
diff -C2 -d -r1.13 -r1.14
*** PipelineStageFactory.java 15 Jul 2003 15:07:53 -0000 1.13
--- PipelineStageFactory.java 16 Jul 2003 05:17:39 -0000 1.14
***************
*** 288,331 ****
/**
! * Process tickets - this is the entry point for pipeline information. If
* the name of the next stage is not null, then process normally, otherwise
! * add them to the results collection if the results collection is not null.
*
* @param name of the next stage
* @param document to process
* @param ticket the tracker ticket
! * @param results the resulting processed documents
*
* @throws PipelineException DOCUMENT ME!
*/
public void process(String name, PipelineDocument document,
! IJournalTicket ticket, Collection results) throws PipelineException {
if ((name != null) && !name.equals("null") && (document != null)) {
getLog().logInfo(I18n.get("019008", name));
! // Get the pipeline stage
IPipelineStage pstage = this.getPipelineStage(name);
if (pstage != null) {
! pstage.initialize();
pstage.setDocument(document);
pstage.setTicket(ticket);
trackDocument(pstage);
! if (pstage.getThreaded()) {
! int maxThreads = pstage.getMaxThreads();
! if (maxThreads <= 0) {
! maxThreads = 2; // default to 2 if <= 0
! }
! processPipelineStageResults(pstage.processStage(), results, maxThreads);
! }
! else {
! processPipelineStageResults(pstage.processStage(), results);
! }
} else {
getLog().logError(I18n.get("019008", name), null);
}
} else {
! if (results != null) {
! results.add(new PipelineStageResult(null, document, ticket));
}
}
--- 288,327 ----
/**
! * Process documents - this is the entry point for pipeline processing. If
* the name of the next stage is not null, then process normally, otherwise
! * add them to the finalResults collection if the finalResults collection is not null.
*
* @param name of the next stage
* @param document to process
* @param ticket the tracker ticket
! * @param finalResults the resulting processed documents after all processing is completed
*
* @throws PipelineException DOCUMENT ME!
*/
public void process(String name, PipelineDocument document,
! IJournalTicket ticket, Collection finalResults) throws PipelineException {
if ((name != null) && !name.equals("null") && (document != null)) {
getLog().logInfo(I18n.get("019008", name));
! // Get the pipeline stage for this name
IPipelineStage pstage = this.getPipelineStage(name);
if (pstage != null) {
! try {
! pstage.initialize();
! } catch (Exception e) {
! throw new PipelineException("Exception while initializing", e);
! }
pstage.setDocument(document);
pstage.setTicket(ticket);
trackDocument(pstage);
!
! processPipelineStage(pstage, finalResults);
} else {
getLog().logError(I18n.get("019008", name), null);
}
} else {
! if (finalResults != null) {
! finalResults.add(new PipelineStageResult(null, document, ticket));
}
}
***************
*** 333,436 ****
/**
! * TODO: DOCUMENT ME!
*
! * @return DOCUMENT ME!
*/
! protected com.babeldoc.core.LogService getLog() {
! if (log == null) {
! log = LogService.getInstance(this.getClass().getName());
! }
! return log;
}
/**
! * Process the pipeline stage results. The gets called from process and
! * handles the results from the processing.
*
! * @param psResults
! * @param results DOCUMENT ME!
*
! * @throws PipelineException DOCUMENT ME!
*/
private void processPipelineStageResults(PipelineStageResult[] psResults,
! Collection results) throws PipelineException {
! String name;
! PipelineDocument document;
! IJournalTicket ticket;
!
! // Only process if we get a valid result
! if ((psResults != null) && (psResults.length > 0)) {
! int numResults = psResults.length;
! for (int i = 0; i < numResults; ++i) {
! name = psResults[i].getNamePipelineStage();
! document = psResults[i].getDocument();
! ticket = psResults[i].getTicket();
! this.process(name, document, ticket, results);
! }
}
}
! private void processPipelineStageResults(final PipelineStageResult[] psResults,
! Collection results, final int maxThreads) throws PipelineException {
! // ensure synchronized access to results Collection if threads are spawned
! final Collection syncResults = Collections.synchronizedCollection(results);
!
! // Only process if we get a valid result
! if ((psResults != null) && (psResults.length > 0)) {
! final int numResults = psResults.length;
!
! // Kick of maxThreads Threads, each of which processes some of the results.
! // For example (maxThreads = 4, numResults = 11
! // Thread Processes These Results
! // 0 0, 4, 8
! // 1 1, 5, 9
! // 2 2, 6, 10
! // 3 3, 7
! Vector threads = new Vector();
! final Vector exceptions = new Vector();
! for (int i = 0; i < maxThreads; i++) {
! final int index = i;
! Runnable r = new Runnable() {
! public void run() {
! for (int j = 0; j < numResults; j++) {
! if ((j % maxThreads) == index) {
! String name = psResults[j].getNamePipelineStage();
! PipelineDocument document = psResults[j].getDocument();
! IJournalTicket ticket = psResults[j].getTicket();
! try {
! process(name, document, ticket, syncResults);
! }
! catch (PipelineException p) {
! exceptions.add(p);
! }
! }
! }
! }
! };
! Thread t = new Thread(r);
! threads.add(t);
! t.start();
! }
!
! // wait for any threads to finish before returning
! for (int i = 0; i < threads.size(); i++) {
! try {
! Thread t = (Thread)threads.get(i);
! t.join();
! }
! catch (InterruptedException ie) {
! }
! }
!
! // see if any of the threads threw any exceptions, and if so, re-throw the first one
! // TODO: throw an exception encapsulating all the exceptions that were thrown?
! if (exceptions.size() > 0) {
! throw (PipelineException)exceptions.get(0);
! }
! }
}
!
/**
* Track the document on the pipeline stage
--- 329,453 ----
/**
! * Process the pipeline stage and then defer to eithe the multithreaded or single threaded
! * implementation - this needs to be extracted.
*
! * @param pstage The pipeline stage to process
! * @param finalResults collection of documents after all processing is completed.
! * @throws PipelineException
*/
! protected void processPipelineStage(IPipelineStage pstage, Collection finalResults)
! throws PipelineException {
! // Process the pipeline stage, get the results of the processing
! PipelineStageResult [] psResults = pstage.processStage();
!
! if(psResults!=null && psResults.length>0) {
! if (pstage.getThreaded()) {
! int maxThreads = pstage.getMaxThreads();
! if (maxThreads <= 0) {
! maxThreads = 2; // default to 2 if <= 0
! }
! processPipelineStageResultsParallel(psResults, finalResults, maxThreads);
! }
! else {
! processPipelineStageResults(psResults, finalResults);
! }
! }
}
/**
! * Process the pipeline stage finalResults. The gets called from process and
! * handles the finalResults from the processing.
*
! * @param psResults The results of running the current stage on this document
! * @param finalResults Collection of documents after all processing done (after null)
*
! * @throws PipelineException
*/
private void processPipelineStageResults(PipelineStageResult[] psResults,
! Collection finalResults) throws PipelineException {
! int numResults = psResults.length;
! for (int i = 0; i < numResults; ++i) {
! processPipelineStageResult(psResults[i], finalResults);
}
}
! /**
! * Process a single PipelineStageResult.
! *
! * @param psResult The pipeline stage result to process
! * @param finalResults the collection of processed documents.
! * @throws PipelineException
! */
! protected void processPipelineStageResult(PipelineStageResult psResult, Collection finalResults)
! throws PipelineException {
! String name = psResult.getNamePipelineStage();
! PipelineDocument document = psResult.getDocument();
! IJournalTicket ticket = psResult.getTicket();
!
! this.process(name, document, ticket, finalResults);
}
!
! /**
! * Execute the finalResults using threads in parallel. This uses the maxThreads argument
! * to specify the maximum number of threads to spawn. There is a slight impedance mismatch here.
! * The processPipelineStageResults methods (parallel and non-parallel) differ only in the
! * max threads argument. This indicates the number of threads to spawn to handle the results.
! * Now, if we could consider this an attribute of this factory (or even of some threadpool) this
! * we can remove it from the interface - and we can conflate the interface.
! *
! * @param psResults results from the current pipeline stage process
! * @param finalResults result of the current pipeline stage process
! * @param maxThreads max number of threads to spawn to process the finalResults
! */
! private void processPipelineStageResultsParallel(final PipelineStageResult[] psResults,
! Collection finalResults,
! final int maxThreads )
! throws PipelineException {
! // ensure synchronized access to finalResults Collection if threads are spawned
! final Collection syncResults = Collections.synchronizedCollection(finalResults);
! final int numResults = psResults.length;
! final Vector exceptions = new Vector();
! Vector threads = new Vector();
! for (int i = 0; i < maxThreads; i++) {
! final int index = i;
! Runnable r = new Runnable() {
! public void run() {
! for (int j = 0; j < numResults; j++) {
! if ((j % maxThreads) == index) {
! try {
! processPipelineStageResult(psResults[j], syncResults);
! }
! catch (PipelineException p) {
! exceptions.add(p);
! }
! }
! }
! }
! };
! Thread t = new Thread(r);
! threads.add(t);
! t.start();
! }
!
! // wait for any threads to finish before returning
! for (int i = 0; i < threads.size(); i++) {
! try {
! Thread t = (Thread)threads.get(i);
! t.join();
! }
! catch (InterruptedException ie) {
! }
! }
! // see if any of the threads threw any exceptions, and if so, re-throw the first one
! // TODO: throw an exception encapsulating all the exceptions that were thrown?
! // This might be harder than it appears unless we extend the PipelineException to
! // Add this capability
! if (exceptions.size() > 0) {
! throw (PipelineException)exceptions.get(0);
! }
! }
!
/**
* Track the document on the pipeline stage
***************
*** 450,453 ****
--- 467,483 ----
}
}
+ }
+
+ /**
+ * TODO: DOCUMENT ME!
+ *
+ * @return DOCUMENT ME!
+ */
+ protected com.babeldoc.core.LogService getLog() {
+ if (log == null) {
+ log = LogService.getInstance(this.getClass().getName());
+ }
+
+ return log;
}
}
|
|
From: <tr...@us...> - 2003-07-16 05:17:41
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core In directory sc8-pr-cvs1:/tmp/cvs-serv18992/core/src/com/babeldoc/core Modified Files: TieredConfigurationHelper.java Log Message: refactorings and meanderings. Serious looking at the PipelineStageFactory to arrive a simplest yet most adequate interface to facilitate parallel execution of multiple pipeline stage results. Index: TieredConfigurationHelper.java =================================================================== RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/TieredConfigurationHelper.java,v retrieving revision 1.1 retrieving revision 1.2 diff -C2 -d -r1.1 -r1.2 *** TieredConfigurationHelper.java 1 Jul 2003 12:48:07 -0000 1.1 --- TieredConfigurationHelper.java 16 Jul 2003 05:17:38 -0000 1.2 *************** *** 1,2 **** --- 1,67 ---- + /* ==================================================================== + * The Apache Software License, Version 1.1 + * + * Copyright (c) 2000 The Apache Software Foundation. All rights + * reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * + * 3. The end-user documentation included with the redistribution, + * if any, must include the following acknowledgment: + * "This product includes software developed by the + * Apache Software Foundation (http://www.apache.org/)." + * Alternately, this acknowledgment may appear in the software itself, + * if and wherever such third-party acknowledgments normally appear. + * + * 4. 
The names "Apache" and "Apache Software Foundation" must + * not be used to endorse or promote products derived from this + * software without prior written permission. For written + * permission, please contact ap...@ap.... + * + * 5. Products derived from this software may not be called "Apache", + * nor may "Apache" appear in their name, without prior written + * permission of the Apache Software Foundation. + * + * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR + * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * <http://www.apache.org/>. + * + * Portions of this software are based upon public domain software + * originally written at the National Center for Supercomputing Applications, + * University of Illinois, Urbana-Champaign. 
+ * ==================================================================== + * + * Babeldoc: The Universal Document Processor + * + * $Header$ + * $DateTime$ + * $Author$ + * + */ package com.babeldoc.core; *************** *** 6,9 **** --- 71,77 ---- /** + * The Tiered configuration helper assists when dealing with names (keys) in the + * configuration file that are separated by periods (.) that imply some kind of nesting + * or tiering. This is used a lot in babeldoc. */ public class TieredConfigurationHelper { *************** *** 11,18 **** --- 79,96 ---- private Map configs; + /** + * Construct with a configuration object that has nested or tiered options. + * + * @param config + */ public TieredConfigurationHelper(IConfig config) { this.config = config; } + /** + * Get the map of configuration values. + * + * @return + */ public Map getNamedConfigs() { if(configs==null) { *************** *** 22,25 **** --- 100,107 ---- } + /** + * Extract the nested configurations. This is map that whose elements are either strings + * or maps of nested values or sub-maps (and so on). + */ protected void extractedTieredConfigs() { configs = new HashMap(); |
|
From: Bruce M. <br...@mc...> - 2003-07-16 03:09:00
|
Jonathan, all: I have been trolling around this code tonight and here are my observations: 1. The threaded configuration on a pipeline stage. Is this really a configuration option or a characteristic of the stage itself? What I mean is - using this configuration, it is possible to make a potentially non-thread-safe stage run threaded. What I propose is that those stages that can be run threadsafe implement the org.apache.avalon.framework.thread.ThreadSafe interface. In the process results method, we then check instanceof and serialize if necessary. 2. maxThreads. Now this is definitely a configuration option for a pipeline, but for a particular stage? I don't know. What I propose is that the pipeline stage factory checks if this pipeline is threaded (at a pipeline level) and then sets up a ThreadPool (see the feeder threadpool). In fact it might be interesting if we look at how we abstract out the threading from the pipelinestagefactory altogether and then we can implement two strategies - basically a SynchronousPipelineProcessor and an AsynchronousPipelineProcessor. Then based on whether we can run threaded or not, we transfer control to one or the other. Please see the diagram. What do you think?? regards, Bruce. |
|
From: Bruce M. <br...@mc...> - 2003-07-16 02:46:20
|
Jonathan. Excellent. Both of these sound GREAT! I especially like the weakhashmap - this really solves this problem elegantly. Please commit. Regards, Bruce. On Tuesday 15 July 2003 07:42 pm, Leech, Jonathan wrote: > I've got a couple of things I want to do to DomifyPipelineStage... > 1) The first is the DTD cache I mentioned before. I would like to use a > config option to control whether DTD's are cached, but its more a global > option than a pipeline-specific option... Is looking for it under > domify/config the right thing to do, or is there another preferred method? > > 2) It stores the parsed XML in the document as an attribute, which when > trying to journal the document caused me to hit java bug 4152790 - > StackOverFlowError serializing / deserializing a large graph of objects. I > got to thinking, even if I could get past the java bug, does it make sense > to journal the DOM representation? So I have implemented the DOM > representation cache as a WeakHashMap in DomifyPipelineStage instead, and > it seems to work. Any objections to commiting this? > > -Jonathan |
|
From: <tr...@us...> - 2003-07-16 02:33:41
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/xml
In directory sc8-pr-cvs1:/tmp/cvs-serv27890/com/babeldoc/core/pipeline/xml
Modified Files:
XmlPipelineStageResolver.java
Removed Files:
XmlPipelineUnmarshaller.java
Log Message:
removed old code and changed implementation to interface in an interface.
Index: XmlPipelineStageResolver.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/xml/XmlPipelineStageResolver.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -C2 -d -r1.6 -r1.7
*** XmlPipelineStageResolver.java 27 Jun 2003 02:19:59 -0000 1.6
--- XmlPipelineStageResolver.java 16 Jul 2003 02:33:38 -0000 1.7
***************
*** 68,71 ****
--- 68,73 ----
import com.babeldoc.core.ResourceLoader;
import com.babeldoc.core.pipeline.xml.digester.DigesterPipelineUnmarshaller;
+ import com.babeldoc.core.pipeline.IPipelineStageResolver;
+ import com.babeldoc.core.pipeline.PipelineStageResolver;
import java.io.FileNotFoundException;
***************
*** 83,88 ****
*/
public class XmlPipelineStageResolver
! extends com.babeldoc.core.pipeline.PipelineStageResolver
! implements com.babeldoc.core.pipeline.IPipelineStageResolver {
/**
* Set up the data for this resolver
--- 85,90 ----
*/
public class XmlPipelineStageResolver
! extends PipelineStageResolver
! implements IPipelineStageResolver {
/**
* Set up the data for this resolver
--- XmlPipelineUnmarshaller.java DELETED ---
|
|
From: <tr...@us...> - 2003-07-16 02:33:41
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core
In directory sc8-pr-cvs1:/tmp/cvs-serv27890/com/babeldoc/core
Modified Files:
Pair.java
Log Message:
removed old code and changed implementation to interface in an interface.
Index: Pair.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/Pair.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -C2 -d -r1.3 -r1.4
*** Pair.java 27 Jun 2003 02:19:57 -0000 1.3
--- Pair.java 16 Jul 2003 02:33:38 -0000 1.4
***************
*** 69,72 ****
--- 69,73 ----
import java.util.HashMap;
+ import java.util.Map;
***************
*** 110,114 ****
* @return hashtable with keys as firsts and values as seconds
*/
! public static HashMap getMap(Pair[] pairs) {
HashMap map = new HashMap();
--- 111,115 ----
* @return hashtable with keys as firsts and values as seconds
*/
! public static Map getMap(Pair[] pairs) {
HashMap map = new HashMap();
|
|
From: <tr...@us...> - 2003-07-16 02:26:08
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core
In directory sc8-pr-cvs1:/tmp/cvs-serv26928
Modified Files:
GeneralException.java
Log Message:
Now uses the Avalon context object instead of map
Index: GeneralException.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/GeneralException.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -C2 -d -r1.7 -r1.8
*** GeneralException.java 27 Jun 2003 02:19:57 -0000 1.7
--- GeneralException.java 16 Jul 2003 02:26:05 -0000 1.8
***************
*** 67,74 ****
import org.apache.commons.lang.exception.NestableException;
!
! import java.util.HashMap;
! import java.util.Map;
!
/**
--- 67,71 ----
import org.apache.commons.lang.exception.NestableException;
! import org.apache.avalon.framework.context.Context;
/**
***************
*** 84,88 ****
* point.
*/
! Map context;
/**
--- 81,85 ----
* point.
*/
! Context context;
/**
***************
*** 117,121 ****
* @param context
*/
! public void setContext(Map context) {
this.context = context;
}
--- 114,118 ----
* @param context
*/
! public void setContext(Context context) {
this.context = context;
}
***************
*** 124,165 ****
* Get the context mapping from the exception.
*
- * @param context
- *
* @return
*/
! public Map getContext(Map context) {
return this.context;
- }
-
- /**
- * Add a name/value pair to the context map. If the map does not exists,
- * create a hashmap. This returns this object so that you can chain these
- * objects.
- *
- * @param pair name / value pair to place in the context map
- *
- * @return this object for chaining
- */
- public GeneralException addContextPair(Pair pair) {
- if (context == null) {
- context = new HashMap();
- }
-
- context.put(pair.getFirst(), pair.getSecond());
-
- return this;
- }
-
- /**
- * Add a name/value as discrete values to the context map. This calls the
- * same method with the Pair argument.
- *
- * @param name The name of object to place in the context map
- * @param value The associated object to place in the map.
- *
- * @return this object for chaining.
- */
- public GeneralException addContextPair(String name, Object value) {
- return addContextPair(new Pair(name, value));
}
--- 121,128 ----
* Get the context mapping from the exception.
*
* @return
*/
! public Context getContext() {
return this.context;
}
|
|
From: <tr...@us...> - 2003-07-16 02:12:24
|
Update of /cvsroot/babeldoc/babeldoc/modules/sql/src/com/babeldoc/sql/pipeline/stage
In directory sc8-pr-cvs1:/tmp/cvs-serv24563
Modified Files:
SqlEnrichPipelineStage.java
Log Message:
removed unnecessary throws clause
Index: SqlEnrichPipelineStage.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/sql/src/com/babeldoc/sql/pipeline/stage/SqlEnrichPipelineStage.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -C2 -d -r1.12 -r1.13
*** SqlEnrichPipelineStage.java 27 Jun 2003 02:05:59 -0000 1.12
--- SqlEnrichPipelineStage.java 16 Jul 2003 02:12:20 -0000 1.13
***************
*** 292,296 ****
* @return
*
- * @throws PipelineException
* @throws ResourceException
*/
--- 292,295 ----
|
|
From: <tr...@us...> - 2003-07-16 02:10:59
|
Update of /cvsroot/babeldoc/babeldoc/modules/sql/src/com/babeldoc/sql/pipeline/stage
In directory sc8-pr-cvs1:/tmp/cvs-serv24426
Modified Files:
SqlQueryPipelineStage.java
Log Message:
Updated so that the output XML can have a schema applied to it.
Index: SqlQueryPipelineStage.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/sql/src/com/babeldoc/sql/pipeline/stage/SqlQueryPipelineStage.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -C2 -d -r1.2 -r1.3
*** SqlQueryPipelineStage.java 15 Jul 2003 22:46:56 -0000 1.2
--- SqlQueryPipelineStage.java 16 Jul 2003 02:10:57 -0000 1.3
***************
*** 96,99 ****
--- 96,115 ----
private IResource resource = null;
+ // Various constants for the xml document
+ public static final String XML_HEADER = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n";
+ public static final String ROOT_ELEMENT_START = "<queryresults>\n";
+ public static final String ROW_ELEMENT_START = "<row>\n";
+ public static final String COLUMN_ELEMNENT_END = "</column>\n";
+ public static final String ROW_ELEMENT_END = "</row>\n";
+ public static final String ROOT_ELEMENT_END = "</queryresults>\n";
+ public static final String XML_MIME_TYPE = "text/xml";
+ public static final String COLUMN_ELEMENT_START1 = "<column column-name=\"";
+ public static final String COLUMN_ELEMENT_START2 = "\" column-number=\"";
+ public static final String COLUMN_ELEMENT_START3 = "\">";
+ public static final String QUERY_ELEMENT_START1 = "<query query-name=\"";
+ public static final String QUERY_ELEMENT_START2 = "\" query-number=\"";
+ public static final String QUERY_ELEMENT_START3 = "\">\n";
+ public static final String QUERY_ELEMENT_END = "</query>\n";
+
/**
* Construct with this stages info
***************
*** 139,146 ****
NameValuePair[] queries = this.getOptionList(new String[]{SQL});
StringBuffer xml = new StringBuffer();
! xml.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n");
! xml.append("<queryresults>\n");
Connection conn = null;
--- 155,174 ----
NameValuePair[] queries = this.getOptionList(new String[]{SQL});
+ PipelineDocument newDoc = createQueryDocument(queries);
+ return super.processHelper(newDoc);
+ }
+ /**
+ * For each of the queries provided in the name-value pair, execute the sql and create an xml
+ * document with the rows and columns for each result for each query.
+ *
+ * @param queries
+ * @return
+ * @throws PipelineException
+ */
+ private PipelineDocument createQueryDocument(NameValuePair[] queries) throws PipelineException {
StringBuffer xml = new StringBuffer();
! xml.append(XML_HEADER);
! xml.append(ROOT_ELEMENT_START);
Connection conn = null;
***************
*** 155,159 ****
for (int i = 0; i < queries.length; ++i) {
String queryName = queries[i].getName();
! xml.append("<" + queryName + ">\n");
sql = queries[i].getValue();
stmt = conn.createStatement();
--- 183,187 ----
for (int i = 0; i < queries.length; ++i) {
String queryName = queries[i].getName();
! xml.append(QUERY_ELEMENT_START1+queryName+QUERY_ELEMENT_START2+i+QUERY_ELEMENT_START3);
sql = queries[i].getValue();
stmt = conn.createStatement();
***************
*** 162,176 ****
ResultSetMetaData metaData = rs.getMetaData();
while (rs.next()) {
! xml.append("<row>\n");
for (int j = 1, count = metaData.getColumnCount(); j <= count; j++) {
String name = metaData.getColumnLabel(j).toLowerCase();
Object value = rs.getObject(j);
! xml.append("<" + name + ">");
xml.append(value);
! xml.append("</" + name + ">\n");
}
! xml.append("</row>\n");
}
! xml.append("</" + queryName + ">\n");
}
}
--- 190,204 ----
ResultSetMetaData metaData = rs.getMetaData();
while (rs.next()) {
! xml.append(ROW_ELEMENT_START);
for (int j = 1, count = metaData.getColumnCount(); j <= count; j++) {
String name = metaData.getColumnLabel(j).toLowerCase();
Object value = rs.getObject(j);
! xml.append(COLUMN_ELEMENT_START1+name+COLUMN_ELEMENT_START2+j+COLUMN_ELEMENT_START3);
xml.append(value);
! xml.append(COLUMN_ELEMNENT_END);
}
! xml.append(ROW_ELEMENT_END);
}
! xml.append(QUERY_ELEMENT_END);
}
}
***************
*** 201,211 ****
}
}
!
! xml.append("</queryresults>\n");
PipelineDocument newDoc = new PipelineDocument(this.getDocument(), xml.toString().getBytes());
! newDoc.setMimeType("text/xml");
!
! return super.processHelper(newDoc);
}
--- 229,237 ----
}
}
! xml.append(ROOT_ELEMENT_END);
PipelineDocument newDoc = new PipelineDocument(this.getDocument(), xml.toString().getBytes());
! newDoc.setMimeType(XML_MIME_TYPE);
! return newDoc;
}
|