[Sqlalchemy-tickets] Issue #3384: how to generate complicated/nested sql statement for postgres upsert/merge
Brought to you by: zzzeek
From: wenlong_lu <iss...@bi...> - 2015-04-24 08:13:30
New issue 3384: how to generate complicated/nested sql statement for postgres upsert/merge
https://bitbucket.org/zzzeek/sqlalchemy/issue/3384/how-to-generate-complicated-nested-sql

wenlong_lu:

I have several large (billions of rows) tables that need upsert operations. I have checked #960 and #2551 and still did not find what I want. If you think this is a duplicate, please close it.

### Upsert idea from the PostgreSQL official page

I just followed the approach presented on the PostgreSQL page:
http://www.postgresql.org/docs/9.4/static/plpgsql-control-structures.html#PLPGSQL-ERROR-TRAPPING

```pgsql
CREATE TABLE db (a INT PRIMARY KEY, b TEXT);

CREATE FUNCTION merge_db(key INT, data TEXT) RETURNS VOID AS
$$
BEGIN
    LOOP
        -- first try to update the key
        UPDATE db SET b = data WHERE a = key;
        IF found THEN
            RETURN;
        END IF;
        -- not there, so try to insert the key
        -- if someone else inserts the same key concurrently,
        -- we could get a unique-key failure
        BEGIN
            INSERT INTO db(a,b) VALUES (key, data);
            RETURN;
        EXCEPTION WHEN unique_violation THEN
            -- Do nothing, and loop to try the UPDATE again.
        END;
    END LOOP;
END;
$$
LANGUAGE plpgsql;

SELECT merge_db(1, 'david');
SELECT merge_db(1, 'dennis');
```

### Our modified version

Since our INSERT and UPDATE statements are dynamic and involve complex types (jsonb, hstore, etc.), we prefer to build the SQL statements on the SQLAlchemy side. Our plpgsql function follows:

```pgsql
-- Function: merge_record(text, text)
-- DROP FUNCTION merge_record(text, text);

CREATE OR REPLACE FUNCTION merge_record(update_expression text, insert_expression text)
  RETURNS integer AS
$BODY$
declare
    row_affected integer;
BEGIN
    -- usually looping twice would be enough; here we cap it at 50 iterations
    FOR i IN 1..50 LOOP
        -- first try to update the key
        execute update_expression;
        GET DIAGNOSTICS row_affected = ROW_COUNT;
        IF row_affected > 0 THEN
            RETURN 1;
        END IF;
        -- not there, so try to insert the key
        -- if someone else inserts the same key concurrently,
        -- we could get a unique-key failure
        BEGIN
            execute insert_expression;
            RETURN 1;
        EXCEPTION WHEN unique_violation THEN
            -- Do nothing, and loop to try the UPDATE again.
        END;
    END LOOP;
END;
$BODY$
  LANGUAGE plpgsql VOLATILE
  COST 100;

ALTER FUNCTION merge_record(text, text) OWNER TO postgres;
```

### Target script to be generated

So what we want to build on the client side is:

```sql
select merge_record(update_expression, insert_expression);

-- e.g.
select merge_record(
    'update target_table set s1="v1", jsonb2={"k1": "v1", "k1": "v2"}, hstore3={"k3"=>"v3", "k4"=>"v4"} where unique_id=45678766',
    'insert into target_table (s1, jsonb2, hstore3, unique_id) VALUES("v1", {"k1": "v1", "k1": "v2"}, {"k3"=>"v3", "k4"=>"v4"}, 45678766)'
);
```

### Problems we encountered

We have used the SQLAlchemy SQL expression language to build the SQL statements dynamically. The problem is that the values contain many special characters, like `$`, `\n`, etc. (some of them are HTML text, so anything is possible). As a result, invalid SQL statements are generated and exceptions occur several times per month.

## Reasons for the problems

**So we think we must be missing something.** Please give suggestions if any.
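The failure mode can be reproduced without a database. The sketch below is hypothetical (plain Python; `dollar_quote` is a stand-in name for the `$$%s$$` wrapping idea, not code from the report) and shows how a value that itself contains `$$` defeats the dollar-quoting:

```python
# Demonstration: wrapping a value in $$...$$ breaks as soon as the value
# itself contains the delimiter, which free-form HTML text easily can.

def dollar_quote(value):
    # simplified version of the "$$%s$$" escaping idea
    return "$$%s$$" % value

safe = dollar_quote("david")                     # "$$david$$" -- fine
broken = dollar_quote("price is $$10 or so")     # value contains "$$"

sql = "update db set b = %s where a = 1" % broken
# The statement now contains an odd number of "$$" markers, so PostgreSQL
# would terminate the literal at the second one and choke on the rest.
n = sql.count("$$")  # 3 delimiters instead of the 2 a valid literal has
```

Stripping or doubling each problem character by hand, as `_convert_safe_sql_str` tries to do, has to anticipate every such case; missing one produces exactly the intermittent invalid-SQL errors described above.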
The related code follows:

```python
# executing the upsert
raw_upsert_sql_statement = "select merge_record('%s', '%s')" % (update_expression, insert_expression)
conn.execute(text(raw_upsert_sql_statement))

# building insert_expression
dialect = postgresql.dialect()
target_table = self.__table__
stmt = target_table.insert().values(attr_value_hash)
sql_template = unicode(stmt.compile(dialect=dialect))
attr_value_hash = self._convert_safe_sql_str(attr_value_hash)
insert_expression = sql_template % (attr_value_hash)

# building update_expression
dialect = postgresql.dialect()
stmt = target_table.update().where(eval(self.condition_script)).values(attr_template_hash)
sql_template = unicode(stmt.compile(dialect=dialect))
attr_value_hash = self._convert_safe_sql_str(attr_value_hash)
update_expression = sql_template % (attr_value_hash)

# this should be the stupid part
def _convert_safe_sql_str(self, params):
    ret_params = {}
    log.debug("Before converting: %s" % params)
    # error: 'dict' object does not support indexing
    # fix: replace("%", "%%")
    for k, v in params.iteritems():
        try:
            if type(v) in (datetime, str, unicode):
                ret_params[k] = ('$$%s$$' % v).replace("'", "").replace("%", "%%")
            elif type(v) in (MutableList, list):
                ret_params[k] = ('$$%s$$' % v).replace("[", "{").replace("]", "}").replace("u'", "").replace("'", "").replace("%", "%%")
            elif type(v) == MutableDict:
                ret_params[k] = ('$$%s$$' % v).replace("{", "").replace("}", "").replace(":", "=>").replace("'", "").replace("%", "%%")
            elif type(v) == dict:
                ret_params[k] = ('$$%s$$' % json.dumps(v)).replace("\'", "").replace("%", "%%")
            elif type(v) in (int, bool, float):
                ret_params[k] = v
        except Exception as ex:
            log.error(ex.message)
            log.error(traceback.format_exc())
            raise ex
    log.debug("After converting: %s" % ret_params)
    return ret_params
```

Thanks a lot,
wenlong
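For reference, the standard way to avoid hand-escaping entirely is to pass values as bound parameters and let the DBAPI driver do the quoting. The sketch below uses the stdlib `sqlite3` driver so it is self-contained; the table and values are made up, but the same pattern applies with psycopg2/SQLAlchemy, e.g. `conn.execute(text("select merge_record(:u, :i)"), u=..., i=...)`:

```python
import sqlite3

# In-memory database standing in for the real target; schema is hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute("create table db (a integer primary key, b text)")

# A value full of the characters that broke the string-interpolation approach.
tricky = "100% $$html$$ text with 'quotes'\nand a newline"

# Bound parameters: the driver handles escaping, no manual $$-wrapping needed.
conn.execute("insert into db (a, b) values (?, ?)", (1, tricky))
conn.execute("update db set b = ? where a = ?", (tricky + " v2", 1))

row = conn.execute("select b from db where a = 1").fetchone()
```

Note that inside the plpgsql function the statements run via `EXECUTE`, so any values interpolated into `update_expression`/`insert_expression` would still need server-side quoting (e.g. `quote_literal()` or `format()` with `%L`); binding the finished statement strings only protects the outer `select merge_record(...)` call.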