|
From: <tr...@us...> - 2003-08-04 23:46:41
|
Update of /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/feeder
In directory sc8-pr-cvs1:/tmp/cvs-serv16126/src/com/babeldoc/core/pipeline/feeder
Modified Files:
AsynchronousFeeder.java DiskQueue.java DiskQueueTest.java
FeedDocument.java FeederFactory.java IFeeder.java
IFeederQueue.java MemoryQueue.java SynchronousFeeder.java
Log Message:
Removed the jakarta commons threadpool code - now uses the oswego concurrent classes.
Index: AsynchronousFeeder.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/feeder/AsynchronousFeeder.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -C2 -d -r1.8 -r1.9
*** AsynchronousFeeder.java 2 Jul 2003 12:29:25 -0000 1.8
--- AsynchronousFeeder.java 4 Aug 2003 23:46:38 -0000 1.9
***************
*** 71,75 ****
import com.babeldoc.core.pipeline.PipelineException;
- import org.apache.commons.threadpool.DefaultThreadPool;
import org.apache.commons.lang.NumberUtils;
--- 71,74 ----
***************
*** 79,82 ****
--- 78,83 ----
import java.io.IOException;
+ import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+
/**
***************
*** 87,93 ****
*
* @author Bmcdonald
! * @version 1.0
*/
! public class AsynchronousFeeder extends SynchronousFeeder implements Runnable {
public static final String Q_TYPE_DISK = "disk";
public static final String Q_TYPE_MEMORY = "memory";
--- 88,96 ----
*
* @author Bmcdonald
! * @version 1.1
*/
! public class AsynchronousFeeder
! extends SynchronousFeeder {
!
public static final String Q_TYPE_DISK = "disk";
public static final String Q_TYPE_MEMORY = "memory";
***************
*** 97,101 ****
public static final String POOLSIZE = "poolSize";
! private DefaultThreadPool threadPool;
/** Queue to place the threads */
--- 100,104 ----
public static final String POOLSIZE = "poolSize";
! private PooledExecutor threadPool;
/** Queue to place the threads */
***************
*** 103,106 ****
--- 106,113 ----
private LogService log = LogService.getInstance(AsynchronousFeeder.class.getName());
private boolean stopRunning = false;
+
+ /** number of millis to wait to get message from queue */
+ public static final int TIME_OUT_MS = 500;
+
/**
* initialize this object - this sets up the queue and the threadpool. This
***************
*** 113,141 ****
*/
public void initialize(Map map) throws GeneralException{
if (map != null) {
! String qtype = (String)map.get(Q_TYPE);
! if (Q_TYPE_DISK.equals(qtype)) {
! String qname = (String)map.get(Q_DIR_NAME);
! String qdir = (String)map.get(Q_DISK_DIR);
! if (qname == null || qname.equals("")) {
! throw new GeneralException("Disk queues must have a 'queueName'");
! }
! if (qdir == null || qdir.equals("")) {
! throw new GeneralException("Disk queues must have a 'queueDir'");
! }
! try {
! this.queue = new DiskQueue(qname, new File(qdir));
! } catch (IOException e) {
! throw new GeneralException("Trying to create a disk queue", e);
! }
! } else {
! this.queue = new MemoryQueue();
! }
}
! int poolSize = NumberUtils.stringToInt((String)map.get(POOLSIZE), 1);
! threadPool = new DefaultThreadPool(poolSize);
for (int i = 0; i < poolSize; ++i) {
! threadPool.invokeLater(this);
}
}
--- 120,204 ----
*/
public void initialize(Map map) throws GeneralException{
+ int poolSize = 1;
if (map != null) {
! configQtype(map);
! poolSize = NumberUtils.stringToInt((String)map.get(POOLSIZE), 1);
}
! threadPool = new PooledExecutor(poolSize);
! setupThreadPool(poolSize);
! }
+ /**
+ * Add a runnable to the thread pool which processes the messages as they get
+ * submitted to the queue.
+ *
+ * @param poolSize
+ */
+ private void setupThreadPool(int poolSize) {
for (int i = 0; i < poolSize; ++i) {
! try {
! threadPool.execute(new Runnable() {
! /**
! * When an object implementing interface <code>Runnable</code> is used
! * to create a thread, starting the thread causes the object's
! * <code>run</code> method to be called in that separately executing
! * thread.
! */
! public void run() {
! while (!stopRunning) {
! if (log.isDebugEnabled()) {
! log.logDebug("Checking for new documents in queue");
! }
!
! FeedDocument feedDoc = null;
! try {
! feedDoc = queue.remove(TIME_OUT_MS);
! } catch (InterruptedException e) {
! log.logError(e);
! }
!
! if (feedDoc != null) {
! try {
! log.logInfo("Processing document...");
! actuallyProcess(feedDoc);
! } catch (PipelineException pe) {
! log.logError(pe);
! } catch (JournalException je) {
! log.logError(je);
! }
! }
! }
! }
! });
! } catch (InterruptedException e) {
! log.logError(e);
! }
! }
! }
!
! /**
! * Configure the queuing implementation.
! *
! * @param map
! * @throws GeneralException
! */
! private void configQtype(Map map) throws GeneralException {
! String qtype = (String)map.get(Q_TYPE);
! if (Q_TYPE_DISK.equals(qtype)) {
! String qname = (String)map.get(Q_DIR_NAME);
! String qdir = (String)map.get(Q_DISK_DIR);
! if (qname == null || qname.equals("")) {
! throw new GeneralException("Disk queues must have a 'queueName'");
! }
! if (qdir == null || qdir.equals("")) {
! throw new GeneralException("Disk queues must have a 'queueDir'");
! }
! try {
! this.queue = new DiskQueue(qname, new File(qdir));
! } catch (IOException e) {
! throw new GeneralException("Trying to create a disk queue", e);
! }
! } else {
! this.queue = new MemoryQueue();
}
}
***************
*** 159,198 ****
throw new PipelineException("Submitting document", t);
}
-
return null;
}
/**
! * This is the run method, where the threads come to play. This will run
! * until the stopRunning flag is set to true. This will cause this method
! * to exit and then the thread stops.
*/
! public void run() {
! while (!stopRunning) {
! if (log.isDebugEnabled()) {
! log.logDebug("Checking for new documents in queue");
! }
!
! FeedDocument feedDoc = queue.remove();
!
! if (feedDoc != null) {
! try {
! log.logInfo("Processing document...");
! super.process(feedDoc);
! } catch (PipelineException pe) {
! LogService.getInstance().logError(pe);
! } catch (JournalException je) {
! LogService.getInstance().logError(je);
! }
! }
! }
}
/**
! * Stops this feeder.
*/
! public void stopRunning() {
this.stopRunning = true;
! this.threadPool.stop();
}
}
--- 222,251 ----
throw new PipelineException("Submitting document", t);
}
return null;
}
+
/**
! * This processes the feeddocuments by passing to the
! * parent class which actually does the processing.
! *
! * @param docFeed
! * @return
! * @throws PipelineException
! * @throws JournalException
*/
! protected Collection actuallyProcess(FeedDocument docFeed)
! throws PipelineException, JournalException {
! return super.process(docFeed);
}
/**
! * This method stops the processing on this feeder. The stop flag is marked
! * and this means that the threads exit from the run method. Additionaly the
! * underlying threadpool is marked to terminate.
*/
! public void terminate() {
this.stopRunning = true;
! this.threadPool.shutdownAfterProcessingCurrentlyQueuedTasks();
}
}
Index: DiskQueue.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/feeder/DiskQueue.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -C2 -d -r1.6 -r1.7
*** DiskQueue.java 27 Jun 2003 13:49:12 -0000 1.6
--- DiskQueue.java 4 Aug 2003 23:46:38 -0000 1.7
***************
*** 67,70 ****
--- 67,71 ----
import com.babeldoc.core.Named;
+ import com.babeldoc.core.LogService;
import com.babeldoc.core.pipeline.feeder.FeedDocument;
import com.babeldoc.core.pipeline.feeder.IFeederQueue;
***************
*** 89,94 ****
*
* @author dejank
*/
! public class DiskQueue extends Named implements IFeederQueue {
//Folder where files will be stored
private File tmpFolder;
--- 90,98 ----
*
* @author dejank
+ * @version 1.1
*/
! public class DiskQueue
! extends Named
! implements IFeederQueue {
//Folder where files will be stored
private File tmpFolder;
***************
*** 132,138 ****
/**
! * TODO: DOCUMENT ME!
*
! * @return DOCUMENT ME!
*/
public boolean isEmpty() {
--- 136,142 ----
/**
! * return true if the queue is empty
*
! * @return
*/
public boolean isEmpty() {
***************
*** 141,147 ****
/**
! * TODO: DOCUMENT ME!
*
! * @return DOCUMENT ME!
*/
public int getQueueSize() {
--- 145,151 ----
/**
! * get the size of the queue
*
! * @return
*/
public int getQueueSize() {
***************
*** 164,168 ****
/**
! * TODO: DOCUMENT ME!
*/
public void emptyQueue() {
--- 168,172 ----
/**
! * Remove all elements from the queue
*/
public void emptyQueue() {
***************
*** 222,226 ****
} catch (IOException e) {
//TODO: reconsider exception handling!
! e.printStackTrace();
return null;
--- 226,230 ----
} catch (IOException e) {
//TODO: reconsider exception handling!
! LogService.getInstance().logError(e);
return null;
***************
*** 233,237 ****
/**
! * TODO: DOCUMENT ME!
*
* @param name DOCUMENT ME!
--- 237,241 ----
/**
! * Setup the queue - check the directory, etc.
*
* @param name DOCUMENT ME!
***************
*** 249,253 ****
/**
! * TODO: DOCUMENT ME!
*/
private void deleteTempFiles() {
--- 253,257 ----
/**
! * Delete the temporary files
*/
private void deleteTempFiles() {
***************
*** 262,270 ****
/**
! * TODO: DOCUMENT ME!
*
! * @param docFile DOCUMENT ME!
*
! * @return DOCUMENT ME!
*
* @throws IOException DOCUMENT ME!
--- 266,274 ----
/**
! * Convert a file on disk into a document.
*
! * @param docFile file on disk to deserialize
*
! * @return deserialized document
*
* @throws IOException DOCUMENT ME!
Index: DiskQueueTest.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/feeder/DiskQueueTest.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -C2 -d -r1.4 -r1.5
*** DiskQueueTest.java 27 Jun 2003 02:19:59 -0000 1.4
--- DiskQueueTest.java 4 Aug 2003 23:46:38 -0000 1.5
***************
*** 77,82 ****
*
* @author Dejan
*/
! public class DiskQueueTest extends TestCase {
DiskQueue queue;
FeedDocument document;
--- 77,84 ----
*
* @author Dejan
+ * @version 1.1
*/
! public class DiskQueueTest
! extends TestCase {
DiskQueue queue;
FeedDocument document;
Index: FeedDocument.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/feeder/FeedDocument.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -C2 -d -r1.4 -r1.5
*** FeedDocument.java 27 Jun 2003 02:19:59 -0000 1.4
--- FeedDocument.java 4 Aug 2003 23:46:38 -0000 1.5
***************
*** 78,84 ****
*
* @author Bmcdonald
! * @version 1.0
*/
! public class FeedDocument implements Serializable {
private IJournalTicket ticket;
private Map attributes;
--- 78,85 ----
*
* @author Bmcdonald
! * @version 1.1
*/
! public class FeedDocument
! implements Serializable {
private IJournalTicket ticket;
private Map attributes;
***************
*** 107,113 ****
/**
! * TODO: DOCUMENT ME!
*
! * @param attributes DOCUMENT ME!
*/
public void setAttributes(Map attributes) {
--- 108,114 ----
/**
! * Set the attributes for the feeddocument
*
! * @param attributes
*/
public void setAttributes(Map attributes) {
***************
*** 116,122 ****
/**
! * TODO: DOCUMENT ME!
*
! * @return DOCUMENT ME!
*/
public Map getAttributes() {
--- 117,123 ----
/**
! * Get the attributes
*
! * @return
*/
public Map getAttributes() {
***************
*** 125,131 ****
/**
! * TODO: DOCUMENT ME!
*
! * @param binary DOCUMENT ME!
*/
public void setBinary(boolean binary) {
--- 126,132 ----
/**
! * Set the binary flag on the document
*
! * @param binary
*/
public void setBinary(boolean binary) {
***************
*** 134,140 ****
/**
! * TODO: DOCUMENT ME!
*
! * @return DOCUMENT ME!
*/
public boolean isBinary() {
--- 135,141 ----
/**
! * Get the binary flag on the document
*
! * @return
*/
public boolean isBinary() {
***************
*** 143,149 ****
/**
! * TODO: DOCUMENT ME!
*
! * @param data DOCUMENT ME!
*/
public void setData(byte[] data) {
--- 144,150 ----
/**
! * Set the binary data on the document
*
! * @param data binary array
*/
public void setData(byte[] data) {
***************
*** 152,158 ****
/**
! * TODO: DOCUMENT ME!
*
! * @return DOCUMENT ME!
*/
public byte[] getData() {
--- 153,159 ----
/**
! * Get the data for the feed document
*
! * @return get the data
*/
public byte[] getData() {
***************
*** 161,167 ****
/**
! * TODO: DOCUMENT ME!
*
! * @param noJournal DOCUMENT ME!
*/
public void setNoJournal(boolean noJournal) {
--- 162,168 ----
/**
! * Set the no-journal flag
*
! * @param noJournal
*/
public void setNoJournal(boolean noJournal) {
***************
*** 170,176 ****
/**
! * TODO: DOCUMENT ME!
*
! * @return DOCUMENT ME!
*/
public boolean isNoJournal() {
--- 171,177 ----
/**
! * Get the state of the no-journal flag
*
! * @return
*/
public boolean isNoJournal() {
***************
*** 179,185 ****
/**
! * TODO: DOCUMENT ME!
*
! * @param pipelineName DOCUMENT ME!
*/
public void setPipelineName(String pipelineName) {
--- 180,186 ----
/**
! * Set the name of the pipeline to process this feed document
*
! * @param pipelineName
*/
public void setPipelineName(String pipelineName) {
***************
*** 188,194 ****
/**
! * TODO: DOCUMENT ME!
*
! * @return DOCUMENT ME!
*/
public String getPipelineName() {
--- 189,195 ----
/**
! * Get the name of the pipeline to process this feed document
*
! * @return pipeline name
*/
public String getPipelineName() {
***************
*** 197,203 ****
/**
! * TODO: DOCUMENT ME!
*
! * @param ticket DOCUMENT ME!
*/
public void setTicket(IJournalTicket ticket) {
--- 198,204 ----
/**
! * Set the ticket for this feeddocument
*
! * @param ticket
*/
public void setTicket(IJournalTicket ticket) {
***************
*** 206,212 ****
/**
! * TODO: DOCUMENT ME!
*
! * @return DOCUMENT ME!
*/
public IJournalTicket getTicket() {
--- 207,213 ----
/**
! * Get the ticket
*
! * @return ticket
*/
public IJournalTicket getTicket() {
***************
*** 215,223 ****
/**
! * TODO: DOCUMENT ME!
*
! * @param obj DOCUMENT ME!
*
! * @return DOCUMENT ME!
*/
public boolean equals(Object obj) {
--- 216,224 ----
/**
! * Equality operator
*
! * @param obj
*
! * @return
*/
public boolean equals(Object obj) {
Index: FeederFactory.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/feeder/FeederFactory.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -C2 -d -r1.1 -r1.2
*** FeederFactory.java 1 Jul 2003 12:45:57 -0000 1.1
--- FeederFactory.java 4 Aug 2003 23:46:38 -0000 1.2
***************
*** 69,72 ****
--- 69,73 ----
import com.babeldoc.core.TieredConfigurationHelper;
import com.babeldoc.core.GeneralException;
+ import com.babeldoc.core.LogService;
import com.babeldoc.core.service.ServiceFactory;
import com.babeldoc.core.service.ServiceException;
***************
*** 87,90 ****
--- 88,94 ----
* Please look at the process methods which provide a number of generally useful and easy
* methods of getting data into the system.
+ *
+ * @author bmcdonald
+ * @version 1.1
*/
public class FeederFactory {
***************
*** 225,228 ****
--- 229,248 ----
return getFeeder(name).process(
new FeedDocument(pipelineName, data, new HashMap(), false, false));
+ }
+
+ /**
+ * Terminate the named feeder. This stops the feeder and then removes it from
+ * the list of loaded feeders. This means that any subsequent references to
+ * this feeder will result in a completely new implementation being created.
+ *
+ * @param name
+ */
+ public void terminate(String name) {
+ try {
+ getFeeder(name).terminate();
+ feeders.remove(name);
+ } catch (GeneralException e) {
+ LogService.getInstance().logError(e);
+ }
}
}
Index: IFeeder.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/feeder/IFeeder.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -C2 -d -r1.3 -r1.4
*** IFeeder.java 30 Jun 2003 22:14:56 -0000 1.3
--- IFeeder.java 4 Aug 2003 23:46:38 -0000 1.4
***************
*** 68,72 ****
import com.babeldoc.core.GeneralException;
- import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
--- 68,71 ----
***************
*** 102,104 ****
--- 101,110 ----
public void initialize(Map configuration)
throws GeneralException;
+
+ /**
+ * finish the processing - this is useful to make those threaded implementations
+ * stop processing. All processing will stop some time after the last message
+ * has been processed. Once a feeder has been terminated, it may not be restarted.
+ */
+ public void terminate();
}
Index: IFeederQueue.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/feeder/IFeederQueue.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -C2 -d -r1.3 -r1.4
*** IFeederQueue.java 27 Jun 2003 02:19:59 -0000 1.3
--- IFeederQueue.java 4 Aug 2003 23:46:38 -0000 1.4
***************
*** 66,75 ****
package com.babeldoc.core.pipeline.feeder;
- import java.io.IOException;
-
-
/**
* A generalized implenentation of a queue that can be used to make the feeding
* and processing of documents asynchronous.
*/
public interface IFeederQueue {
--- 66,75 ----
package com.babeldoc.core.pipeline.feeder;
/**
* A generalized implenentation of a queue that can be used to make the feeding
* and processing of documents asynchronous.
+ *
+ * @author bmcdonald
+ * @version 1.1
*/
public interface IFeederQueue {
***************
*** 85,92 ****
*
* @param feedDoc
! *
! * @throws Throwable
*/
! public void add(FeedDocument feedDoc) throws Throwable;
/**
--- 85,92 ----
*
* @param feedDoc
! * @throws InterruptedException
*/
! public void add(FeedDocument feedDoc)
! throws InterruptedException;
/**
***************
*** 94,99 ****
*
* @return
*/
! public FeedDocument remove();
/**
--- 94,101 ----
*
* @return
+ * @throws InterruptedException
*/
! public FeedDocument remove()
! throws InterruptedException;
/**
***************
*** 102,108 ****
*
* @param timeOutMs
- *
* @return
*/
! public FeedDocument remove(int timeOutMs);
}
--- 104,111 ----
*
* @param timeOutMs
* @return
+ * @throws InterruptedException
*/
! public FeedDocument remove(int timeOutMs)
! throws InterruptedException;
}
Index: MemoryQueue.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/feeder/MemoryQueue.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -C2 -d -r1.4 -r1.5
*** MemoryQueue.java 16 Jul 2003 05:17:40 -0000 1.4
--- MemoryQueue.java 4 Aug 2003 23:46:38 -0000 1.5
***************
*** 66,70 ****
package com.babeldoc.core.pipeline.feeder;
! import org.apache.commons.threadpool.MTQueue;
--- 66,71 ----
package com.babeldoc.core.pipeline.feeder;
! import EDU.oswego.cs.dl.util.concurrent.BoundedChannel;
! import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
***************
*** 72,78 ****
* Represents an in-memory feeder queue implemented in a MTQueue from the apache threadpool
* implementation.
*/
! public class MemoryQueue implements IFeederQueue {
! MTQueue queue;
/**
--- 73,84 ----
* Represents an in-memory feeder queue implemented in a MTQueue from the apache threadpool
* implementation.
+ *
+ * @author bmcdonald
+ * @version 1.1
*/
! public class MemoryQueue
! implements IFeederQueue {
!
! BoundedChannel queue;
/**
***************
*** 80,84 ****
*/
public MemoryQueue() {
! queue = new MTQueue();
}
--- 86,97 ----
*/
public MemoryQueue() {
! queue = new BoundedBuffer();
! }
!
! /**
! * Creates a new MemoryQueue object.
! */
! public MemoryQueue(int capacity) {
! queue = new BoundedBuffer(capacity);
}
***************
*** 87,91 ****
*/
public int getQueueSize() {
! return queue.size();
}
--- 100,104 ----
*/
public int getQueueSize() {
! return queue.capacity();
}
***************
*** 95,100 ****
* @param feedDoc
*/
! public void add(FeedDocument feedDoc) {
! queue.add(feedDoc);
}
--- 108,114 ----
* @param feedDoc
*/
! public void add(FeedDocument feedDoc)
! throws InterruptedException {
! queue.put(feedDoc);
}
***************
*** 104,109 ****
* @return
*/
! public FeedDocument remove() {
! return (FeedDocument) queue.remove();
}
--- 118,124 ----
* @return
*/
! public FeedDocument remove()
! throws InterruptedException {
! return (FeedDocument) queue.take();
}
***************
*** 116,121 ****
* @return
*/
! public FeedDocument remove(int timeOutMs) {
! return (FeedDocument) queue.remove(timeOutMs);
}
}
--- 131,137 ----
* @return
*/
! public FeedDocument remove(int timeOutMs)
! throws InterruptedException {
! return (FeedDocument) queue.poll(timeOutMs);
}
}
Index: SynchronousFeeder.java
===================================================================
RCS file: /cvsroot/babeldoc/babeldoc/modules/core/src/com/babeldoc/core/pipeline/feeder/SynchronousFeeder.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -C2 -d -r1.5 -r1.6
*** SynchronousFeeder.java 30 Jun 2003 22:14:56 -0000 1.5
--- SynchronousFeeder.java 4 Aug 2003 23:46:38 -0000 1.6
***************
*** 88,92 ****
* @version 1.1
*/
! public class SynchronousFeeder implements IFeeder {
/**
--- 88,93 ----
* @version 1.1
*/
! public class SynchronousFeeder
! implements IFeeder {
/**
***************
*** 153,156 ****
--- 154,167 ----
public void initialize(Map configuration)
throws GeneralException {
+ }
+
+ /**
+ * finish the processing - this is useful to make those threaded implementations
+ * stop processing. All processing will stop some time after the last message
+ * has been processed.
+ * <br/>
+ * This does nothing in this class since it is a synchronous processing class.
+ */
+ public void terminate() {
}
}
|