|
From: <tr...@us...> - 2003-07-16 22:45:33
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/processor In directory sc8-pr-cvs1:/tmp/cvs-serv4858/modules/core/src/com/babeldoc/core/pipeline/processor Added Files: AsyncPipelineStageProcessor.java IPipelineStageProcessor.java PipelineStageProcessorFactory.java SyncPipelineStageProcessor.java ThreadPooledPipelineStageProcessor.java Log Message: New pipeline stage factory threading model has been implemented. --- NEW FILE: AsyncPipelineStageProcessor.java --- /* ==================================================================== * The Apache Software License, Version 1.1 * * Copyright (c) 2000 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Apache" and "Apache Software Foundation" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact ap...@ap.... * * 5. Products derived from this software may not be called "Apache", * nor may "Apache" appear in their name, without prior written * permission of the Apache Software Foundation. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. * * Portions of this software are based upon public domain software * originally written at the National Center for Supercomputing Applications, * University of Illinois, Urbana-Champaign. * ==================================================================== * * Babeldoc: The Universal Document Processor * * $Header: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/processor/AsyncPipelineStageProcessor.java,v 1.1 2003/07/16 22:45:28 triphop Exp $ * $DateTime$ * $Author: triphop $ * */ package com.babeldoc.core.pipeline.processor; import com.babeldoc.core.pipeline.*; import java.util.*; import org.apache.commons.lang.NumberUtils; /** * The asynchronous pipeline stage processor is capable of spawning threads to * handle multiple pipeline stage results as a result of pipeline stage processing. */ public class AsyncPipelineStageProcessor extends SyncPipelineStageProcessor { private int globalMaxThreads; public static final String MAX_THREADS = "maxThreads"; public static final int DEFAULT_MAX_THREADS = 5; /** * This method is call prior to any processing - it is a necessary configuration * prequisite. * * @param stageFactory set the associated stage factory for this processor */ public void configure(IPipelineStageFactory stageFactory, Map config) { super.configure(stageFactory, config); globalMaxThreads = NumberUtils.stringToInt((String)config.get(MAX_THREADS), DEFAULT_MAX_THREADS); } /** * Process the pipeline stage and then defer to eithe the multithreaded or single threaded * implementation - this needs to be extracted. * * @param pstage The pipeline stage to process * @param finalResults collection of documents after all processing is completed. * @throws PipelineException */ protected void processPipelineStage(IPipelineStage pstage, Collection finalResults) throws PipelineException { // Process the pipeline stage, get the results of the processing PipelineStageResult[] psResults = pstage.processStage(); if (psResults != null && psResults.length > 0) { if (psResults.length > 1) { int maxThreads = pstage.getMaxThreads(); if (maxThreads <= 0) { maxThreads = 2; // default to 2 if <= 0 } if(maxThreads>getGlobalMaxThreads()) { maxThreads = getGlobalMaxThreads(); } processPipelineStageResultsParallel(psResults, finalResults, maxThreads); } else { processPipelineStageResults(psResults, finalResults); } } } /** * Execute the finalResults using threads in parallel. This uses the maxThreads argument * to specify the maximum number of threads to spawn. There is a slight impedance mismatch here. * The processPipelineStageResults methods (parallel and non-parallel) differ only in the * max threads argument. This indicates the number of threads to spawn to handle the results. * Now, if we could consider this an attribute of this factory (or even of some threadpool) this * we can remove it from the interface - and we can conflate the interface. * * @param psResults results from the current pipeline stage process * @param finalResults result of the current pipeline stage process * @param maxThreads max number of threads to spawn to process the finalResults */ protected void processPipelineStageResultsParallel(final PipelineStageResult[] psResults, Collection finalResults, final int maxThreads) throws PipelineException { // ensure synchronized access to finalResults Collection if threads are spawned final Collection syncResults = Collections.synchronizedCollection(finalResults); final int numResults = psResults.length; final Vector exceptions = new Vector(); Vector threads = new Vector(); for (int i = 0; i < maxThreads; i++) { final int index = i; Runnable r = new Runnable() { public void run() { for (int j = 0; j < numResults; j++) { if ((j % maxThreads) == index) { try { processPipelineStageResult(psResults[j], syncResults); } catch (PipelineException p) { exceptions.add(p); } } } } }; Thread t = new Thread(r); threads.add(t); t.start(); } // wait for any threads to finish before returning for (int i = 0; i < threads.size(); i++) { try { Thread t = (Thread) threads.get(i); t.join(); } catch (InterruptedException ie) { } } // see if any of the threads threw any exceptions, and if so, re-throw the first one // TODO: throw an exception encapsulating all the exceptions that were thrown? // This might be harder than it appears unless we extend the PipelineException to // Add this capability if (exceptions.size() > 0) { throw (PipelineException) exceptions.get(0); } } /** * Get the globally configured number of threads allowed. * * @return */ public int getGlobalMaxThreads() { return globalMaxThreads; } } --- NEW FILE: IPipelineStageProcessor.java --- /* ==================================================================== * The Apache Software License, Version 1.1 * * Copyright (c) 2000 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Apache" and "Apache Software Foundation" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact ap...@ap.... * * 5. Products derived from this software may not be called "Apache", * nor may "Apache" appear in their name, without prior written * permission of the Apache Software Foundation. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. * * Portions of this software are based upon public domain software * originally written at the National Center for Supercomputing Applications, * University of Illinois, Urbana-Champaign. * ==================================================================== * * Babeldoc: The Universal Document Processor * * $Header: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/processor/IPipelineStageProcessor.java,v 1.1 2003/07/16 22:45:28 triphop Exp $ * $DateTime$ * $Author: triphop $ * */ package com.babeldoc.core.pipeline.processor; import com.babeldoc.core.pipeline.IPipelineStage; import com.babeldoc.core.pipeline.PipelineException; import com.babeldoc.core.pipeline.PipelineDocument; import com.babeldoc.core.pipeline.IPipelineStageFactory; import com.babeldoc.core.journal.IJournalTicket; import java.util.Collection; import java.util.Map; /** * This interface represents that functionality required to execute a pipeline of * stages from a certain stage. It has addition functionality for the maintainence * and configuration of state. * */ public interface IPipelineStageProcessor { public static final String PROCESSOR = "processor"; /** * Get and instantiate the named pipeline stage. * * @param stageName the unique name of the stage * * @return a reference to the pipeline stage * * @throws PipelineException DOCUMENT ME! */ public IPipelineStage getPipelineStage(String stageName) throws PipelineException; /** * Process documents - this is the entry point for pipeline processing. If * the name of the next stage is not null, then process normally, otherwise * add them to the finalResults collection if the finalResults collection is not null. * * @param name of the next stage * @param document to process * @param ticket the tracker ticket * @param finalResults the resulting processed documents after all processing is completed * * @throws com.babeldoc.core.pipeline.PipelineException DOCUMENT ME! */ public void process(String name, PipelineDocument document, IJournalTicket ticket, Collection finalResults) throws PipelineException; /** * This method is call prior to any processing - it is a necessary configuration * prequisite. * * @param stageFactory set the associated stage factory for this processor * @param options - the map of options specific to this process - this might be null */ public void configure(IPipelineStageFactory stageFactory, Map options); } --- NEW FILE: PipelineStageProcessorFactory.java --- package com.babeldoc.core.pipeline.processor; import com.babeldoc.core.pipeline.IPipelineStageFactory; import com.babeldoc.core.GeneralException; import com.babeldoc.core.service.ServiceFactory; import java.util.Map; /** */ public class PipelineStageProcessorFactory { public static final String PROCESSOR_TYPE = "type"; public static final String SERVICE_PROCESSOR = "PipelineStageProcessor."; /** * Quick and easy way to get a Pipeline stage processor implementation. If the * config object is null (no processor specified in the pipeline configuration, * then the default processor is assigned. If the configuration is given and is a * map (nested/tiered configuration), then the type is determined and a * processor is created and initialized. * * @param psFactory the pipeline stage factory associated with this processor * @param config the configuration object - this must either be a null or a map * @return a completely constructed Processor - ready to go. * @throws GeneralException */ public static IPipelineStageProcessor getProcessor(IPipelineStageFactory psFactory, Object config) throws GeneralException { IPipelineStageProcessor processor = null; if(config == null) { // System.out.println("Config is null"); processor = new SyncPipelineStageProcessor(); } else if(!(config instanceof Map)) { // System.out.println("Config is bad"); throw new GeneralException("The configuration settings for the processor must be null or a nested group"); } else { // System.out.println("Config is good (I think)"); Map map = (Map)config; String processorType = (String)map.get(PROCESSOR_TYPE); processor = (IPipelineStageProcessor)(ServiceFactory.getService(SERVICE_PROCESSOR+processorType)); } processor.configure(psFactory, (Map)config); return processor; } } --- NEW FILE: SyncPipelineStageProcessor.java --- /* ==================================================================== * The Apache Software License, Version 1.1 * * Copyright (c) 2000 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Apache" and "Apache Software Foundation" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact ap...@ap.... * * 5. Products derived from this software may not be called "Apache", * nor may "Apache" appear in their name, without prior written * permission of the Apache Software Foundation. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. * * Portions of this software are based upon public domain software * originally written at the National Center for Supercomputing Applications, * University of Illinois, Urbana-Champaign. * ==================================================================== * * Babeldoc: The Universal Document Processor * * $Header: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/processor/SyncPipelineStageProcessor.java,v 1.1 2003/07/16 22:45:28 triphop Exp $ * $DateTime$ * $Author: triphop $ * */ package com.babeldoc.core.pipeline.processor; import com.babeldoc.core.pipeline.*; import com.babeldoc.core.journal.IJournalTicket; import com.babeldoc.core.journal.JournalFactory; import com.babeldoc.core.journal.JournalException; import com.babeldoc.core.I18n; import com.babeldoc.core.LogService; import java.util.*; /** * The default pipline stage processor executes ***EVERYTHING*** serialially. * This is of concern when a pipeline returns a number of results. Most of this * code used to live in the pipeline stage factory. */ public class SyncPipelineStageProcessor implements IPipelineStageProcessor { private IPipelineStageFactory stageFactory; // this is put in a ThreadLocal, since with Threads the same stage can be called by several Threads at once private ThreadLocal localStages = new ThreadLocal() { protected synchronized Object initialValue() { LogService.getInstance().logDebug(I18n.get("019005", "new")); return new HashMap(); } }; /** * Get the "owning" pipeline stage factory. * * @return */ public IPipelineStageFactory getStageFactory() { return stageFactory; } /** * Returns the pipeline stage type for this particular pipeline * * @param stageName * * @return * * @throws PipelineException */ public PipelineStageType getPipelineStageType(String stageName) throws PipelineException { return getStageFactory().getResolver().getPipelineStageType(stageName); } /** * Get and instantiate the named pipeline stage. * * @param stageName the unique name of the stage * * @return a reference to the pipeline stage * * @throws PipelineException DOCUMENT ME! */ public IPipelineStage getPipelineStage(String stageName) throws PipelineException { Map stages = (Map)localStages.get(); if (!stages.containsKey(stageName)) { LogService.getInstance().logDebug(I18n.get("019005", stageName)); try { PipelineStageType pipelineStageType = getPipelineStageType(stageName); Class stage = pipelineStageType.getTypeClass(); if (stage != null) { IPipelineStage pstage = (IPipelineStage) (stage.newInstance()); pstage.setName(stageName); pstage.setResolver(getStageFactory().getResolver()); pstage.setPipelineName(getStageFactory().getName()); stages.put(stageName, pstage); } else { throw new PipelineException(I18n.get("019010", pipelineStageType.getTypeName())); } } catch (InstantiationException ie) { throw new PipelineException(ie.getMessage(), ie); } catch (IllegalAccessException iae) { throw new PipelineException(iae.getMessage(), iae); } } return (IPipelineStage) stages.get(stageName); } /** * Process documents - this is the entry point for pipeline processing. If * the name of the next stage is not null, then process normally, otherwise * add them to the finalResults collection if the finalResults collection is not null. * * @param name of the next stage * @param document to process * @param ticket the tracker ticket * @param finalResults the resulting processed documents after all processing is completed * * @throws com.babeldoc.core.pipeline.PipelineException DOCUMENT ME! */ public void process(String name, PipelineDocument document, IJournalTicket ticket, Collection finalResults) throws PipelineException { if ((name != null) && !name.equals("null") && (document != null)) { LogService.getInstance().logInfo(I18n.get("019008", name)); // Get the pipeline stage for this name IPipelineStage pstage = getPipelineStage(name); if (pstage != null) { try { pstage.initialize(); } catch (Exception e) { throw new PipelineException("Exception while initializing", e); } pstage.setDocument(document); pstage.setTicket(ticket); trackDocument(pstage); processPipelineStage(pstage, finalResults); } else { LogService.getInstance().logError(I18n.get("019008", name), null); } } else { if (finalResults != null) { finalResults.add(new PipelineStageResult(null, document, ticket)); } } } /** * This method is call prior to any processing - it is a necessary configuration * prequisite. * * @param stageFactory set the associated stage factory for this processor */ public void configure(IPipelineStageFactory stageFactory, Map config) { this.stageFactory = stageFactory; } /** * Process the pipeline stage and then defer to eithe the multithreaded or single threaded * implementation - this needs to be extracted. * * @param pstage The pipeline stage to process * @param finalResults collection of documents after all processing is completed. * @throws com.babeldoc.core.pipeline.PipelineException */ protected void processPipelineStage(IPipelineStage pstage, Collection finalResults) throws PipelineException { // Process the pipeline stage, get the results of the processing PipelineStageResult [] psResults = pstage.processStage(); if(psResults!=null && psResults.length>0) { processPipelineStageResults(psResults, finalResults); } } /** * Process the pipeline stage finalResults. The gets called from process and * handles the finalResults from the processing. * * @param psResults The results of running the current stage on this document * @param finalResults Collection of documents after all processing done (after null) * * @throws com.babeldoc.core.pipeline.PipelineException */ protected void processPipelineStageResults(PipelineStageResult[] psResults, Collection finalResults) throws PipelineException { int numResults = psResults.length; for (int i = 0; i < numResults; ++i) { processPipelineStageResult(psResults[i], finalResults); } } /** * Process a single PipelineStageResult. * * @param psResult The pipeline stage result to process * @param finalResults the collection of processed documents. * @throws com.babeldoc.core.pipeline.PipelineException */ protected void processPipelineStageResult(PipelineStageResult psResult, Collection finalResults) throws PipelineException { String name = psResult.getNamePipelineStage(); PipelineDocument document = psResult.getDocument(); IJournalTicket ticket = psResult.getTicket(); this.process(name, document, ticket, finalResults); } /** * Track the document on the pipeline stage * * @param pstage * * @throws PipelineException DOCUMENT ME! */ protected void trackDocument(IPipelineStage pstage) throws PipelineException { if (pstage.isTracked()) { try { JournalFactory.getJournal().updateDocument(pstage.getTicket(), pstage.getDocument(), getStageFactory().getName() + "." + pstage.getName()); } catch (JournalException e) { LogService.getInstance().logError(e); throw new PipelineException(e.getMessage(), e); } } } } --- NEW FILE: ThreadPooledPipelineStageProcessor.java --- /* ==================================================================== * The Apache Software License, Version 1.1 * * Copyright (c) 2000 The Apache Software Foundation. All rights * reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Apache Software Foundation (http://www.apache.org/)." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Apache" and "Apache Software Foundation" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact ap...@ap.... * * 5. Products derived from this software may not be called "Apache", * nor may "Apache" appear in their name, without prior written * permission of the Apache Software Foundation. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of the Apache Software Foundation. For more * information on the Apache Software Foundation, please see * <http://www.apache.org/>. * * Portions of this software are based upon public domain software * originally written at the National Center for Supercomputing Applications, * University of Illinois, Urbana-Champaign. * ==================================================================== * * Babeldoc: The Universal Document Processor * * $Header: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/processor/ThreadPooledPipelineStageProcessor.java,v 1.1 2003/07/16 22:45:28 triphop Exp $ * $DateTime$ * $Author: triphop $ * */ package com.babeldoc.core.pipeline.processor; import com.babeldoc.core.pipeline.*; import com.babeldoc.core.LogService; import java.util.*; import org.apache.commons.lang.NumberUtils; import org.apache.commons.threadpool.ThreadPool; import org.apache.commons.threadpool.DefaultThreadPool; /** * The thread pooled pipeline stage processor is a processor that maintains a * pool of threads that can be assigned processing. */ public class ThreadPooledPipelineStageProcessor extends SyncPipelineStageProcessor { public static final String POOL_SIZE = "poolSize"; public static final int DEFAULT_POOLSIZE = 5; private int poolsize; private ThreadPool threadPool; /** * This method is call prior to any processing - it is a necessary configuration * prequisite. * * @param stageFactory set the associated stage factory for this processor */ public void configure(IPipelineStageFactory stageFactory, Map config) { super.configure(stageFactory, config); poolsize = NumberUtils.stringToInt((String)config.get(POOL_SIZE), DEFAULT_POOLSIZE); // System.out.println("Setting up a pool with size of: "+poolsize); threadPool = new Defau... [truncated message content] |