From: brian z. <bz...@us...> - 2001-11-20 04:55:22
|
Update of /cvsroot/jython/jython/com/ziclix/python/sql/util
In directory usw-pr-cvs1:/tmp/cvs-serv7094/com/ziclix/python/sql/util

Added Files:
    BCP.java PyArgParser.java Queue.java QueueClosedException.java
Log Message:
initial zxJDBC checkin

--- NEW FILE: BCP.java ---
/*
 * Jython Database Specification API 2.0
 *
 * $Id: BCP.java,v 1.1 2001/11/20 04:55:18 bzimmer Exp $
 *
 * Copyright (c) 2001 brian zimmer <bz...@zi...>
 *
 */
package com.ziclix.python.sql.util;

import java.util.*;
import java.lang.reflect.*;
import org.python.core.*;
import com.ziclix.python.sql.*;
import com.ziclix.python.sql.pipe.*;
import com.ziclix.python.sql.pipe.db.*;
import com.ziclix.python.sql.handler.*;

/**
 * A class to perform efficient Bulk CoPy of database tables.
 *
 * Rows are read from a table on the source connection by a DBSource and
 * written to a table on the destination connection by a DBSink; the two
 * are joined by a Pipe (see {@link #bcp}).
 */
public class BCP extends PyObject implements ClassDictInit {

    /**
     * Optional classes handed to the DBSource (sourceDH) and the DBSink
     * (destDH); both are null until set through the Python attributes
     * "sourceDataHandler" / "destinationDataHandler".
     */
    protected Class sourceDH, destDH;

    /**
     * batchsize is handed to the DBSink. queuesize can be set and read as
     * an attribute but is not otherwise used by the code in this class.
     */
    protected int batchsize, queuesize;

    /** The connection rows are read from and the connection rows are written to. */
    protected PyConnection source, destination;

    /**
     * The source connection will produce the rows while the destination
     * connection will consume the rows and coerce as necessary for the
     * destination database. The batchsize is set to -1.
     *
     * @param source the connection to read from
     * @param destination the connection to write to
     */
    public BCP(PyConnection source, PyConnection destination) {
        this(source, destination, -1);
    }

    /**
     * The source connection will produce the rows while the destination
     * connection will consume the rows and coerce as necessary for the
     * destination database.
     *
     * @param source the connection to read from
     * @param destination the connection to write to
     * @param batchsize used to batch the inserts on the destination
     */
    public BCP(PyConnection source, PyConnection destination, int batchsize) {
        this.source = source;
        this.destination = destination;
        this.destDH = null;
        this.sourceDH = null;
        this.batchsize = batchsize;
        this.queuesize = 0;
    }

    // __class__ boilerplate -- see PyObject for details

    /** Field __class__ */
    public static PyClass __class__;

    /**
     * Returns the static {@link #__class__} field.
     *
     * @return PyClass
     */
    protected PyClass getPyClass() {
        return __class__;
    }

    /** Names of the methods advertised to Python. */
    protected static PyList __methods__;

    /** Names of the attributes advertised to Python. */
    protected static PyList __members__;

    static {
        PyObject[] m = new PyObject[1];

        m[0] = new PyString("bcp");
        __methods__ = new PyList(m);

        m = new PyObject[6];
        m[0] = new PyString("source");
        m[1] = new PyString("destination");
        m[2] = new PyString("batchsize");
        m[3] = new PyString("queuesize");
        m[4] = new PyString("sourceDataHandler");
        m[5] = new PyString("destinationDataHandler");
        __members__ = new PyList(m);
    }

    /**
     * String representation of the object.
     *
     * @return a string representation of the object.
     */
    public String toString() {
        return "<BCP object instance at " + hashCode() + ">";
    }

    /**
     * Sets the attribute name to value. The four attributes handled here
     * are converted to their Java field types; every other name is
     * delegated to the superclass.
     *
     * @param name the attribute name
     * @param value the new value
     */
    public void __setattr__(String name, PyObject value) {
        if ("destinationDataHandler".equals(name)) {
            this.destDH = (Class)value.__tojava__(Class.class);
        } else if ("sourceDataHandler".equals(name)) {
            this.sourceDH = (Class)value.__tojava__(Class.class);
        } else if ("batchsize".equals(name)) {
            this.batchsize = ((Number)value.__tojava__(Number.class)).intValue();
        } else if ("queuesize".equals(name)) {
            this.queuesize = ((Number)value.__tojava__(Number.class)).intValue();
        } else {
            super.__setattr__(name, value);
        }
    }

    /**
     * Gets the value of the attribute name. The four attributes handled by
     * {@link #__setattr__} are answered from the fields; every other name
     * is delegated to the superclass.
     *
     * @param name the attribute name
     * @return the attribute for the given name
     */
    public PyObject __findattr__(String name) {
        if ("destinationDataHandler".equals(name)) {
            return Py.java2py(this.destDH);
        } else if ("sourceDataHandler".equals(name)) {
            return Py.java2py(this.sourceDH);
        } else if ("batchsize".equals(name)) {
            return Py.newInteger(this.batchsize);
        } else if ("queuesize".equals(name)) {
            return Py.newInteger(this.queuesize);
        }

        return super.__findattr__(name);
    }

    /**
     * Initializes the object's namespace.
     *
     * @param dict the class dictionary to populate
     */
    static public void classDictInit(PyObject dict) {
        dict.__setitem__("__version__", Py.newString("$Revision: 1.1 $").__getslice__(Py.newInteger(11), Py.newInteger(-2), null));
        dict.__setitem__("bcp", new BCPFunc("bcp", 0, 1, 2, zxJDBC.getString("bcp")));
        dict.__setitem__("batchsize", Py.newString(zxJDBC.getString("batchsize")));
        dict.__setitem__("queuesize", Py.newString(zxJDBC.getString("queuesize")));

        // hide from python
        dict.__setitem__("classDictInit", null);
        dict.__setitem__("toString", null);
        dict.__setitem__("PyClass", null);
        dict.__setitem__("getPyClass", null);
        dict.__setitem__("sourceDH", null);
        dict.__setitem__("destDH", null);
    }

    /**
     * Bulkcopy data from one database to another.
     *
     * @param fromTable the table in question on the source database
     * @param where an optional where clause, defaults to '(1=1)' if null
     * @param params optional params to substituted in the where clause
     * @param include the columns to be queried from the source, '*' if None
     * @param exclude the columns to be excluded from insertion on the destination, all if None
     * @param toTable if non-null, the table in the destination db, otherwise the same table name as the source
     * @param bindings the optional bindings for the destination, this allows morphing of types during the copy
     * @return the count of the total number of rows bulk copied, -1 if the query returned no rows
     */
    protected PyObject bcp(String fromTable, String where, PyObject params, PyObject include, PyObject exclude, String toTable, PyObject bindings) {
        Pipe pipe = new Pipe();
        String _toTable = (toTable == null) ? fromTable : toTable;
        DBSource source = new DBSource(this.source, sourceDH, fromTable, where, include, params);
        DBSink sink = new DBSink(this.destination, destDH, _toTable, exclude, bindings, this.batchsize);

        // The value returned by the pipe is reduced by one before it is handed back.
        // NOTE(review): presumably the pipe also counts a leading header row -- confirm against Pipe.
        return pipe.pipe(source, sink).__sub__(Py.newInteger(1));
    }
}

/**
 * The Python-callable functions of {@link BCP}. Index 0 is the only index
 * handled and dispatches to BCP.bcp.
 *
 * @author last modified by $Author: bzimmer $
 * @version $Revision: 1.1 $
 * @copyright 2001 brian zimmer
 */
class BCPFunc extends PyBuiltinFunctionSet {

    /**
     * Creates a function that takes exactly argcount arguments.
     *
     * @param name the function name
     * @param index the index switched on in the __call__ methods
     * @param argcount used as both the minimum and the maximum argument count
     * @param doc the documentation string
     */
    BCPFunc(String name, int index, int argcount, String doc) {
        super(name, index, argcount, argcount, true, doc);
    }

    /**
     * Creates a function that takes between minargs and maxargs arguments.
     *
     * @param name the function name
     * @param index the index switched on in the __call__ methods
     * @param minargs the minimum argument count
     * @param maxargs the maximum argument count
     * @param doc the documentation string
     */
    BCPFunc(String name, int index, int minargs, int maxargs, String doc) {
        super(name, index, minargs, maxargs, true, doc);
    }

    /**
     * Converts the table argument to a String.
     *
     * @param arg the table name argument
     * @return the table name, never null
     * @throws PyException a ValueError if the conversion yields null
     */
    private static String tableName(PyObject arg) {
        String table = (String)arg.__tojava__(String.class);

        if (table == null) {
            throw Py.ValueError(zxJDBC.getString("invalidTableName"));
        }

        return table;
    }

    /**
     * B.bcp(table)
     *
     * @param arg the source table name
     * @return the value returned by BCP.bcp
     */
    public PyObject __call__(PyObject arg) {
        BCP bcp = (BCP)__self__;

        switch (index) {

            case 0 :
                return bcp.bcp(tableName(arg), null, Py.None, Py.None, Py.None, null, Py.None);

            default :
                throw argCountError(1);
        }
    }

    /**
     * B.bcp(table, where)
     *
     * @param arga the source table name
     * @param argb the where clause
     * @return the value returned by BCP.bcp
     */
    public PyObject __call__(PyObject arga, PyObject argb) {
        BCP bcp = (BCP)__self__;

        switch (index) {

            case 0 :
                String table = tableName(arga);
                String where = (String)argb.__tojava__(String.class);

                return bcp.bcp(table, where, Py.None, Py.None, Py.None, null, Py.None);

            default :
                throw argCountError(2);
        }
    }

    /**
     * B.bcp(table, where, params)
     *
     * @param arga the source table name
     * @param argb the where clause
     * @param argc the params for the where clause
     * @return the value returned by BCP.bcp
     */
    public PyObject __call__(PyObject arga, PyObject argb, PyObject argc) {
        BCP bcp = (BCP)__self__;

        switch (index) {

            case 0 :
                String table = tableName(arga);
                String where = (String)argb.__tojava__(String.class);

                return bcp.bcp(table, where, argc, Py.None, Py.None, null, Py.None);

            default :
                throw argCountError(3);
        }
    }

    /**
     * B.bcp(table, [where=None, params=None, include=None, exclude=None, toTable=None, bindings=None])
     *
     * @param args the positional arguments followed by the keyword values
     * @param keywords the keyword names
     * @return the value returned by BCP.bcp
     */
    public PyObject __call__(PyObject[] args, String[] keywords) {
        BCP bcp = (BCP)__self__;

        switch (index) {

            case 0 :
                String where = null;
                PyObject params = Py.None;
                PyArgParser parser = new PyArgParser(args, keywords);
                String table = tableName(parser.arg(0, Py.None));

                // 'where' can be the second argument or a keyword
                if (parser.numArg() >= 2) {
                    where = (String)parser.arg(1, Py.None).__tojava__(String.class);
                }

                if (where == null) {
                    where = (String)parser.kw("where", Py.None).__tojava__(String.class);
                }

                // 'params' can be the third argument or a keyword
                if (parser.numArg() >= 3) {
                    params = parser.arg(2, Py.None);
                }

                if (params == Py.None) {
                    params = parser.kw("params", Py.None);
                }

                String toTable = (String)parser.kw("toTable", Py.None).__tojava__(String.class);
                PyObject include = parser.kw("include", Py.None);
                PyObject exclude = parser.kw("exclude", Py.None);
                PyObject bindings = parser.kw("bindings", Py.None);

                return bcp.bcp(table, where, params, include, exclude, toTable, bindings);

            default :
                // report the number of arguments actually received
                throw argCountError(args.length);
        }
    }
}

--- NEW FILE: PyArgParser.java ---
/*
 * Jython Database Specification API 2.0
 *
 * $Id: PyArgParser.java,v 1.1 2001/11/20 04:55:18 bzimmer Exp $
 *
 * Copyright (c) 2001 brian zimmer <bz...@zi...>
 *
 */
package com.ziclix.python.sql.util;

import java.io.*;
import java.sql.*;
import java.math.*;
import java.util.*;
import org.python.core.*;

/**
 * Parse the args and kws for a method call.
 *
 * The args array is expected to hold the positional arguments first,
 * followed by one value per entry of kws in the same order.
 *
 * @author brian zimmer
 * @version $Revision: 1.1 $
 */
public class PyArgParser {

    /** Maps each keyword name (String) to its value (PyObject). */
    protected Map keywords;

    /** The positional arguments, i.e. args without the trailing keyword values. */
    protected PyObject[] arguments;

    /**
     * Construct a parser with the arguments and keywords.
     *
     * @param args the positional arguments followed by the keyword values
     * @param kws the keyword names, may be null
     */
    public PyArgParser(PyObject[] args, String[] kws) {
        this.keywords = new HashMap();
        this.arguments = null;

        parse(args, kws);
    }

    /**
     * Splits args into the keyword map and the positional argument array.
     *
     * @param args the positional arguments followed by the keyword values
     * @param kws the keyword names, may be null
     */
    protected void parse(PyObject[] args, String[] kws) {

        // walk backwards through the kws and build the map
        int largs = args.length;

        if (kws != null) {
            for (int i = kws.length - 1; i >= 0; i--) {
                keywords.put(kws[i], args[--largs]);
            }
        }

        // whatever was not consumed as a keyword value is positional
        this.arguments = new PyObject[largs];

        System.arraycopy(args, 0, this.arguments, 0, largs);
    }

    /**
     * Tests whether index addresses a positional argument.
     */
    private boolean inRange(int index) {
        return (index >= 0) && (index < this.arguments.length);
    }

    /**
     * How many keywords?
     */
    public int numKw() {
        return this.keywords.size();
    }

    /**
     * Does the keyword exist?
     */
    public boolean hasKw(String kw) {
        return this.keywords.containsKey(kw);
    }

    /**
     * Return the value for the keyword, raise a KeyError if the keyword does
     * not exist.
     */
    public PyObject kw(String kw) {
        if (!hasKw(kw)) {
            throw Py.KeyError(kw);
        }

        return (PyObject)this.keywords.get(kw);
    }

    /**
     * Return the value for the keyword, return the default if the keyword does
     * not exist.
     */
    public PyObject kw(String kw, PyObject def) {
        if (!hasKw(kw)) {
            return def;
        }

        return (PyObject)this.keywords.get(kw);
    }

    /**
     * Get the array of keywords.
     */
    public String[] kws() {
        return (String[])this.keywords.keySet().toArray(new String[0]);
    }

    /**
     * Get the number of arguments.
     */
    public int numArg() {
        return this.arguments.length;
    }

    /**
     * Return the argument at the given index, raise an IndexError if out of range.
     */
    public PyObject arg(int index) {
        if (inRange(index)) {
            return this.arguments[index];
        }

        throw Py.IndexError("index out of range");
    }

    /**
     * Return the argument at the given index, or the default if the index is out of range.
     */
    public PyObject arg(int index, PyObject def) {
        if (inRange(index)) {
            return this.arguments[index];
        }

        return def;
    }
}

--- NEW FILE: Queue.java ---
/*
 * Jython Database Specification API 2.0
 *
 * $Id: Queue.java,v 1.1 2001/11/20 04:55:18 bzimmer Exp $
 *
 * Copyright (c) 2001 brian zimmer <bz...@zi...>
 *
 */
package com.ziclix.python.sql.util;

import java.util.LinkedList;

/**
 * This queue blocks until closed or an element is enqueued. If the queue
 * reaches capacity, the dequeue thread gets priority in order to bring the
 * queue size under a certain threshold.
 *
 * All public operations synchronize on the queue instance.
 *
 * @author brian zimmer
 * @version $Revision: 1.1 $
 */
public class Queue {

    /** Set by {@link #close()}; nothing in this class resets it. */
    protected boolean closed;

    /** The queued elements; added at the tail, removed from the head. */
    protected LinkedList queue;

    /**
     * capacity: the size at which enqueue blocks, not bounded if not positive.
     * threshold: dequeue wakes the waiting threads once the size drops below it.
     */
    protected int capacity, threshold;

    /**
     * Instantiate a blocking queue with no bounded capacity.
     */
    public Queue() {
        this(0);
    }

    /**
     * Instantiate a blocking queue with the specified capacity.
     *
     * @param capacity the size at which enqueue blocks, not bounded if not positive
     */
    public Queue(int capacity) {
        this.closed = false;
        this.capacity = capacity;
        this.queue = new LinkedList();
        this.threshold = (int)(this.capacity * 0.75f);

        // A capacity of 1 truncates to a threshold of 0; the size can never
        // drop below 0, so a blocked enqueue would never be woken.
        if ((this.capacity > 0) && (this.threshold < 1)) {
            this.threshold = 1;
        }
    }

    /**
     * Enqueue an object and notify all waiting Threads. If the queue is
     * bounded and has reached its capacity, the calling thread then blocks
     * until the size is below the capacity again or the queue is closed.
     *
     * @param element the object to append
     * @throws InterruptedException if interrupted while waiting
     * @throws QueueClosedException if the queue is closed on entry or is
     *         found closed after waiting
     */
    public synchronized void enqueue(Object element) throws InterruptedException {
        if (closed) {
            throw new QueueClosedException();
        }

        this.queue.addLast(element);

        // Producers and consumers wait on the same monitor, so wake them
        // all; every waiter re-checks its own condition in a loop.
        this.notifyAll();

        /*
         * Block while the capacity of the queue has been breached.
         */
        while ((this.capacity > 0) && (this.queue.size() >= this.capacity)) {
            this.wait();

            if (closed) {
                throw new QueueClosedException();
            }
        }
    }

    /**
     * Blocks until an object is dequeued or the queue is closed.
     *
     * @return the object at the head of the queue
     * @throws InterruptedException if interrupted while waiting
     * @throws QueueClosedException if the queue is empty and closed, or is
     *         found closed after waiting
     */
    public synchronized Object dequeue() throws InterruptedException {
        while (this.queue.size() <= 0) {

            // Nothing will ever arrive on a closed queue; waiting here
            // would block forever since close() has already notified.
            if (closed) {
                throw new QueueClosedException();
            }

            this.wait();

            if (closed) {
                throw new QueueClosedException();
            }
        }

        Object object = this.queue.removeFirst();

        // if space exists, notify the other threads
        if (this.queue.size() < this.threshold) {
            this.notifyAll();
        }

        return object;
    }

    /**
     * Close the queue and notify all waiting Threads.
     */
    public synchronized void close() {
        this.closed = true;

        this.notifyAll();
    }
}

--- NEW FILE: QueueClosedException.java ---
/*
 * Jython Database Specification API 2.0
 *
 * $Id: QueueClosedException.java,v 1.1 2001/11/20 04:55:18 bzimmer Exp $
 *
 * Copyright (c) 2001 brian zimmer <bz...@zi...>
 *
 */
package com.ziclix.python.sql.util;

/**
 * This exception is thrown when the queue is closed and an operation is attempted.
 *
 * @author brian zimmer
 * @version $Revision: 1.1 $
 */
public class QueueClosedException extends RuntimeException {

    /**
     * Creates the exception without a detail message.
     */
    public QueueClosedException() {
        super();
    }

    /**
     * Creates the exception with a detail message.
     *
     * @param msg the detail message
     */
    public QueueClosedException(String msg) {
        super(msg);
    }
}
|