From: brian z. <bz...@us...> - 2001-11-20 04:55:22
|
Update of /cvsroot/jython/jython/com/ziclix/python/sql/pipe/db In directory usw-pr-cvs1:/tmp/cvs-serv7094/com/ziclix/python/sql/pipe/db Added Files: BaseDB.java DBSink.java DBSource.java Log Message: initial zxJDBC checkin --- NEW FILE: BaseDB.java --- /* * Jython Database Specification API 2.0 * * $Id: BaseDB.java,v 1.1 2001/11/20 04:55:18 bzimmer Exp $ * * Copyright (c) 2001 brian zimmer <bz...@zi...> * */ package com.ziclix.python.sql.pipe.db; import java.util.*; import java.lang.reflect.*; import org.python.core.*; import com.ziclix.python.sql.*; import com.ziclix.python.sql.handler.*; /** * Abstract class to assist in generating cursors. * * @author brian zimmer * @version $Revision: 1.1 $ */ public abstract class BaseDB { /** Field cursor */ protected PyCursor cursor; /** Field dataHandler */ protected Class dataHandler; /** Field tableName */ protected String tableName; /** Field connection */ protected PyConnection connection; /** * Construct the helper. */ public BaseDB(PyConnection connection, Class dataHandler, String tableName) { this.tableName = tableName; this.dataHandler = dataHandler; this.connection = connection; this.cursor = this.cursor(); } /** * Create a new constructor and optionally bind a new DataHandler. The new DataHandler must act as * a Decorator, having a single argument constructor of another DataHandler. The new DataHandler is * then expected to delegate all calls to the original while enhancing the functionality in any matter * desired. This allows additional functionality without losing any previous work or requiring any * complicated inheritance dependencies. */ protected PyCursor cursor() { PyCursor cursor = this.connection.cursor(true); DataHandler origDataHandler = cursor.getDataHandler(), newDataHandler = null; if ((origDataHandler != null) && (this.dataHandler != null)) { Constructor cons = null; try { Class[] args = new Class[1]; args[0] = DataHandler.class; cons = this.dataHandler.getConstructor(args); } catch (Exception e) { return cursor; } if (cons == null) { String msg = zxJDBC.getString("invalidCons", new Object[]{ this.dataHandler.getName() }); throw zxJDBC.newError(msg); } try { Object[] args = new Object[1]; args[0] = origDataHandler; newDataHandler = (DataHandler)cons.newInstance(args); } catch (Exception e) { return cursor; } if (newDataHandler != null) { cursor.__setattr__("datahandler", Py.java2py(newDataHandler)); } } return cursor; } } --- NEW FILE: DBSink.java --- /* * Jython Database Specification API 2.0 * * $Id: DBSink.java,v 1.1 2001/11/20 04:55:18 bzimmer Exp $ * * Copyright (c) 2001 brian zimmer <bz...@zi...> * */ package com.ziclix.python.sql.pipe.db; import java.util.*; import org.python.core.*; import com.ziclix.python.sql.*; import com.ziclix.python.sql.pipe.*; import com.ziclix.python.sql.handler.*; /** * A database consumer. All data transferred will be inserted into the appropriate table. * * @author brian zimmer * @version $Revision: 1.1 $ */ public class DBSink extends BaseDB implements Sink { /** Field sql */ protected String sql; /** Field exclude */ protected Set exclude; /** Field rows */ protected PyList rows; /** Field batchsize */ protected int batchsize; /** Field bindings */ protected PyObject bindings; /** Field indexedBindings */ protected PyDictionary indexedBindings; /** * Constructor for handling the consumption of data. * * @param connection the database connection * @param dataHandler a custom DataHandler for the cursor, can be None * @param tableName the table to insert the data * @param exclude the columns to be excluded from insertion on the destination, all if None * @param bindings the optional bindings for the destination, this allows morphing of types during the copy * @param batchsize the optional batchsize for the inserts */ public DBSink(PyConnection connection, Class dataHandler, String tableName, PyObject exclude, PyObject bindings, int batchsize) { super(connection, dataHandler, tableName); this.sql = null; this.rows = new PyList(); this.bindings = bindings; this.batchsize = batchsize; this.exclude = new HashSet(); this.indexedBindings = new PyDictionary(); if (exclude != Py.None) { for (int i = 0; i < exclude.__len__(); i++) { PyObject lowered = Py.newString(((PyString)exclude.__getitem__(i)).lower()); this.exclude.add(lowered); } } } /** * Return true if the key (converted to lowercase) is not found in the exclude list. */ protected boolean excluded(PyObject key) { PyObject lowered = Py.newString(((PyString)key).lower()); return this.exclude.contains(lowered); } /** * Create the insert statement given the header row. */ protected void createSql(PyObject row) { // this should be the column info if ((row == Py.None) || (row.__len__() == 0)) { // if there are no columns, what's the point? throw zxJDBC.newError(zxJDBC.getString("noColInfo")); } int index = 0, len = row.__len__(); PyObject entry = Py.None, col = Py.None, pyIndex = Py.None; StringBuffer sb = new StringBuffer("insert into ").append(this.tableName).append(" ("); /* * Iterate through the columns and pull out the names for use in the insert * statement and the types for use in the bindings. The tuple is of the form * (column name, column type). */ for (int i = 0; i < len - 1; i++) { entry = row.__getitem__(i); col = entry.__getitem__(0); if (!this.excluded(col)) { // add to the list sb.append(col).append(","); // add the binding pyIndex = Py.newInteger(index++); try { this.indexedBindings.__setitem__(pyIndex, this.bindings.__getitem__(col)); } catch (Exception e) { // either a KeyError or this.bindings is None or null this.indexedBindings.__setitem__(pyIndex, entry.__getitem__(1)); } } } entry = row.__getitem__(len - 1); col = entry.__getitem__(0); if (!this.excluded(col)) { sb.append(col); pyIndex = Py.newInteger(index++); try { this.indexedBindings.__setitem__(pyIndex, this.bindings.__getitem__(col)); } catch (Exception e) { // either a KeyError or this.bindings is None or null this.indexedBindings.__setitem__(pyIndex, entry.__getitem__(1)); } } sb.append(") values ("); for (int i = 1; i < len; i++) { sb.append("?,"); } sb.append("?)"); if (index == 0) { throw zxJDBC.makeException(zxJDBC.ProgrammingError, zxJDBC.getString("excludedAllCols")); } this.sql = sb.toString(); } /** * Handle the row. Insert the data into the correct table and columns. No updates are done. */ public void row(PyObject row) { if (this.sql != null) { if (this.batchsize <= 0) { // no batching, just go ahead each time this.cursor.execute(this.sql, row, this.indexedBindings, Py.None); this.connection.commit(); } else { this.rows.append(row); int len = rows.__len__(); if (len % this.batchsize == 0) { this.cursor.execute(this.sql, this.rows, this.indexedBindings, Py.None); this.connection.commit(); this.rows = new PyList(); } } } else { this.createSql(row); } } /** * Method start * */ public void start() {} /** * Handles flushing any buffers and closes the cursor. */ public void end() { // finish what we started try { int len = this.rows.__len__(); if (len > 0) { this.cursor.execute(this.sql, this.rows, this.indexedBindings, Py.None); this.connection.commit(); } } finally { this.cursor.close(); } } } --- NEW FILE: DBSource.java --- /* * Jython Database Specification API 2.0 * * $Id: DBSource.java,v 1.1 2001/11/20 04:55:18 bzimmer Exp $ * * Copyright (c) 2001 brian zimmer <bz...@zi...> * */ package com.ziclix.python.sql.pipe.db; import org.python.core.*; import com.ziclix.python.sql.*; import com.ziclix.python.sql.pipe.*; import com.ziclix.python.sql.handler.*; /** * A database source. Given a PyConnection and information about the query, produce the data. * * @author brian zimmer * @version $Revision: 1.1 $ */ public class DBSource extends BaseDB implements Source { /** Field sql */ protected String sql; /** Field sentHeader */ protected boolean sentHeader; /** Field params, include */ protected PyObject params, include; /** * Constructor for handling the generation of data. * * @param connection the database connection * @param dataHandler a custom DataHandler for the cursor, can be None * @param tableName the table in question on the source database * @param where an optional where clause, defaults to '(1=1)' if null * @param include the columns to be queried from the source, '*' if None * @param params optional params to substituted in the where clause */ public DBSource(PyConnection connection, Class dataHandler, String tableName, String where, PyObject include, PyObject params) { super(connection, dataHandler, tableName); this.params = params; this.include = include; this.sentHeader = false; this.sql = this.createSql(where); } /** * Create the sql string given the where clause. */ protected String createSql(String where) { // create the sql statement, using the columns if available StringBuffer sb = new StringBuffer("select "); if ((this.include == Py.None) || (this.include.__len__() == 0)) { sb.append("*"); } else { for (int i = 1; i < this.include.__len__(); i++) { sb.append(this.include.__getitem__(i)).append(","); } sb.append(this.include.__getitem__(this.include.__len__() - 1)); } sb.append(" from ").append(this.tableName); sb.append(" where ").append((where == null) ? "(1=1)" : where); String sql = sb.toString(); return sql; } /** * Return the next row in the result set. The first row returned will be column information. */ public PyObject next() { if (this.sentHeader) { // Py.None will be sent when all done, so this will close down the queue return this.cursor.fetchone(); } else { this.cursor.execute(this.sql, this.params, Py.None, Py.None); PyObject description = this.cursor.__findattr__("description"); // we can't insert if we don't know column names if ((description == Py.None) || (description.__len__() == 0)) { // let the destination worry about handling the empty set return Py.None; } int len = description.__len__(); PyObject[] columns = new PyObject[len]; for (int i = 0; i < len; i++) { PyObject[] colInfo = new PyObject[2]; // col name colInfo[0] = description.__getitem__(i).__getitem__(0); // col type colInfo[1] = description.__getitem__(i).__getitem__(1); columns[i] = new PyTuple(colInfo); } PyObject row = new PyTuple(columns); Py.writeDebug("db-source", row.toString()); this.sentHeader = true; return row; } } /** * Method start * */ public void start() {} /** * Close the cursor. */ public void end() { if (this.cursor != null) { this.cursor.close(); } } } |