|
From: <ale...@us...> - 2014-11-25 13:24:14
|
Revision: 60291
http://sourceforge.net/p/firebird/code/60291
Author: alexpeshkoff
Date: 2014-11-25 13:24:10 +0000 (Tue, 25 Nov 2014)
Log Message:
-----------
Implemented CORE-733: Compress Data over the Network
Modified Paths:
--------------
firebird/trunk/builds/install/misc/firebird.conf.in
firebird/trunk/configure.ac
firebird/trunk/src/common/config/config.cpp
firebird/trunk/src/common/config/config.h
firebird/trunk/src/remote/client/interface.cpp
firebird/trunk/src/remote/inet.cpp
firebird/trunk/src/remote/protocol.h
firebird/trunk/src/remote/remot_proto.h
firebird/trunk/src/remote/remote.cpp
firebird/trunk/src/remote/remote.h
firebird/trunk/src/remote/server/server.cpp
Modified: firebird/trunk/builds/install/misc/firebird.conf.in
===================================================================
--- firebird/trunk/builds/install/misc/firebird.conf.in 2014-11-25 07:47:24 UTC (rev 60290)
+++ firebird/trunk/builds/install/misc/firebird.conf.in 2014-11-25 13:24:10 UTC (rev 60291)
@@ -565,6 +565,17 @@
#WireCrypt = Enabled (for client) / Required (for server)
#
+# Should the connection over the wire be compressed?
+# Client-only setting - the server follows the client's setting when the
+# connection uses a protocol version that supports compression (>= 13).
+#
+# Per-connection configurable.
+#
+# Type: boolean
+#
+#WireCompression = false
+
+#
# Seconds to wait on a silent client connection before the server sends
# dummy packets to request acknowledgment.
#
Modified: firebird/trunk/configure.ac
===================================================================
--- firebird/trunk/configure.ac 2014-11-25 07:47:24 UTC (rev 60290)
+++ firebird/trunk/configure.ac 2014-11-25 13:24:10 UTC (rev 60291)
@@ -723,6 +723,7 @@
AC_CHECK_HEADERS(iconv.h)
AC_CHECK_HEADERS(libio.h)
AC_CHECK_HEADERS(linux/falloc.h)
+AC_CHECK_HEADERS(zlib.h)
dnl check for ICU presence
AC_CHECK_HEADER(unicode/ucnv.h,,AC_MSG_ERROR(ICU support not found - please install development ICU package))
@@ -739,6 +740,7 @@
dnl Check for libraries
AC_SEARCH_LIBS(dlopen, dl)
+AC_CHECK_LIB(z, deflate)
AC_CHECK_LIB(m, main)
if test "$EDITLINE_FLG" = "Y"; then
AC_CHECK_LIB(curses, tgetent, TERMLIB=curses, \
Modified: firebird/trunk/src/common/config/config.cpp
===================================================================
--- firebird/trunk/src/common/config/config.cpp 2014-11-25 07:47:24 UTC (rev 60290)
+++ firebird/trunk/src/common/config/config.cpp 2014-11-25 13:24:10 UTC (rev 60291)
@@ -198,7 +198,8 @@
{TYPE_STRING, "WireCryptPlugin", (ConfigValue) "Arc4"},
{TYPE_STRING, "KeyHolderPlugin", (ConfigValue) ""},
{TYPE_BOOLEAN, "RemoteAccess", (ConfigValue) true},
- {TYPE_BOOLEAN, "IPv6V6Only", (ConfigValue) false}
+ {TYPE_BOOLEAN, "IPv6V6Only", (ConfigValue) false},
+ {TYPE_BOOLEAN, "WireCompression", (ConfigValue) false}
};
/******************************************************************************
@@ -750,3 +751,8 @@
{
return get<bool>(KEY_REMOTE_ACCESS);
}
+
+bool Config::getWireCompression() const
+{ // Returns the boolean configuration value stored under KEY_WIRE_COMPRESSION.
+ return get<bool>(KEY_WIRE_COMPRESSION);
+}
Modified: firebird/trunk/src/common/config/config.h
===================================================================
--- firebird/trunk/src/common/config/config.h 2014-11-25 07:47:24 UTC (rev 60290)
+++ firebird/trunk/src/common/config/config.h 2014-11-25 13:24:10 UTC (rev 60291)
@@ -136,6 +136,7 @@
KEY_PLUG_KEY_HOLDER,
KEY_REMOTE_ACCESS,
KEY_IPV6_V6ONLY,
+ KEY_WIRE_COMPRESSION,
MAX_CONFIG_KEY // keep it last
};
@@ -333,6 +334,8 @@
int getWireCrypt(WireCryptMode wcMode) const;
bool getRemoteAccess() const;
+
+ bool getWireCompression() const;
};
// Implementation of interface to access master configuration file
Modified: firebird/trunk/src/remote/client/interface.cpp
===================================================================
--- firebird/trunk/src/remote/client/interface.cpp 2014-11-25 07:47:24 UTC (rev 60290)
+++ firebird/trunk/src/remote/client/interface.cpp 2014-11-25 13:24:10 UTC (rev 60291)
@@ -5952,6 +5952,7 @@
P_OP operation = op_void;
{ // scope
RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION);
+ fb_assert(!port->port_compressed);
stuff = port->receive(&packet);
operation = packet.p_operation;
@@ -6235,6 +6236,9 @@
HANDSHAKE_DEBUG(fprintf(stderr, "Cli: authReceiveResponse: cond_accept d=%d n=%d '%.*s' 0x%x\n",
d->cstr_length, n->cstr_length,
n->cstr_length, n->cstr_address, n->cstr_address ? n->cstr_address[0] : 0));
+ if (packet->p_acpd.p_acpt_type & pflag_compress)
+ port->initCompression();
+ packet->p_acpd.p_acpt_type &= ptype_MASK;
break;
case op_crypt:
Modified: firebird/trunk/src/remote/inet.cpp
===================================================================
--- firebird/trunk/src/remote/inet.cpp 2014-11-25 07:47:24 UTC (rev 60290)
+++ firebird/trunk/src/remote/inet.cpp 2014-11-25 13:24:10 UTC (rev 60291)
@@ -218,9 +218,9 @@
}
#endif // DEBUG
-const SLONG MAX_DATA_LW = 1448; // Low Water mark
-const SLONG MAX_DATA_HW = 32768; // High Water mark
-const SLONG DEF_MAX_DATA = 8192;
+const ULONG MAX_DATA_LW = 1448; // Low Water mark
+const ULONG MAX_DATA_HW = 32768; // High Water mark
+const ULONG DEF_MAX_DATA = 8192;
//const int MAXHOSTLEN = 64;
@@ -281,6 +281,10 @@
HandleState ok(const rem_port* port)
{
+#ifdef WIRE_COMPRESS_SUPPORT
+ if (port->port_flags & PORT_z_data)
+ return SEL_READY;
+#endif
SOCKET n = port->port_handle;
#if defined(WIN_NT)
return FD_ISSET(n, &slct_fdset) ? SEL_READY : SEL_NO_DATA;
@@ -440,7 +444,7 @@
static bool_t inet_getbytes(XDR*, SCHAR *, u_int);
static void inet_error(bool, rem_port*, const TEXT*, ISC_STATUS, int);
static bool_t inet_putbytes(XDR*, const SCHAR*, u_int);
-static bool_t inet_read(XDR*);
+static bool inet_read(XDR*);
static rem_port* inet_try_connect( PACKET*,
Rdb*,
const PathName&,
@@ -448,7 +452,7 @@
ClumpletReader&,
RefPtr<Config>*,
const PathName*);
-static bool_t inet_write(XDR*); //, int);
+static bool inet_write(XDR*);
static void INET_server_socket(rem_port* port, USHORT flag, const addrinfo* pai);
#ifdef DEBUG
@@ -456,6 +460,7 @@
#endif
static bool packet_receive(rem_port*, UCHAR*, SSHORT, SSHORT*);
+static bool packet_receive2(rem_port*, UCHAR*, SSHORT, SSHORT*);
static bool packet_send(rem_port*, const SCHAR*, SSHORT);
static rem_port* receive(rem_port*, PACKET *);
static rem_port* select_accept(rem_port*);
@@ -506,7 +511,7 @@
-SLONG INET_remote_buffer;
+ULONG INET_remote_buffer;
static GlobalPtr<Mutex> init_mutex;
static volatile bool INET_initialized = false;
static volatile bool INET_shutting_down = false;
@@ -585,6 +590,10 @@
user_id.insertBytes(CNCT_group, reinterpret_cast<UCHAR*>(&eff_gid), sizeof(eff_gid));
}
+ // Should compression be tried?
+
+ bool compression = config && (*config)->getWireCompression();
+
// Establish connection to server
// If we want user verification, we can't speak anything less than version 7
@@ -605,10 +614,10 @@
for (size_t i = 0; i < cnct->p_cnct_count; i++) {
cnct->p_cnct_versions[i] = protocols_to_try[i];
+ if (compression && cnct->p_cnct_versions[i].p_cnct_version >= PROTOCOL_VERSION13)
+ cnct->p_cnct_versions[i].p_cnct_max_type |= pflag_compress;
}
- // Try connection using first set of protocols
-
rem_port* port = inet_try_connect(packet, rdb, file_name, node_name, dpb, config, ref_db_name);
P_ACPT* accept = NULL;
@@ -671,6 +680,9 @@
port->port_flags |= PORT_symmetric;
}
+ bool compress = accept->p_acpt_type & pflag_compress;
+ accept->p_acpt_type &= ptype_MASK;
+
if (accept->p_acpt_type != ptype_out_of_band) {
port->port_flags |= PORT_no_oob;
}
@@ -679,6 +691,9 @@
port->port_flags |= PORT_lazy;
}
+ if (compress)
+ port->initCompression();
+
return port;
}
@@ -1275,14 +1290,14 @@
port->port_request = aux_request;
port->port_buff_size = (USHORT) INET_remote_buffer;
port->port_async_receive = inet_async_receive;
- port->port_flags = flags;
+ port->port_flags |= flags;
- xdrinet_create( &port->port_send, port,
- &port->port_buffer[INET_remote_buffer],
- (USHORT) INET_remote_buffer,
- XDR_ENCODE);
+ xdrinet_create(&port->port_send, port,
+ &port->port_buffer[REM_SEND_OFFSET(INET_remote_buffer)],
+ (USHORT) INET_remote_buffer, XDR_ENCODE);
- xdrinet_create( &port->port_receive, port, port->port_buffer, 0, XDR_DECODE);
+ xdrinet_create(&port->port_receive, port,
+ &port->port_buffer[REM_RECV_OFFSET(INET_remote_buffer)], 0, XDR_DECODE);
if (parent && !(parent->port_server_flags & SRVR_thread_per_port))
{
@@ -1894,7 +1909,7 @@
}
else if (port = select_accept(main_port))
{
- if (!packet_receive(port, buffer, bufsize, length))
+ if (!REMOTE_inflate(port, packet_receive, buffer, bufsize, length))
{
*length = 0;
}
@@ -1914,7 +1929,7 @@
return true;
}
- if (!packet_receive(port, buffer, bufsize, length))
+ if (!REMOTE_inflate(port, packet_receive, buffer, bufsize, length))
{
if (port->port_flags & PORT_disconnect) {
continue;
@@ -2191,7 +2206,7 @@
port->port_send.x_client = !(port->port_flags & PORT_server);
#endif
if (!xdr_protocol(&port->port_send, packet))
- return FALSE;
+ return false;
#ifdef DEBUG
{ // scope
@@ -2206,7 +2221,7 @@
} // end scope
#endif
- return inet_write(&port->port_send /*, TRUE*/);
+ return REMOTE_deflate(&port->port_send, inet_write, packet_send, true);
}
static int send_partial( rem_port* port, PACKET * packet)
@@ -2262,7 +2277,7 @@
xdrs->x_ops = (xdr_t::xdr_ops*) &inet_ops;
xdrs->x_op = x_op;
- return TRUE;
+ return true;
}
#ifdef HAVE_SETITIMER
@@ -2447,8 +2462,10 @@
xdrs->x_handy = 0;
}
- if (!inet_write(xdrs /*, 0*/))
+ if (!REMOTE_deflate(xdrs, inet_write, packet_send, false))
+ {
return FALSE;
+ }
}
// Scalar values and bulk transfer remainder fall thru
@@ -2468,7 +2485,7 @@
while (--bytecount >= 0)
{
- if (xdrs->x_handy <= 0 && !inet_write(xdrs /*, 0*/))
+ if (xdrs->x_handy <= 0 && !REMOTE_deflate(xdrs, inet_write, packet_send, false))
return FALSE;
--xdrs->x_handy;
*xdrs->x_private++ = *buff++;
@@ -2477,7 +2494,7 @@
return TRUE;
}
-static bool_t inet_read( XDR* xdrs)
+static bool inet_read( XDR* xdrs)
{
/**************************************
*
@@ -2504,27 +2521,40 @@
p += xdrs->x_handy;
}
+ SSHORT length = end - p;
+ port->port_flags &= ~PORT_z_data;
+ if (!REMOTE_inflate(port, packet_receive2, (UCHAR*)p, length, &length))
+ return false;
+ p += length;
+
+ xdrs->x_handy = (int) ((SCHAR *) p - xdrs->x_base);
+ xdrs->x_private = xdrs->x_base;
+
+ return true;
+}
+
+static bool packet_receive2(rem_port* port, UCHAR* p, SSHORT bufSize, SSHORT* length)
+{ /*
+ * Reads one logical packet into p by calling packet_receive() until a
+ * piece with a non-negative length arrives.
+ *
+ * port - port to read from.
+ * p, bufSize - destination buffer and its capacity.
+ * length - set to the total number of bytes stored in p.
+ *
+ * A negative piece length marks a partial piece: its absolute value is
+ * added to the running total and an empty packet is sent with
+ * packet_send(port, 0, 0) before the next read (presumably an
+ * acknowledgement - confirm against packet_send).
+ * Returns false if packet_receive() or packet_send() fails, true otherwise.
+ */
+ *length = 0;
+
while (true)
{
- SSHORT length = end - p;
- if (!packet_receive(port, reinterpret_cast<UCHAR*>(p), length, &length))
+ SSHORT l = bufSize - *length; // room still free in the caller's buffer
+ if (!packet_receive(port, p + *length, l, &l))
+ return false;
+
+ if (l >= 0)
{
- return FALSE;
- }
- if (length >= 0)
- {
- p += length;
+ *length += l; // final piece: account for it and stop
break;
}
- p -= length;
+
+ *length -= l; // l is negative here, so this adds the size of the partial piece
if (!packet_send(port, 0, 0))
- return FALSE;
+ return false;
}
- xdrs->x_handy = (int) ((SCHAR *) p - xdrs->x_base);
- xdrs->x_private = xdrs->x_base;
-
- return TRUE;
+ return true;
}
static rem_port* inet_try_connect(PACKET* packet,
@@ -2565,7 +2595,7 @@
rem_port* port = NULL;
try
{
- port = INET_connect(node_name, packet, FALSE, &dpb, config);
+ port = INET_connect(node_name, packet, false, &dpb, config);
}
catch (const Exception&)
{
@@ -2587,7 +2617,7 @@
return port;
}
-static bool_t inet_write(XDR* xdrs)
+static bool inet_write(XDR* xdrs)
{
/**************************************
*
@@ -2603,7 +2633,7 @@
rem_port* port = (rem_port*) xdrs->x_public;
const char* p = xdrs->x_base;
- SSHORT length = xdrs->x_private - p;
+ USHORT length = xdrs->x_private - p;
// Send data in manageable hunks. If a packet is partial, indicate
// that with a negative length. A positive length marks the end.
@@ -2613,14 +2643,14 @@
const SSHORT l = (SSHORT) MIN(length, INET_remote_buffer);
length -= l;
if (!packet_send(port, p, (SSHORT) (length ? -l : l)))
- return FALSE;
+ return false;
p += l;
}
xdrs->x_private = xdrs->x_base;
xdrs->x_handy = INET_remote_buffer;
- return TRUE;
+ return true;
}
Modified: firebird/trunk/src/remote/protocol.h
===================================================================
--- firebird/trunk/src/remote/protocol.h 2014-11-25 07:47:24 UTC (rev 60290)
+++ firebird/trunk/src/remote/protocol.h 2014-11-25 13:24:10 UTC (rev 60291)
@@ -105,6 +105,10 @@
const USHORT ptype_batch_send = 3; // Batch sends, no asynchrony
const USHORT ptype_out_of_band = 4; // Batch sends w/ out of band notification
const USHORT ptype_lazy_send = 5; // Deferred packets delivery
+const USHORT ptype_MASK = 0xFF; // Mask - up to 255 types of protocol
+//
+// upper byte is used for protocol flags
+const USHORT pflag_compress = 0x100; // Turn on compression if possible
// Generic object id
Modified: firebird/trunk/src/remote/remot_proto.h
===================================================================
--- firebird/trunk/src/remote/remot_proto.h 2014-11-25 07:47:24 UTC (rev 60290)
+++ firebird/trunk/src/remote/remot_proto.h 2014-11-25 13:24:10 UTC (rev 60291)
@@ -44,6 +44,9 @@
struct rem_port;
struct rem_fmt;
struct Rdb;
+typedef bool PacketReceive(rem_port*, UCHAR*, SSHORT, SSHORT*);
+typedef bool PacketSend(rem_port*, const SCHAR*, SSHORT);
+typedef bool ProtoWrite(XDR*);
void REMOTE_cleanup_transaction (struct Rtr *);
USHORT REMOTE_compute_batch_size (rem_port*, USHORT, P_OP, const rem_fmt*);
@@ -63,6 +66,8 @@
void REMOTE_parseList(Remote::ParsedList&, Firebird::PathName);
void REMOTE_makeList(Firebird::PathName& list, const Remote::ParsedList& parsed);
void REMOTE_check_response(Firebird::IStatus* warning, Rdb* rdb, PACKET* packet, bool checkKeys = false);
+bool REMOTE_inflate(rem_port*, PacketReceive*, UCHAR*, SSHORT, SSHORT*);
+bool REMOTE_deflate(XDR*, ProtoWrite*, PacketSend*, bool flash);
extern signed char wcCompatible[3][3];
Modified: firebird/trunk/src/remote/remote.cpp
===================================================================
--- firebird/trunk/src/remote/remote.cpp 2014-11-25 07:47:24 UTC (rev 60290)
+++ firebird/trunk/src/remote/remote.cpp 2014-11-25 13:24:10 UTC (rev 60291)
@@ -994,7 +994,7 @@
addMutliPartConnectParameter(dataFromPlugin, user_id, CNCT_specific_data);
// Client's wirecrypt requested level
- user_id.insertInt(CNCT_client_crypt, config->getWireCrypt(WC_CLIENT));
+ user_id.insertInt(CNCT_client_crypt, clntConfig->getWireCrypt(WC_CLIENT));
}
void ClntAuthBlock::resetClnt(const Firebird::PathName* fileName, const CSTRING* listStr)
@@ -1020,8 +1020,8 @@
dataFromPlugin.clear();
firstTime = true;
- config = REMOTE_get_config(fileName, &dpbConfig);
- pluginList = config->getPlugins(Firebird::IPluginManager::AuthClient);
+ clntConfig = REMOTE_get_config(fileName, &dpbConfig);
+ pluginList = clntConfig->getPlugins(Firebird::IPluginManager::AuthClient);
Firebird::PathName final;
if (serverPluginList.hasData())
@@ -1064,7 +1064,7 @@
Firebird::RefPtr<Config>* ClntAuthBlock::getConfig()
{
- return config.hasData() ? &config : NULL;
+ return clntConfig.hasData() ? &clntConfig : NULL;
}
void ClntAuthBlock::storeDataForPlugin(unsigned int length, const unsigned char* data)
@@ -1391,6 +1391,191 @@
}
+bool REMOTE_inflate(rem_port* port, PacketReceive* packet_receive, UCHAR* buffer, SSHORT buffer_length, SSHORT* length)
+{ /*
+ * Receives data for a port into buffer, inflating it first when the port
+ * has compression set up.
+ *
+ * port - port to read from; its port_recv_stream and port_compressed
+ * buffer are used when compression is active.
+ * packet_receive - transport routine that reads raw bytes.
+ * buffer, buffer_length - destination for the (decompressed) data.
+ * length - on success set to the number of bytes placed in buffer.
+ *
+ * Returns false when packet_receive() fails or reports a length that is
+ * not positive, when inflate() returns anything other than Z_OK, or when
+ * the call was made with PORT_z_data set and no output could be produced.
+ * PORT_z_data is set on return when compressed input is still pending.
+ * When built without WIRE_COMPRESS_SUPPORT, or when port_compressed is not
+ * allocated, the call is passed straight through to packet_receive().
+ *
+ * NOTE(review): fresh bytes are read at strm.next_in rather than at
+ * next_in + avail_in; that is only safe if avail_in is always 0 when the
+ * read is reached - confirm.
+ */
+#ifdef WIRE_COMPRESS_SUPPORT
+ if (!port->port_compressed)
+ return packet_receive(port, buffer, buffer_length, length);
+
+ z_stream& strm = port->port_recv_stream;
+ strm.avail_out = buffer_length; // inflate() writes straight into the caller's buffer
+ strm.next_out = buffer;
+
+ for(;;)
+ {
+ if (strm.avail_in) // compressed bytes are pending: try to inflate them first
+ {
+#ifdef COMPRESS_DEBUG
+ fprintf(stderr, "Data to inflate %d port %p\n", strm.avail_in, port);
+#if COMPRESS_DEBUG>1
+ for (unsigned n = 0; n < strm.avail_in; ++n) fprintf(stderr, "%02x ", strm.next_in[n]);
+ fprintf(stderr, "\n");
+#endif
+#endif
+
+ if (inflate(&strm, Z_NO_FLUSH) != Z_OK)
+ {
+#ifdef COMPRESS_DEBUG
+ fprintf(stderr, "Inflate error\n");
+#endif
+ (void)inflateEnd(&strm);
+ port->port_flags &= ~PORT_z_data;
+ return false;
+ }
+#ifdef COMPRESS_DEBUG
+ fprintf(stderr, "Inflated data %d\n", buffer_length - strm.avail_out);
+#if COMPRESS_DEBUG>1
+ for (unsigned n = 0; n < buffer_length - strm.avail_out; ++n) fprintf(stderr, "%02x ", buffer[n]);
+ fprintf(stderr, "\n");
+#endif
+#endif
+ if (strm.next_out != buffer) // some output was produced - stop reading
+ break;
+
+ if (port->port_flags & PORT_z_data) // Was called from select_multi() but nothing decompressed
+ {
+ port->port_flags &= ~PORT_z_data;
+ return false;
+ }
+
+ if (strm.next_in != &port->port_compressed[REM_RECV_OFFSET(port->port_buff_size)]) // leftover input is not at the start of the receive area
+ {
+ memmove(&port->port_compressed[REM_RECV_OFFSET(port->port_buff_size)], strm.next_in, strm.avail_in);
+ strm.next_in = &port->port_compressed[REM_RECV_OFFSET(port->port_buff_size)];
+ }
+ }
+ else
+ strm.next_in = &port->port_compressed[REM_RECV_OFFSET(port->port_buff_size)]; // nothing pending: refill from the start of the receive area
+
+ SSHORT l = (SSHORT) (port->port_buff_size - strm.avail_in); // room left in the receive area
+ if ((!packet_receive(port, strm.next_in, l, &l)) || (l <= 0)) // fixit - 2 ways to report errors in same routine
+ {
+ (void)inflateEnd(&strm);
+ port->port_flags &= ~PORT_z_data;
+ return false;
+ }
+
+ strm.avail_in += l; // the next loop pass inflates the bytes just read
+ }
+
+ *length = (SSHORT) (buffer_length - strm.avail_out); // bytes actually written to the caller's buffer
+ if (strm.avail_in) // Z-buffer still has some data - probably can call inflate() once more on them
+ port->port_flags |= PORT_z_data;
+ else
+ port->port_flags &= ~PORT_z_data;
+
+ return true;
+#else
+ return packet_receive(port, buffer, buffer_length, length);
+#endif
+}
+
+
+bool REMOTE_deflate(XDR* xdrs, ProtoWrite* proto_write, PacketSend* packet_send, bool flash)
+{ /*
+ * Sends the bytes accumulated in an XDR send buffer, compressing them with
+ * deflate() when the owning port has compression set up.
+ *
+ * xdrs - XDR stream; the bytes between x_base and x_private are sent and
+ * x_public is cast to the owning rem_port.
+ * proto_write - uncompressed write routine, used when compression is off.
+ * packet_send - transport routine that transmits the compressed bytes.
+ * flash - when true deflate() is called with Z_SYNC_FLUSH and whatever
+ * output exists is transmitted; when false Z_NO_FLUSH is used and
+ * output is transmitted only once the output area is full.
+ *
+ * Returns false if deflate() reports an error other than Z_BUF_ERROR or if
+ * packet_send() fails. On success the XDR buffer is reset to empty.
+ * When built without WIRE_COMPRESS_SUPPORT, or when port_compressed is not
+ * allocated, the call is passed straight through to proto_write().
+ */
+#ifdef WIRE_COMPRESS_SUPPORT
+ rem_port* port = (rem_port*) xdrs->x_public;
+ if (!port->port_compressed)
+ return proto_write(xdrs);
+
+ z_stream& strm = port->port_send_stream;
+ strm.avail_in = xdrs->x_private - xdrs->x_base; // everything written to the XDR buffer so far
+ strm.next_in = (Bytef*)xdrs->x_base;
+
+ if (!strm.next_out) // first use: initCompression() leaves next_out NULL
+ {
+ strm.avail_out = port->port_buff_size;
+ strm.next_out = (Bytef*)&port->port_compressed[REM_SEND_OFFSET(port->port_buff_size)];
+ }
+
+ bool expectMoreOut = flash; // when flushing, run deflate() at least once even with no input
+
+ while(strm.avail_in || expectMoreOut)
+ {
+#ifdef COMPRESS_DEBUG
+ fprintf(stderr, "Data to deflate %d port %p\n", strm.avail_in, port);
+#if COMPRESS_DEBUG>1
+ for (unsigned n = 0; n < strm.avail_in; ++n) fprintf(stderr, "%02x ", strm.next_in[n]);
+ fprintf(stderr, "\n");
+#endif
+#endif
+ int ret = deflate(&strm, flash ? Z_SYNC_FLUSH : Z_NO_FLUSH);
+ if (ret == Z_BUF_ERROR) // Z_BUF_ERROR is not treated as a failure here
+ ret = 0;
+ if (ret != 0)
+ {
+#ifdef COMPRESS_DEBUG
+ fprintf(stderr, "Deflate error %d\n", ret);
+#endif
+ return false;
+ }
+
+#ifdef COMPRESS_DEBUG
+ fprintf(stderr, "Deflated data %d\n", port->port_buff_size - strm.avail_out);
+#if COMPRESS_DEBUG>1
+ for (unsigned n = 0; n < port->port_buff_size - strm.avail_out; ++n)
+ fprintf(stderr, "%02x ", port->port_compressed[REM_SEND_OFFSET(port->port_buff_size) + n]);
+ fprintf(stderr, "\n");
+#endif
+#endif
+
+ expectMoreOut = !strm.avail_out; // a full output area means deflate() may have more to emit
+ if ((port->port_buff_size != strm.avail_out) && (flash || !strm.avail_out)) // output exists, and we are flushing or the area is full
+ {
+ if (!packet_send(port, (SCHAR*) &port->port_compressed[REM_SEND_OFFSET(port->port_buff_size)],
+ (SSHORT) (port->port_buff_size - strm.avail_out)))
+ {
+ return false;
+ }
+
+ strm.avail_out = port->port_buff_size; // output area was transmitted: reuse it from the start
+ strm.next_out = (Bytef*)&port->port_compressed[REM_SEND_OFFSET(port->port_buff_size)];
+ }
+ }
+
+ xdrs->x_private = xdrs->x_base; // XDR buffer has been consumed: reset it to empty
+ xdrs->x_handy = port->port_buff_size;
+
+ return true;
+#else
+ return proto_write(xdrs);
+#endif
+}
+
+
+void rem_port::initCompression()
+{ /*
+ * Sets up zlib compression for this port: initializes the deflate stream
+ * used for sending and the inflate stream used for receiving, and allocates
+ * the port_compressed buffer (2 * port_buff_size bytes, zero-filled).
+ *
+ * Does nothing when built without WIRE_COMPRESS_SUPPORT, when port_protocol
+ * is below PROTOCOL_VERSION13, or when port_compressed is already allocated.
+ * Raises an isc_random error if either zlib stream fails to initialize.
+ *
+ * NOTE(review): if inflateInit() fails, no deflateEnd() for the already
+ * initialized send stream is visible on this path - confirm whether that
+ * stream is released elsewhere.
+ */
+#ifdef WIRE_COMPRESS_SUPPORT
+ if (port_protocol >= PROTOCOL_VERSION13 && !port_compressed)
+ {
+ port_send_stream.zalloc = Z_NULL;
+ port_send_stream.zfree = Z_NULL;
+ port_send_stream.opaque = Z_NULL;
+ int ret = deflateInit(&port_send_stream, Z_DEFAULT_COMPRESSION);
+ if (ret != Z_OK)
+ (Firebird::Arg::Gds(isc_random) << "compression stream init error").raise(); // add error code
+ port_send_stream.next_out = NULL; // REMOTE_deflate() sets up the output area when it sees NULL
+
+ port_recv_stream.zalloc = Z_NULL;
+ port_recv_stream.zfree = Z_NULL;
+ port_recv_stream.opaque = Z_NULL;
+ port_recv_stream.avail_in = 0;
+ port_recv_stream.next_in = Z_NULL;
+ ret = inflateInit(&port_recv_stream);
+ if (ret != Z_OK)
+ (Firebird::Arg::Gds(isc_random) << "decompression stream init error").raise(); // add error code
+
+ port_compressed.reset(FB_NEW(getPool()) UCHAR[port_buff_size * 2]); // one half per direction, see REM_SEND_OFFSET / REM_RECV_OFFSET
+ memset(port_compressed, 0, port_buff_size * 2);
+ port_recv_stream.next_in = &port_compressed[REM_RECV_OFFSET(port_buff_size)]; // incoming compressed bytes are stored from this offset on
+#ifdef COMPRESS_DEBUG
+ fprintf(stderr, "Completed init port %p\n", this);
+#endif
+ }
+#endif
+}
+
+
signed char wcCompatible[3][3] = {
/* DISABLED ENABLED REQUIRED */
/* DISABLED */ {0, 0, -1},
Modified: firebird/trunk/src/remote/remote.h
===================================================================
--- firebird/trunk/src/remote/remote.h 2014-11-25 07:47:24 UTC (rev 60290)
+++ firebird/trunk/src/remote/remote.h 2014-11-25 13:24:10 UTC (rev 60291)
@@ -55,7 +55,18 @@
#endif
#endif // !WIN_NT
+#if defined(HAVE_ZLIB_H) && defined(HAVE_LIBZ)
+#define WIRE_COMPRESS_SUPPORT 1
+#endif
+#ifdef WIRE_COMPRESS_SUPPORT
+#include <zlib.h>
+//#define COMPRESS_DEBUG 1
+#endif // WIRE_COMPRESS_SUPPORT
+
+#define REM_SEND_OFFSET(bs) (0)
+#define REM_RECV_OFFSET(bs) (bs)
+
// Uncomment this line if you need to trace module activity
//#define REMOTE_DEBUG
@@ -666,7 +677,7 @@
Firebird::UCharBuffer dataForPlugin, dataFromPlugin;
Firebird::HalfStaticArray<InternalCryptKey*, 1> cryptKeys; // Wire crypt keys that came from plugin(s) last time
Firebird::string dpbConfig; // Used to recreate config with new filename
- Firebird::RefPtr<Config> config; // Used to get plugins list and pass to port
+ Firebird::RefPtr<Config> clntConfig; // Used to get plugins list and pass to port
unsigned nextKey; // First key to be analyzed
bool hasCryptKey; // DPB contains disk crypt key, may be passed only over encrypted wire
@@ -811,6 +822,7 @@
const USHORT PORT_detached = 0x0100; // op_detach, op_drop_database or op_service_detach was processed
const USHORT PORT_rdb_shutdown = 0x0200; // Database is shut down
const USHORT PORT_connecting = 0x0400; // Aux connection waits for a channel to be activated by client
+const USHORT PORT_z_data = 0x0800; // Zlib incoming buffer has data left after decompression
// Port itself
@@ -860,7 +872,7 @@
struct srvr* port_server; // server of port
USHORT port_server_flags; // TRUE if server
USHORT port_protocol; // protocol version number
- USHORT port_buff_size; // port buffer size (approx)
+ USHORT port_buff_size; // port buffer size
USHORT port_flags; // Misc flags
SLONG port_connect_timeout; // Connection timeout value
SLONG port_dummy_packet_interval; // keep alive dummy packet interval
@@ -922,6 +934,11 @@
FB_UINT64 port_snd_bytes;
FB_UINT64 port_rcv_bytes;
+#ifdef WIRE_COMPRESS_SUPPORT
+ z_stream port_send_stream, port_recv_stream;
+ UCharArrayAutoPtr port_compressed;
+#endif
+
public:
rem_port(rem_port_t t, size_t rpt) :
port_sync(FB_NEW(getPool()) Firebird::RefMutex()),
@@ -931,7 +948,7 @@
port_send_partial(0), port_connect(0), port_request(0), port_select_multi(0),
port_type(t), port_state(PENDING), port_clients(0), port_next(0),
port_parent(0), port_async(0), port_async_receive(0),
- port_server(0), port_server_flags(0), port_protocol(0), port_buff_size(0),
+ port_server(0), port_server_flags(0), port_protocol(0), port_buff_size(rpt / 2),
port_flags(0), port_connect_timeout(0), port_dummy_packet_interval(0),
port_dummy_timeout(0), port_handle(INVALID_SOCKET), port_channel(INVALID_SOCKET), port_context(0),
port_events_thread(0), port_events_shutdown(0),
@@ -964,10 +981,11 @@
#endif
}
-private: // this is refCounted object
- ~rem_port();
+private:
+ ~rem_port(); // this is refCounted object - private dtor is OK
public:
+ void initCompression();
void linkParent(rem_port* const parent);
void unlinkParent();
const Firebird::RefPtr<Config>& getPortConfig() const;
Modified: firebird/trunk/src/remote/server/server.cpp
===================================================================
--- firebird/trunk/src/remote/server/server.cpp 2014-11-25 07:47:24 UTC (rev 60290)
+++ firebird/trunk/src/remote/server/server.cpp 2014-11-25 13:24:10 UTC (rev 60291)
@@ -501,6 +501,8 @@
}
authPort->send(send);
+ if (send->p_acpt.p_acpt_type & pflag_compress)
+ authPort->initCompression();
memset(&send->p_auth_cont, 0, sizeof send->p_auth_cont);
return false;
@@ -719,6 +721,7 @@
}
RefMutexGuard portGuard(*port->port_sync, FB_FUNCTION);
+ fb_assert(!port->port_compressed);
PACKET packet;
packet.p_operation = op_event;
@@ -1663,6 +1666,7 @@
P_ARCH architecture = arch_generic;
USHORT version = 0;
USHORT type = 0;
+ bool compress = false;
bool accepted = false;
USHORT weight = 0;
const p_cnct::p_cnct_repeat* protocol = connect->p_cnct_versions;
@@ -1682,7 +1686,8 @@
weight = protocol->p_cnct_weight;
version = protocol->p_cnct_version;
architecture = protocol->p_cnct_architecture;
- type = MIN(protocol->p_cnct_max_type, ptype_lazy_send);
+ type = MIN(protocol->p_cnct_max_type & ptype_MASK, ptype_lazy_send);
+ compress = protocol->p_cnct_max_type & pflag_compress;
}
}
@@ -1691,12 +1696,12 @@
send->p_acpd.p_acpt_version = port->port_protocol = version;
send->p_acpd.p_acpt_architecture = architecture;
- send->p_acpd.p_acpt_type = type;
+ send->p_acpd.p_acpt_type = type | (compress ? pflag_compress : 0);
send->p_acpd.p_acpt_authenticated = 0;
send->p_acpt.p_acpt_version = port->port_protocol = version;
send->p_acpt.p_acpt_architecture = architecture;
- send->p_acpt.p_acpt_type = type;
+ send->p_acpt.p_acpt_type = type | (compress ? pflag_compress : 0);
// modify the version string to reflect the chosen protocol
string buffer;
@@ -1864,6 +1869,8 @@
send->p_operation = returnData ? op_accept_data : op_accept;
port->send(send);
+ if (send->p_acpt.p_acpt_type & pflag_compress)
+ port->initCompression();
return true;
}
@@ -1889,6 +1896,8 @@
authPort->port_srv_auth_block->extractNewKeys(s);
send->p_acpd.p_acpt_authenticated = 1;
authPort->send(send);
+ if (send->p_acpt.p_acpt_type & pflag_compress)
+ authPort->initCompression();
}
}
@@ -2144,8 +2153,6 @@
authBlock->store(pb, isc_dpb_auth_block);
- send->p_operation = op_accept;
-
for (pb->rewind(); !pb->isEof();)
{
switch (pb->getClumpTag())
@@ -5252,8 +5259,6 @@
* Connect to a Firebird service.
*
**************************************/
- sendL->p_operation = op_accept;
-
// Now insert additional clumplets into spb
addClumplets(spb, spbParam, this);
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|