From: <pc...@us...> - 2012-10-29 15:20:30
|
Revision: 57264 http://firebird.svn.sourceforge.net/firebird/?rev=57264&view=rev Author: pcisar Date: 2012-10-29 15:20:23 +0000 (Mon, 29 Oct 2012) Log Message: ----------- - cursor.execute improvement - Fix for unregistered bug with int/bigint data Modified Paths: -------------- python/fdb/trunk/fdb/fbcore.py python/fdb/trunk/test/testfdb.py Modified: python/fdb/trunk/fdb/fbcore.py =================================================================== --- python/fdb/trunk/fdb/fbcore.py 2012-10-28 18:47:30 UTC (rev 57263) +++ python/fdb/trunk/fdb/fbcore.py 2012-10-29 15:20:23 UTC (rev 57264) @@ -405,6 +405,20 @@ def bytes_to_int(b): # Read as little endian. len_b = len(b) if len_b == 1: + fmt = 'b' + elif len_b == 2: + fmt = '<h' + elif len_b == 4: + fmt = '<l' + elif len_b == 8: + fmt = '<q' + else: + raise InternalError + return struct.unpack(fmt, b)[0] + +def bytes_to_uint(b): # Read as little endian. + len_b = len(b) + if len_b == 1: fmt = 'B' elif len_b == 2: fmt = '<H' @@ -418,19 +432,32 @@ def bint_to_bytes(val, nbytes): # Convert int value to big endian bytes. if nbytes == 1: - fmt = 'B' + fmt = 'b' elif nbytes == 2: - fmt = '>H' + fmt = '>h' elif nbytes == 4: - fmt = '>L' + fmt = '>l' elif nbytes == 8: - fmt = '>Q' + fmt = '>q' else: raise InternalError return struct.pack(fmt, val) def int_to_bytes(val, nbytes): # Convert int value to little endian bytes. if nbytes == 1: + fmt = 'b' + elif nbytes == 2: + fmt = '<h' + elif nbytes == 4: + fmt = '<l' + elif nbytes == 8: + fmt = '<q' + else: + raise InternalError + return struct.pack(fmt, val) + +def uint_to_bytes(val, nbytes): # Convert int value to little endian bytes. + if nbytes == 1: fmt = 'B' elif nbytes == 2: fmt = '<H' @@ -1699,7 +1726,7 @@ if ord2(info[0]) != ibase.isc_info_sql_get_plan: raise IndentationError("Unexpected code in result buffer while" " querying SQL plan.") - size = bytes_to_int(info[1:_SIZE_OF_SHORT + 1]) + size = bytes_to_uint(info[1:_SIZE_OF_SHORT + 1]) # Skip first byte: a new line ### Todo: Better handling of P version specifics result = ctypes.string_at(info[_SIZE_OF_SHORT + 2:], size - 1) @@ -1868,9 +1895,9 @@ while ord2(info[res_walk]) != isc_info_end: cur_count_type = ord2(info[res_walk]) res_walk += 1 - size = bytes_to_int(info[res_walk:res_walk + short_size]) + size = bytes_to_uint(info[res_walk:res_walk + short_size]) res_walk += short_size - count = bytes_to_int(info[res_walk:res_walk + size]) + count = bytes_to_uint(info[res_walk:res_walk + size]) if ((cur_count_type == ibase.isc_info_req_select_count and self.statement_type == isc_info_sql_stmt_select) or (cur_count_type == ibase.isc_info_req_insert_count @@ -2024,7 +2051,7 @@ reallength = sqlvar.sqllen value = value[:reallength] elif vartype == SQL_VARYING: - size = bytes_to_int(sqlvar.sqldata[:1]) + size = bytes_to_uint(sqlvar.sqldata[:1]) #value = ctypes.string_at(sqlvar.sqldata[2],2+size) ### Todo: verify handling of P version differences if PYTHON_MAJOR_VER == 3: @@ -2055,8 +2082,8 @@ value = struct.unpack('d', sqlvar.sqldata[:sqlvar.sqllen])[0] elif vartype == SQL_BLOB: val = sqlvar.sqldata[:sqlvar.sqllen] - blobid = ibase.ISC_QUAD(bytes_to_int(val[:4]), - bytes_to_int(val[4:sqlvar.sqllen])) + blobid = ibase.ISC_QUAD(bytes_to_uint(val[:4]), + bytes_to_uint(val[4:sqlvar.sqllen])) # Check if stream BLOB is requested instead materialized one use_stream = False if self.__streamed_blobs: @@ -2099,17 +2126,17 @@ self._isc_status, "Cursor.read_output_blob/isc_blob_info:") offset = 0 - while bytes_to_int(result[offset]) != isc_info_end: - code = bytes_to_int(result[offset]) + while bytes_to_uint(result[offset]) != isc_info_end: + code = bytes_to_uint(result[offset]) offset += 1 if code == ibase.isc_info_blob_total_length: - length = bytes_to_int(result[offset:offset + 2]) - blob_length = bytes_to_int(result[ + length = bytes_to_uint(result[offset:offset + 2]) + blob_length = bytes_to_uint(result[ offset + 2:offset + 2 + length]) offset += length + 2 elif code == ibase.isc_info_blob_max_segment: - length = bytes_to_int(result[offset:offset + 2]) - segment_size = bytes_to_int(result[ + length = bytes_to_uint(result[offset:offset + 2]) + segment_size = bytes_to_uint(result[ offset + 2:offset + 2 + length]) offset += length + 2 # Load BLOB @@ -2695,6 +2722,8 @@ self._ps = self._prepared_statements.setdefault(operation, PreparedStatement(operation, self, True)) self._ps._execute(parameters) + # return self so `execute` call could be used as iterable + return self def prep(self, operation): """Create prepared statement for repeated execution. @@ -3259,7 +3288,7 @@ buf = self.transaction_info(infoCode, 's') buf = buf[1 + struct.calcsize('h'):] if len(buf) == 1: - results[infoCode] = bytes_to_int(buf) + results[infoCode] = bytes_to_uint(buf) else: # For isolation level isc_info_tra_read_committed, the # first byte indicates the isolation level @@ -3268,8 +3297,8 @@ # isc_info_tra_no_rec_version). isolationLevelByte, recordVersionByte = struct.unpack('cc', buf) - isolationLevel = bytes_to_int(isolationLevelByte) - recordVersion = bytes_to_int(recordVersionByte) + isolationLevel = bytes_to_uint(isolationLevelByte) + recordVersion = bytes_to_uint(recordVersionByte) results[infoCode] = (isolationLevel, recordVersion) else: # At the time of this writing (2006.02.09), @@ -3330,7 +3359,7 @@ if request_buffer[0] != res_buf[0]: raise InternalError("Result code does not match request code.") if result_type.upper() == 'I': - return bytes_to_int(res_buf[3:3 + bytes_to_int(res_buf[1:3])]) + return bytes_to_uint(res_buf[3:3 + bytes_to_int(res_buf[1:3])]) elif result_type.upper() == 'S': return p3fix(ctypes.string_at(res_buf, i),self.__python_charset) else: @@ -3705,17 +3734,17 @@ self._isc_status, "Cursor.read_output_blob/isc_blob_info:") offset = 0 - while bytes_to_int(result[offset]) != isc_info_end: - code = bytes_to_int(result[offset]) + while bytes_to_uint(result[offset]) != isc_info_end: + code = bytes_to_uint(result[offset]) offset += 1 if code == ibase.isc_info_blob_total_length: - length = bytes_to_int(result[offset:offset + 2]) - self._blob_length = bytes_to_int(result[ + length = bytes_to_uint(result[offset:offset + 2]) + self._blob_length = bytes_to_uint(result[ offset + 2:offset + 2 + length]) offset += length + 2 elif code == ibase.isc_info_blob_max_segment: - length = bytes_to_int(result[offset:offset + 2]) - self._segment_size = bytes_to_int(result[ + length = bytes_to_uint(result[offset:offset + 2]) + self._segment_size = bytes_to_uint(result[ offset + 2:offset + 2 + length]) offset += length + 2 # Create internal buffer Modified: python/fdb/trunk/test/testfdb.py =================================================================== --- python/fdb/trunk/test/testfdb.py 2012-10-28 18:47:30 UTC (rev 57263) +++ python/fdb/trunk/test/testfdb.py 2012-10-29 15:20:23 UTC (rev 57264) @@ -651,6 +651,12 @@ cur.execute('select C1,C2,C3 from T2 where C1 = 1') rows = cur.fetchall() assert repr(rows) == "[(1, 1, 1)]" + cur.execute('insert into T2 (C1,C2,C3) values (?,?,?)',[2,1,sys.maxint]) + cur.execute('insert into T2 (C1,C2,C3) values (?,?,?)',[2,1,-sys.maxint-1]) + self.con.commit() + cur.execute('select C1,C2,C3 from T2 where C1 = 2') + rows = cur.fetchall() + assert repr(rows) == "[(2, 1, 9223372036854775807), (2, 1, -9223372036854775808)]" def test_insert_char_varchar(self): cur = self.con.cursor() cur.execute('insert into T2 (C1,C4,C5) values (?,?,?)',[2,'AA','AA']) @@ -694,6 +700,13 @@ cur.execute('select C1,C9 from T2 where C1 = 4') rows = cur.fetchall() assert repr(rows) == "[(4, 'This is a BLOB!')]" + # BLOB bigger than max. segment size + big_blob = '123456789' * 10000 + cur.execute('insert into T2 (C1,C9) values (?,?)',[5,big_blob]) + self.con.commit() + cur.execute('select C1,C9 from T2 where C1 = 5') + row = cur.fetchone() + assert row[1] == big_blob def test_insert_float_double(self): cur = self.con.cursor() cur.execute('insert into T2 (C1,C12,C13) values (?,?,?)',[5,1.0,1.0]) This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |