From: <ale...@us...> - 2014-11-25 13:24:14
|
Revision: 60291 http://sourceforge.net/p/firebird/code/60291 Author: alexpeshkoff Date: 2014-11-25 13:24:10 +0000 (Tue, 25 Nov 2014) Log Message: ----------- Implemented CORE-733: Compress Data over the Network Modified Paths: -------------- firebird/trunk/builds/install/misc/firebird.conf.in firebird/trunk/configure.ac firebird/trunk/src/common/config/config.cpp firebird/trunk/src/common/config/config.h firebird/trunk/src/remote/client/interface.cpp firebird/trunk/src/remote/inet.cpp firebird/trunk/src/remote/protocol.h firebird/trunk/src/remote/remot_proto.h firebird/trunk/src/remote/remote.cpp firebird/trunk/src/remote/remote.h firebird/trunk/src/remote/server/server.cpp Modified: firebird/trunk/builds/install/misc/firebird.conf.in =================================================================== --- firebird/trunk/builds/install/misc/firebird.conf.in 2014-11-25 07:47:24 UTC (rev 60290) +++ firebird/trunk/builds/install/misc/firebird.conf.in 2014-11-25 13:24:10 UTC (rev 60291) @@ -565,6 +565,17 @@ #WireCrypt = Enabled (for client) / Required (for server) # +# Should connection over the wire be compressed? +# Client only value - server should follow client setting if connect using +# correct protocol (>=13). +# +# Per-connection configurable. +# +# Type: boolean +# +#WireCompression = false + +# # Seconds to wait on a silent client connection before the server sends # dummy packets to request acknowledgment. # Modified: firebird/trunk/configure.ac =================================================================== --- firebird/trunk/configure.ac 2014-11-25 07:47:24 UTC (rev 60290) +++ firebird/trunk/configure.ac 2014-11-25 13:24:10 UTC (rev 60291) @@ -723,6 +723,7 @@ AC_CHECK_HEADERS(iconv.h) AC_CHECK_HEADERS(libio.h) AC_CHECK_HEADERS(linux/falloc.h) +AC_CHECK_HEADERS(zlib.h) dnl check for ICU presence AC_CHECK_HEADER(unicode/ucnv.h,,AC_MSG_ERROR(ICU support not found - please install development ICU package)) @@ -739,6 +740,7 @@ dnl Check for libraries AC_SEARCH_LIBS(dlopen, dl) +AC_CHECK_LIB(z, deflate) AC_CHECK_LIB(m, main) if test "$EDITLINE_FLG" = "Y"; then AC_CHECK_LIB(curses, tgetent, TERMLIB=curses, \ Modified: firebird/trunk/src/common/config/config.cpp =================================================================== --- firebird/trunk/src/common/config/config.cpp 2014-11-25 07:47:24 UTC (rev 60290) +++ firebird/trunk/src/common/config/config.cpp 2014-11-25 13:24:10 UTC (rev 60291) @@ -198,7 +198,8 @@ {TYPE_STRING, "WireCryptPlugin", (ConfigValue) "Arc4"}, {TYPE_STRING, "KeyHolderPlugin", (ConfigValue) ""}, {TYPE_BOOLEAN, "RemoteAccess", (ConfigValue) true}, - {TYPE_BOOLEAN, "IPv6V6Only", (ConfigValue) false} + {TYPE_BOOLEAN, "IPv6V6Only", (ConfigValue) false}, + {TYPE_BOOLEAN, "WireCompression", (ConfigValue) false} }; /****************************************************************************** @@ -750,3 +751,8 @@ { return get<bool>(KEY_REMOTE_ACCESS); } + +bool Config::getWireCompression() const +{ + return get<bool>(KEY_WIRE_COMPRESSION); +} Modified: firebird/trunk/src/common/config/config.h =================================================================== --- firebird/trunk/src/common/config/config.h 2014-11-25 07:47:24 UTC (rev 60290) +++ firebird/trunk/src/common/config/config.h 2014-11-25 13:24:10 UTC (rev 60291) @@ -136,6 +136,7 @@ KEY_PLUG_KEY_HOLDER, KEY_REMOTE_ACCESS, KEY_IPV6_V6ONLY, + KEY_WIRE_COMPRESSION, MAX_CONFIG_KEY // keep it last }; @@ -333,6 +334,8 @@ int getWireCrypt(WireCryptMode wcMode) const; bool getRemoteAccess() const; + + bool getWireCompression() const; }; // Implementation of interface to access master configuration file Modified: firebird/trunk/src/remote/client/interface.cpp =================================================================== --- firebird/trunk/src/remote/client/interface.cpp 2014-11-25 07:47:24 UTC (rev 60290) +++ firebird/trunk/src/remote/client/interface.cpp 2014-11-25 13:24:10 UTC (rev 60291) @@ -5952,6 +5952,7 @@ P_OP operation = op_void; { // scope RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION); + fb_assert(!port->port_compressed); stuff = port->receive(&packet); operation = packet.p_operation; @@ -6235,6 +6236,9 @@ HANDSHAKE_DEBUG(fprintf(stderr, "Cli: authReceiveResponse: cond_accept d=%d n=%d '%.*s' 0x%x\n", d->cstr_length, n->cstr_length, n->cstr_length, n->cstr_address, n->cstr_address ? n->cstr_address[0] : 0)); + if (packet->p_acpd.p_acpt_type & pflag_compress) + port->initCompression(); + packet->p_acpd.p_acpt_type &= ptype_MASK; break; case op_crypt: Modified: firebird/trunk/src/remote/inet.cpp =================================================================== --- firebird/trunk/src/remote/inet.cpp 2014-11-25 07:47:24 UTC (rev 60290) +++ firebird/trunk/src/remote/inet.cpp 2014-11-25 13:24:10 UTC (rev 60291) @@ -218,9 +218,9 @@ } #endif // DEBUG -const SLONG MAX_DATA_LW = 1448; // Low Water mark -const SLONG MAX_DATA_HW = 32768; // High Water mark -const SLONG DEF_MAX_DATA = 8192; +const ULONG MAX_DATA_LW = 1448; // Low Water mark +const ULONG MAX_DATA_HW = 32768; // High Water mark +const ULONG DEF_MAX_DATA = 8192; //const int MAXHOSTLEN = 64; @@ -281,6 +281,10 @@ HandleState ok(const rem_port* port) { +#ifdef WIRE_COMPRESS_SUPPORT + if (port->port_flags & PORT_z_data) + return SEL_READY; +#endif SOCKET n = port->port_handle; #if defined(WIN_NT) return FD_ISSET(n, &slct_fdset) ? SEL_READY : SEL_NO_DATA; @@ -440,7 +444,7 @@ static bool_t inet_getbytes(XDR*, SCHAR *, u_int); static void inet_error(bool, rem_port*, const TEXT*, ISC_STATUS, int); static bool_t inet_putbytes(XDR*, const SCHAR*, u_int); -static bool_t inet_read(XDR*); +static bool inet_read(XDR*); static rem_port* inet_try_connect( PACKET*, Rdb*, const PathName&, @@ -448,7 +452,7 @@ ClumpletReader&, RefPtr<Config>*, const PathName*); -static bool_t inet_write(XDR*); //, int); +static bool inet_write(XDR*); static void INET_server_socket(rem_port* port, USHORT flag, const addrinfo* pai); #ifdef DEBUG @@ -456,6 +460,7 @@ #endif static bool packet_receive(rem_port*, UCHAR*, SSHORT, SSHORT*); +static bool packet_receive2(rem_port*, UCHAR*, SSHORT, SSHORT*); static bool packet_send(rem_port*, const SCHAR*, SSHORT); static rem_port* receive(rem_port*, PACKET *); static rem_port* select_accept(rem_port*); @@ -506,7 +511,7 @@ -SLONG INET_remote_buffer; +ULONG INET_remote_buffer; static GlobalPtr<Mutex> init_mutex; static volatile bool INET_initialized = false; static volatile bool INET_shutting_down = false; @@ -585,6 +590,10 @@ user_id.insertBytes(CNCT_group, reinterpret_cast<UCHAR*>(&eff_gid), sizeof(eff_gid)); } + // Should compression be tried? + + bool compression = config && (*config)->getWireCompression(); + // Establish connection to server // If we want user verification, we can't speak anything less than version 7 @@ -605,10 +614,10 @@ for (size_t i = 0; i < cnct->p_cnct_count; i++) { cnct->p_cnct_versions[i] = protocols_to_try[i]; + if (compression && cnct->p_cnct_versions[i].p_cnct_version >= PROTOCOL_VERSION13) + cnct->p_cnct_versions[i].p_cnct_max_type |= pflag_compress; } - // Try connection using first set of protocols - rem_port* port = inet_try_connect(packet, rdb, file_name, node_name, dpb, config, ref_db_name); P_ACPT* accept = NULL; @@ -671,6 +680,9 @@ port->port_flags |= PORT_symmetric; } + bool compress = accept->p_acpt_type & pflag_compress; + accept->p_acpt_type &= ptype_MASK; + if (accept->p_acpt_type != ptype_out_of_band) { port->port_flags |= PORT_no_oob; } @@ -679,6 +691,9 @@ port->port_flags |= PORT_lazy; } + if (compress) + port->initCompression(); + return port; } @@ -1275,14 +1290,14 @@ port->port_request = aux_request; port->port_buff_size = (USHORT) INET_remote_buffer; port->port_async_receive = inet_async_receive; - port->port_flags = flags; + port->port_flags |= flags; - xdrinet_create( &port->port_send, port, - &port->port_buffer[INET_remote_buffer], - (USHORT) INET_remote_buffer, - XDR_ENCODE); + xdrinet_create(&port->port_send, port, + &port->port_buffer[REM_SEND_OFFSET(INET_remote_buffer)], + (USHORT) INET_remote_buffer, XDR_ENCODE); - xdrinet_create( &port->port_receive, port, port->port_buffer, 0, XDR_DECODE); + xdrinet_create(&port->port_receive, port, + &port->port_buffer[REM_RECV_OFFSET(INET_remote_buffer)], 0, XDR_DECODE); if (parent && !(parent->port_server_flags & SRVR_thread_per_port)) { @@ -1894,7 +1909,7 @@ } else if (port = select_accept(main_port)) { - if (!packet_receive(port, buffer, bufsize, length)) + if (!REMOTE_inflate(port, packet_receive, buffer, bufsize, length)) { *length = 0; } @@ -1914,7 +1929,7 @@ return true; } - if (!packet_receive(port, buffer, bufsize, length)) + if (!REMOTE_inflate(port, packet_receive, buffer, bufsize, length)) { if (port->port_flags & PORT_disconnect) { continue; @@ -2191,7 +2206,7 @@ port->port_send.x_client = !(port->port_flags & PORT_server); #endif if (!xdr_protocol(&port->port_send, packet)) - return FALSE; + return false; #ifdef DEBUG { // scope @@ -2206,7 +2221,7 @@ } // end scope #endif - return inet_write(&port->port_send /*, TRUE*/); + return REMOTE_deflate(&port->port_send, inet_write, packet_send, true); } static int send_partial( rem_port* port, PACKET * packet) @@ -2262,7 +2277,7 @@ xdrs->x_ops = (xdr_t::xdr_ops*) &inet_ops; xdrs->x_op = x_op; - return TRUE; + return true; } #ifdef HAVE_SETITIMER @@ -2447,8 +2462,10 @@ xdrs->x_handy = 0; } - if (!inet_write(xdrs /*, 0*/)) + if (!REMOTE_deflate(xdrs, inet_write, packet_send, false)) + { return FALSE; + } } // Scalar values and bulk transfer remainder fall thru @@ -2468,7 +2485,7 @@ while (--bytecount >= 0) { - if (xdrs->x_handy <= 0 && !inet_write(xdrs /*, 0*/)) + if (xdrs->x_handy <= 0 && !REMOTE_deflate(xdrs, inet_write, packet_send, false)) return FALSE; --xdrs->x_handy; *xdrs->x_private++ = *buff++; @@ -2477,7 +2494,7 @@ return TRUE; } -static bool_t inet_read( XDR* xdrs) +static bool inet_read( XDR* xdrs) { /************************************** * @@ -2504,27 +2521,40 @@ p += xdrs->x_handy; } + SSHORT length = end - p; + port->port_flags &= ~PORT_z_data; + if (!REMOTE_inflate(port, packet_receive2, (UCHAR*)p, length, &length)) + return false; + p += length; + + xdrs->x_handy = (int) ((SCHAR *) p - xdrs->x_base); + xdrs->x_private = xdrs->x_base; + + return true; +} + +static bool packet_receive2(rem_port* port, UCHAR* p, SSHORT bufSize, SSHORT* length) +{ + *length = 0; + while (true) { - SSHORT length = end - p; - if (!packet_receive(port, reinterpret_cast<UCHAR*>(p), length, &length)) + SSHORT l = bufSize - *length; + if (!packet_receive(port, p + *length, l, &l)) + return false; + + if (l >= 0) { - return FALSE; - } - if (length >= 0) - { - p += length; + *length += l; break; } - p -= length; + + *length -= l; if (!packet_send(port, 0, 0)) - return FALSE; + return false; } - xdrs->x_handy = (int) ((SCHAR *) p - xdrs->x_base); - xdrs->x_private = xdrs->x_base; - - return TRUE; + return true; } static rem_port* inet_try_connect(PACKET* packet, @@ -2565,7 +2595,7 @@ rem_port* port = NULL; try { - port = INET_connect(node_name, packet, FALSE, &dpb, config); + port = INET_connect(node_name, packet, false, &dpb, config); } catch (const Exception&) { @@ -2587,7 +2617,7 @@ return port; } -static bool_t inet_write(XDR* xdrs) +static bool inet_write(XDR* xdrs) { /************************************** * @@ -2603,7 +2633,7 @@ rem_port* port = (rem_port*) xdrs->x_public; const char* p = xdrs->x_base; - SSHORT length = xdrs->x_private - p; + USHORT length = xdrs->x_private - p; // Send data in manageable hunks. If a packet is partial, indicate // that with a negative length. A positive length marks the end. @@ -2613,14 +2643,14 @@ const SSHORT l = (SSHORT) MIN(length, INET_remote_buffer); length -= l; if (!packet_send(port, p, (SSHORT) (length ? -l : l))) - return FALSE; + return false; p += l; } xdrs->x_private = xdrs->x_base; xdrs->x_handy = INET_remote_buffer; - return TRUE; + return true; } Modified: firebird/trunk/src/remote/protocol.h =================================================================== --- firebird/trunk/src/remote/protocol.h 2014-11-25 07:47:24 UTC (rev 60290) +++ firebird/trunk/src/remote/protocol.h 2014-11-25 13:24:10 UTC (rev 60291) @@ -105,6 +105,10 @@ const USHORT ptype_batch_send = 3; // Batch sends, no asynchrony const USHORT ptype_out_of_band = 4; // Batch sends w/ out of band notification const USHORT ptype_lazy_send = 5; // Deferred packets delivery +const USHORT ptype_MASK = 0xFF; // Mask - up to 255 types of protocol +// +// upper byte is used for protocol flags +const USHORT pflag_compress = 0x100; // Turn on compression if possible // Generic object id Modified: firebird/trunk/src/remote/remot_proto.h =================================================================== --- firebird/trunk/src/remote/remot_proto.h 2014-11-25 07:47:24 UTC (rev 60290) +++ firebird/trunk/src/remote/remot_proto.h 2014-11-25 13:24:10 UTC (rev 60291) @@ -44,6 +44,9 @@ struct rem_port; struct rem_fmt; struct Rdb; +typedef bool PacketReceive(rem_port*, UCHAR*, SSHORT, SSHORT*); +typedef bool PacketSend(rem_port*, const SCHAR*, SSHORT); +typedef bool ProtoWrite(XDR*); void REMOTE_cleanup_transaction (struct Rtr *); USHORT REMOTE_compute_batch_size (rem_port*, USHORT, P_OP, const rem_fmt*); @@ -63,6 +66,8 @@ void REMOTE_parseList(Remote::ParsedList&, Firebird::PathName); void REMOTE_makeList(Firebird::PathName& list, const Remote::ParsedList& parsed); void REMOTE_check_response(Firebird::IStatus* warning, Rdb* rdb, PACKET* packet, bool checkKeys = false); +bool REMOTE_inflate(rem_port*, PacketReceive*, UCHAR*, SSHORT, SSHORT*); +bool REMOTE_deflate(XDR*, ProtoWrite*, PacketSend*, bool flash); extern signed char wcCompatible[3][3]; Modified: firebird/trunk/src/remote/remote.cpp =================================================================== --- firebird/trunk/src/remote/remote.cpp 2014-11-25 07:47:24 UTC (rev 60290) +++ firebird/trunk/src/remote/remote.cpp 2014-11-25 13:24:10 UTC (rev 60291) @@ -994,7 +994,7 @@ addMutliPartConnectParameter(dataFromPlugin, user_id, CNCT_specific_data); // Client's wirecrypt requested level - user_id.insertInt(CNCT_client_crypt, config->getWireCrypt(WC_CLIENT)); + user_id.insertInt(CNCT_client_crypt, clntConfig->getWireCrypt(WC_CLIENT)); } void ClntAuthBlock::resetClnt(const Firebird::PathName* fileName, const CSTRING* listStr) @@ -1020,8 +1020,8 @@ dataFromPlugin.clear(); firstTime = true; - config = REMOTE_get_config(fileName, &dpbConfig); - pluginList = config->getPlugins(Firebird::IPluginManager::AuthClient); + clntConfig = REMOTE_get_config(fileName, &dpbConfig); + pluginList = clntConfig->getPlugins(Firebird::IPluginManager::AuthClient); Firebird::PathName final; if (serverPluginList.hasData()) @@ -1064,7 +1064,7 @@ Firebird::RefPtr<Config>* ClntAuthBlock::getConfig() { - return config.hasData() ? &config : NULL; + return clntConfig.hasData() ? &clntConfig : NULL; } void ClntAuthBlock::storeDataForPlugin(unsigned int length, const unsigned char* data) @@ -1391,6 +1391,191 @@ } +bool REMOTE_inflate(rem_port* port, PacketReceive* packet_receive, UCHAR* buffer, SSHORT buffer_length, SSHORT* length) +{ +#ifdef WIRE_COMPRESS_SUPPORT + if (!port->port_compressed) + return packet_receive(port, buffer, buffer_length, length); + + z_stream& strm = port->port_recv_stream; + strm.avail_out = buffer_length; + strm.next_out = buffer; + + for(;;) + { + if (strm.avail_in) + { +#ifdef COMPRESS_DEBUG + fprintf(stderr, "Data to inflate %d port %p\n", strm.avail_in, port); +#if COMPRESS_DEBUG>1 + for (unsigned n = 0; n < strm.avail_in; ++n) fprintf(stderr, "%02x ", strm.next_in[n]); + fprintf(stderr, "\n"); +#endif +#endif + + if (inflate(&strm, Z_NO_FLUSH) != Z_OK) + { +#ifdef COMPRESS_DEBUG + fprintf(stderr, "Inflate error\n"); +#endif + (void)inflateEnd(&strm); + port->port_flags &= ~PORT_z_data; + return false; + } +#ifdef COMPRESS_DEBUG + fprintf(stderr, "Inflated data %d\n", buffer_length - strm.avail_out); +#if COMPRESS_DEBUG>1 + for (unsigned n = 0; n < buffer_length - strm.avail_out; ++n) fprintf(stderr, "%02x ", buffer[n]); + fprintf(stderr, "\n"); +#endif +#endif + if (strm.next_out != buffer) + break; + + if (port->port_flags & PORT_z_data) // Was called from select_multi() but nothing decompressed + { + port->port_flags &= ~PORT_z_data; + return false; + } + + if (strm.next_in != &port->port_compressed[REM_RECV_OFFSET(port->port_buff_size)]) + { + memmove(&port->port_compressed[REM_RECV_OFFSET(port->port_buff_size)], strm.next_in, strm.avail_in); + strm.next_in = &port->port_compressed[REM_RECV_OFFSET(port->port_buff_size)]; + } + } + else + strm.next_in = &port->port_compressed[REM_RECV_OFFSET(port->port_buff_size)]; + + SSHORT l = (SSHORT) (port->port_buff_size - strm.avail_in); + if ((!packet_receive(port, strm.next_in, l, &l)) || (l <= 0)) // fixit - 2 ways to report errors in same routine + { + (void)inflateEnd(&strm); + port->port_flags &= ~PORT_z_data; + return false; + } + + strm.avail_in += l; + } + + *length = (SSHORT) (buffer_length - strm.avail_out); + if (strm.avail_in) // Z-buffer still has some data - probably can call inflate() once more on them + port->port_flags |= PORT_z_data; + else + port->port_flags &= ~PORT_z_data; + + return true; +#else + return packet_receive(port, buffer, buffer_length, length); +#endif +} + + +bool REMOTE_deflate(XDR* xdrs, ProtoWrite* proto_write, PacketSend* packet_send, bool flash) +{ +#ifdef WIRE_COMPRESS_SUPPORT + rem_port* port = (rem_port*) xdrs->x_public; + if (!port->port_compressed) + return proto_write(xdrs); + + z_stream& strm = port->port_send_stream; + strm.avail_in = xdrs->x_private - xdrs->x_base; + strm.next_in = (Bytef*)xdrs->x_base; + + if (!strm.next_out) + { + strm.avail_out = port->port_buff_size; + strm.next_out = (Bytef*)&port->port_compressed[REM_SEND_OFFSET(port->port_buff_size)]; + } + + bool expectMoreOut = flash; + + while(strm.avail_in || expectMoreOut) + { +#ifdef COMPRESS_DEBUG + fprintf(stderr, "Data to deflate %d port %p\n", strm.avail_in, port); +#if COMPRESS_DEBUG>1 + for (unsigned n = 0; n < strm.avail_in; ++n) fprintf(stderr, "%02x ", strm.next_in[n]); + fprintf(stderr, "\n"); +#endif +#endif + int ret = deflate(&strm, flash ? Z_SYNC_FLUSH : Z_NO_FLUSH); + if (ret == Z_BUF_ERROR) + ret = 0; + if (ret != 0) + { +#ifdef COMPRESS_DEBUG + fprintf(stderr, "Deflate error %d\n", ret); +#endif + return false; + } + +#ifdef COMPRESS_DEBUG + fprintf(stderr, "Deflated data %d\n", port->port_buff_size - strm.avail_out); +#if COMPRESS_DEBUG>1 + for (unsigned n = 0; n < port->port_buff_size - strm.avail_out; ++n) + fprintf(stderr, "%02x ", port->port_compressed[REM_SEND_OFFSET(port->port_buff_size) + n]); + fprintf(stderr, "\n"); +#endif +#endif + + expectMoreOut = !strm.avail_out; + if ((port->port_buff_size != strm.avail_out) && (flash || !strm.avail_out)) + { + if (!packet_send(port, (SCHAR*) &port->port_compressed[REM_SEND_OFFSET(port->port_buff_size)], + (SSHORT) (port->port_buff_size - strm.avail_out))) + { + return false; + } + + strm.avail_out = port->port_buff_size; + strm.next_out = (Bytef*)&port->port_compressed[REM_SEND_OFFSET(port->port_buff_size)]; + } + } + + xdrs->x_private = xdrs->x_base; + xdrs->x_handy = port->port_buff_size; + + return true; +#else + return proto_write(xdrs); +#endif +} + + +void rem_port::initCompression() +{ +#ifdef WIRE_COMPRESS_SUPPORT + if (port_protocol >= PROTOCOL_VERSION13 && !port_compressed) + { + port_send_stream.zalloc = Z_NULL; + port_send_stream.zfree = Z_NULL; + port_send_stream.opaque = Z_NULL; + int ret = deflateInit(&port_send_stream, Z_DEFAULT_COMPRESSION); + if (ret != Z_OK) + (Firebird::Arg::Gds(isc_random) << "compression stream init error").raise(); // add error code + port_send_stream.next_out = NULL; + + port_recv_stream.zalloc = Z_NULL; + port_recv_stream.zfree = Z_NULL; + port_recv_stream.opaque = Z_NULL; + port_recv_stream.avail_in = 0; + port_recv_stream.next_in = Z_NULL; + ret = inflateInit(&port_recv_stream); + if (ret != Z_OK) + (Firebird::Arg::Gds(isc_random) << "decompression stream init error").raise(); // add error code + + port_compressed.reset(FB_NEW(getPool()) UCHAR[port_buff_size * 2]); + memset(port_compressed, 0, port_buff_size * 2); + port_recv_stream.next_in = &port_compressed[REM_RECV_OFFSET(port_buff_size)]; +#ifdef COMPRESS_DEBUG + fprintf(stderr, "Completed init port %p\n", this); +#endif + } +#endif +} + + signed char wcCompatible[3][3] = { /* DISABLED ENABLED REQUIRED */ /* DISABLED */ {0, 0, -1}, Modified: firebird/trunk/src/remote/remote.h =================================================================== --- firebird/trunk/src/remote/remote.h 2014-11-25 07:47:24 UTC (rev 60290) +++ firebird/trunk/src/remote/remote.h 2014-11-25 13:24:10 UTC (rev 60291) @@ -55,7 +55,18 @@ #endif #endif // !WIN_NT +#if defined(HAVE_ZLIB_H) && defined(HAVE_LIBZ) +#define WIRE_COMPRESS_SUPPORT 1 +#endif +#ifdef WIRE_COMPRESS_SUPPORT +#include <zlib.h> +//#define COMPRESS_DEBUG 1 +#endif // WIRE_COMPRESS_SUPPORT + +#define REM_SEND_OFFSET(bs) (0) +#define REM_RECV_OFFSET(bs) (bs) + // Uncomment this line if you need to trace module activity //#define REMOTE_DEBUG @@ -666,7 +677,7 @@ Firebird::UCharBuffer dataForPlugin, dataFromPlugin; Firebird::HalfStaticArray<InternalCryptKey*, 1> cryptKeys; // Wire crypt keys that came from plugin(s) last time Firebird::string dpbConfig; // Used to recreate config with new filename - Firebird::RefPtr<Config> config; // Used to get plugins list and pass to port + Firebird::RefPtr<Config> clntConfig; // Used to get plugins list and pass to port unsigned nextKey; // First key to be analyzed bool hasCryptKey; // DPB contains disk crypt key, may be passed only over encrypted wire @@ -811,6 +822,7 @@ const USHORT PORT_detached = 0x0100; // op_detach, op_drop_database or op_service_detach was processed const USHORT PORT_rdb_shutdown = 0x0200; // Database is shut down const USHORT PORT_connecting = 0x0400; // Aux connection waits for a channel to be activated by client +const USHORT PORT_z_data = 0x0800; // Zlib incoming buffer has data left after decompression // Port itself @@ -860,7 +872,7 @@ struct srvr* port_server; // server of port USHORT port_server_flags; // TRUE if server USHORT port_protocol; // protocol version number - USHORT port_buff_size; // port buffer size (approx) + USHORT port_buff_size; // port buffer size USHORT port_flags; // Misc flags SLONG port_connect_timeout; // Connection timeout value SLONG port_dummy_packet_interval; // keep alive dummy packet interval @@ -922,6 +934,11 @@ FB_UINT64 port_snd_bytes; FB_UINT64 port_rcv_bytes; +#ifdef WIRE_COMPRESS_SUPPORT + z_stream port_send_stream, port_recv_stream; + UCharArrayAutoPtr port_compressed; +#endif + public: rem_port(rem_port_t t, size_t rpt) : port_sync(FB_NEW(getPool()) Firebird::RefMutex()), @@ -931,7 +948,7 @@ port_send_partial(0), port_connect(0), port_request(0), port_select_multi(0), port_type(t), port_state(PENDING), port_clients(0), port_next(0), port_parent(0), port_async(0), port_async_receive(0), - port_server(0), port_server_flags(0), port_protocol(0), port_buff_size(0), + port_server(0), port_server_flags(0), port_protocol(0), port_buff_size(rpt / 2), port_flags(0), port_connect_timeout(0), port_dummy_packet_interval(0), port_dummy_timeout(0), port_handle(INVALID_SOCKET), port_channel(INVALID_SOCKET), port_context(0), port_events_thread(0), port_events_shutdown(0), @@ -964,10 +981,11 @@ #endif } -private: // this is refCounted object - ~rem_port(); +private: + ~rem_port(); // this is refCounted object - private dtor is OK public: + void initCompression(); void linkParent(rem_port* const parent); void unlinkParent(); const Firebird::RefPtr<Config>& getPortConfig() const; Modified: firebird/trunk/src/remote/server/server.cpp =================================================================== --- firebird/trunk/src/remote/server/server.cpp 2014-11-25 07:47:24 UTC (rev 60290) +++ firebird/trunk/src/remote/server/server.cpp 2014-11-25 13:24:10 UTC (rev 60291) @@ -501,6 +501,8 @@ } authPort->send(send); + if (send->p_acpt.p_acpt_type & pflag_compress) + authPort->initCompression(); memset(&send->p_auth_cont, 0, sizeof send->p_auth_cont); return false; @@ -719,6 +721,7 @@ } RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION); + fb_assert(!port->port_compressed); PACKET packet; packet.p_operation = op_event; @@ -1663,6 +1666,7 @@ P_ARCH architecture = arch_generic; USHORT version = 0; USHORT type = 0; + bool compress = false; bool accepted = false; USHORT weight = 0; const p_cnct::p_cnct_repeat* protocol = connect->p_cnct_versions; @@ -1682,7 +1686,8 @@ weight = protocol->p_cnct_weight; version = protocol->p_cnct_version; architecture = protocol->p_cnct_architecture; - type = MIN(protocol->p_cnct_max_type, ptype_lazy_send); + type = MIN(protocol->p_cnct_max_type & ptype_MASK, ptype_lazy_send); + compress = protocol->p_cnct_max_type & pflag_compress; } } @@ -1691,12 +1696,12 @@ send->p_acpd.p_acpt_version = port->port_protocol = version; send->p_acpd.p_acpt_architecture = architecture; - send->p_acpd.p_acpt_type = type; + send->p_acpd.p_acpt_type = type | (compress ? pflag_compress : 0); send->p_acpd.p_acpt_authenticated = 0; send->p_acpt.p_acpt_version = port->port_protocol = version; send->p_acpt.p_acpt_architecture = architecture; - send->p_acpt.p_acpt_type = type; + send->p_acpt.p_acpt_type = type | (compress ? pflag_compress : 0); // modify the version string to reflect the chosen protocol string buffer; @@ -1864,6 +1869,8 @@ send->p_operation = returnData ? op_accept_data : op_accept; port->send(send); + if (send->p_acpt.p_acpt_type & pflag_compress) + port->initCompression(); return true; } @@ -1889,6 +1896,8 @@ authPort->port_srv_auth_block->extractNewKeys(s); send->p_acpd.p_acpt_authenticated = 1; authPort->send(send); + if (send->p_acpt.p_acpt_type & pflag_compress) + authPort->initCompression(); } } @@ -2144,8 +2153,6 @@ authBlock->store(pb, isc_dpb_auth_block); - send->p_operation = op_accept; - for (pb->rewind(); !pb->isEof();) { switch (pb->getClumpTag()) @@ -5252,8 +5259,6 @@ * Connect to a Firebird service. * **************************************/ - sendL->p_operation = op_accept; - // Now insert additional clumplets into spb addClumplets(spb, spbParam, this); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |