|
From: <di...@us...> - 2014-10-11 09:02:18
|
Revision: 60100
http://sourceforge.net/p/firebird/code/60100
Author: dimitr
Date: 2014-10-11 09:02:09 +0000 (Sat, 11 Oct 2014)
Log Message:
-----------
Improved the batching (prefetch) logic. This is the second part of CORE-2530.
Modified Paths:
--------------
firebird/trunk/src/remote/client/interface.cpp
firebird/trunk/src/remote/inet.cpp
firebird/trunk/src/remote/os/win32/wnet.cpp
firebird/trunk/src/remote/os/win32/xnet.cpp
firebird/trunk/src/remote/remote.cpp
firebird/trunk/src/remote/remote.h
firebird/trunk/src/remote/server/server.cpp
Modified: firebird/trunk/src/remote/client/interface.cpp
===================================================================
--- firebird/trunk/src/remote/client/interface.cpp 2014-10-11 01:20:07 UTC (rev 60099)
+++ firebird/trunk/src/remote/client/interface.cpp 2014-10-11 09:02:09 UTC (rev 60100)
@@ -2897,7 +2897,6 @@
{
sqldata->p_sqldata_messages =
REMOTE_compute_batch_size(port, 0, op_fetch_response, statement->rsr_select_format);
- sqldata->p_sqldata_messages *= 4;
// Reorder data when the local buffer is half empty
@@ -4076,8 +4075,7 @@
// could dynamically adjust batching sizes based on fetch patterns
data->p_data_messages = REMOTE_compute_batch_size(port, 0, op_send, tail->rrq_format);
- tail->rrq_reorder_level = 2 * data->p_data_messages;
- data->p_data_messages *= 4;
+ tail->rrq_reorder_level = data->p_data_messages / 2;
tail->rrq_rows_pending += data->p_data_messages;
#ifdef DEBUG
Modified: firebird/trunk/src/remote/inet.cpp
===================================================================
--- firebird/trunk/src/remote/inet.cpp 2014-10-11 01:20:07 UTC (rev 60099)
+++ firebird/trunk/src/remote/inet.cpp 2014-10-11 09:02:09 UTC (rev 60100)
@@ -2977,6 +2977,9 @@
} // end scope
#endif
+ port->port_rcv_packets++;
+ port->port_rcv_bytes += n;
+
*length = n;
return true;
@@ -3146,6 +3149,9 @@
} // end scope
#endif
+ port->port_snd_packets++;
+ port->port_snd_bytes += buffer_length;
+
return true;
}
Modified: firebird/trunk/src/remote/os/win32/wnet.cpp
===================================================================
--- firebird/trunk/src/remote/os/win32/wnet.cpp 2014-10-11 01:20:07 UTC (rev 60099)
+++ firebird/trunk/src/remote/os/win32/wnet.cpp 2014-10-11 09:02:09 UTC (rev 60100)
@@ -1342,6 +1342,9 @@
packet_print("receive", buffer, n);
#endif
+ port->port_rcv_packets++;
+ port->port_rcv_bytes += n;
+
*length = (SSHORT) n;
return true;
@@ -1400,6 +1403,9 @@
packet_print("send", reinterpret_cast<const UCHAR*>(buffer), buffer_length);
#endif
+ port->port_snd_packets++;
+ port->port_snd_bytes += buffer_length;
+
return true;
}
Modified: firebird/trunk/src/remote/os/win32/xnet.cpp
===================================================================
--- firebird/trunk/src/remote/os/win32/xnet.cpp 2014-10-11 01:20:07 UTC (rev 60099)
+++ firebird/trunk/src/remote/os/win32/xnet.cpp 2014-10-11 09:02:09 UTC (rev 60100)
@@ -1902,7 +1902,7 @@
* Read a buffer full of data.
*
**************************************/
- rem_port* port = (rem_port*)xdrs->x_public;
+ rem_port* port = (rem_port*) xdrs->x_public;
const bool portServer = (port->port_flags & PORT_server);
XCC xcc = port->port_xcc;
XCH xch = xcc->xcc_recv_channel;
@@ -1936,8 +1936,13 @@
if (wait_result == WAIT_OBJECT_0)
{
// Client has written some data for us (server) to read
+
+ port->port_rcv_packets++;
+ port->port_rcv_bytes += xch->xch_length;
+
xdrs->x_handy = xch->xch_length;
xdrs->x_private = xdrs->x_base;
+
return TRUE;
}
if (wait_result == WAIT_TIMEOUT)
@@ -1981,15 +1986,19 @@
* filled and ready for reading.
*
**************************************/
- rem_port* port = (rem_port*)xdrs->x_public;
+ rem_port* port = (rem_port*) xdrs->x_public;
XCC xcc = port->port_xcc;
XCH xch = xcc->xcc_send_channel;
xch->xch_length = xdrs->x_private - xdrs->x_base;
if (SetEvent(xcc->xcc_event_send_channel_filled))
{
+ port->port_snd_packets++;
+ port->port_snd_bytes += xch->xch_length;
+
xdrs->x_private = xdrs->x_base;
xdrs->x_handy = xch->xch_size;
+
return TRUE;
}
Modified: firebird/trunk/src/remote/remote.cpp
===================================================================
--- firebird/trunk/src/remote/remote.cpp 2014-10-11 01:20:07 UTC (rev 60099)
+++ firebird/trunk/src/remote/remote.cpp 2014-10-11 09:02:09 UTC (rev 60100)
@@ -204,34 +204,12 @@
* The client calculates this number (n from the list above)
* and sends it to the server.
*
- * I asked why it is that the client doesn't just ask for a packet
- * full of records and let the server return however many fits in
- * a packet. According to Sudesh, this is because of a bug in
- * Superserver which showed up in the WIN_NT 4.2.x kits. So I
- * imagine once we up the protocol so that we can be sure we're not
- * talking to a 4.2 kit, then we can make this optimization.
- * - Deej 2/28/97
- *
- * Note: A future optimization can look at setting the packet
- * size to optimize the transfer.
- *
- * Note: This calculation must use worst-case to determine the
- * packing. Should the data record have VARCHAR data, it is
- * often possible to fit more than the packing specification
- * into each packet. This is also a candidate for future
- * optimization.
- *
* The data size is either the XDR data representation, or the
* actual message size (rounded up) if this is a symmetric
* architecture connection.
*
**************************************/
- const USHORT MAX_PACKETS_PER_BATCH = 4; // packets - picked by SWAG
- const USHORT MIN_PACKETS_PER_BATCH = 2; // packets - picked by SWAG
- const USHORT DESIRED_ROWS_PER_BATCH = 20; // data rows - picked by SWAG
- const USHORT MIN_ROWS_PER_BATCH = 10; // data rows - picked by SWAG
-
const USHORT op_overhead = (USHORT) xdr_protocol_overhead(op_code);
#ifdef DEBUG
@@ -241,52 +219,22 @@
format->fmt_length, op_overhead);
#endif
- ULONG row_size;
- if (port->port_flags & PORT_symmetric)
- {
- // Same architecture connection
- row_size = (ROUNDUP(format->fmt_length, 4) + op_overhead);
- }
- else
- {
- // Using XDR for data transfer
- row_size = (ROUNDUP(format->fmt_net_length, 4) + op_overhead);
- }
+ const ULONG row_size = op_overhead + // per-row size = protocol overhead + padded message; ?: must be parenthesized (binds looser than +)
+ ((port->port_flags & PORT_symmetric) ?
+ ROUNDUP(format->fmt_length, 4) : // Same architecture connection
+ ROUNDUP(format->fmt_net_length, 4)); // Using XDR for data transfer
- USHORT num_packets = (USHORT) (((DESIRED_ROWS_PER_BATCH * row_size) // data set
- + buffer_used // used in 1st pkt
- + (port->port_buff_size - 1)) // to round up
- / port->port_buff_size);
- if (num_packets > MAX_PACKETS_PER_BATCH)
- {
- num_packets = (USHORT) (((MIN_ROWS_PER_BATCH * row_size) // data set
- + buffer_used // used in 1st pkt
- + (port->port_buff_size - 1)) // to round up
- / port->port_buff_size);
- }
- num_packets = MAX(num_packets, MIN_PACKETS_PER_BATCH);
+ ULONG result = (port->port_protocol >= PROTOCOL_VERSION13) ?
+ MAX_ROWS_PER_BATCH : (MAX_PACKETS_PER_BATCH * port->port_buff_size - buffer_used) / row_size;
- // Now that we've picked the number of packets in a batch,
- // pack as many rows as we can into the set of packets
+ // Don't ask for more records than we can cache
- ULONG result = (num_packets * port->port_buff_size - buffer_used) / row_size;
+ result = MIN(result, MAX_BATCH_CACHE_SIZE / format->fmt_length);
- // Must always send some messages, even if message size is more
- // than packet size.
+ // Must always send some messages, even if message is larger than packet
result = MAX(result, MIN_ROWS_PER_BATCH);
-#ifdef DEBUG
- {
- // CVC: I don't see the point in replacing this with fb_utils::readenv().
- const char* p = getenv("DEBUG_BATCH_SIZE");
- if (p)
- result = atoi(p);
- fprintf(stderr, "row_size = %lu num_packets = %d\n", row_size, num_packets);
- fprintf(stderr, "result = %lu\n", result);
- }
-#endif
-
fb_assert(result <= MAX_USHORT);
return static_cast<USHORT>(result);
}
Modified: firebird/trunk/src/remote/remote.h
===================================================================
--- firebird/trunk/src/remote/remote.h 2014-10-11 01:20:07 UTC (rev 60099)
+++ firebird/trunk/src/remote/remote.h 2014-10-11 09:02:09 UTC (rev 60100)
@@ -80,7 +80,15 @@
#include "fb_blk.h"
#include "firebird/Interface.h"
+// Prefetch constants
+const ULONG MAX_PACKETS_PER_BATCH = 8;
+
+const ULONG MIN_ROWS_PER_BATCH = 10;
+const ULONG MAX_ROWS_PER_BATCH = 1000;
+
+const ULONG MAX_BATCH_CACHE_SIZE = 1024 * 1024; // 1 MB
+
// fwd. decl.
namespace Firebird {
class Exception;
@@ -910,6 +918,11 @@
UCharArrayAutoPtr port_buffer;
+ FB_UINT64 port_snd_packets;
+ FB_UINT64 port_rcv_packets;
+ FB_UINT64 port_snd_bytes;
+ FB_UINT64 port_rcv_bytes;
+
public:
rem_port(rem_port_t t, size_t rpt) :
port_sync(FB_NEW(getPool()) Firebird::RefMutex()),
@@ -941,7 +954,8 @@
port_required_encryption(true), // safe default
port_known_server_keys(getPool()), port_crypt_plugin(NULL),
port_client_crypt_callback(NULL), port_server_crypt_callback(NULL),
- port_buffer(FB_NEW(getPool()) UCHAR[rpt])
+ port_buffer(FB_NEW(getPool()) UCHAR[rpt]),
+ port_snd_packets(0), port_rcv_packets(0), port_snd_bytes(0), port_rcv_bytes(0)
{
addRef();
memset(&port_linger, 0, sizeof port_linger);
Modified: firebird/trunk/src/remote/server/server.cpp
===================================================================
--- firebird/trunk/src/remote/server/server.cpp 2014-10-11 01:20:07 UTC (rev 60099)
+++ firebird/trunk/src/remote/server/server.cpp 2014-10-11 09:02:09 UTC (rev 60100)
@@ -3282,11 +3282,6 @@
Rsr* statement;
getHandle(statement, sqldata->p_sqldata_statement);
- const ULONG msg_length = statement->rsr_format ? statement->rsr_format->fmt_length : 0;
-
- USHORT count = statement->rsr_flags.test(Rsr::NO_BATCH) ? 1 : sqldata->p_sqldata_messages;
- USHORT count2 = statement->rsr_flags.test(Rsr::NO_BATCH) ? 0 : count;
-
// On first fetch, clear the end-of-stream flag & reset the message buffers
if (!statement->rsr_flags.test(Rsr::FETCHED))
@@ -3311,6 +3306,7 @@
}
}
+ const ULONG msg_length = statement->rsr_format ? statement->rsr_format->fmt_length : 0;
// If required, call setDelayedOutputFormat()
@@ -3329,9 +3325,10 @@
statement->rsr_delayed_format = false;
}
-
// Get ready to ship the data out
+ const USHORT max_records = statement->rsr_flags.test(Rsr::NO_BATCH) ? 1 : sqldata->p_sqldata_messages;
+
P_SQLDATA* response = &sendL->p_sqldata;
sendL->p_operation = op_fetch_response;
response->p_sqldata_statement = sqldata->p_sqldata_statement;
@@ -3341,17 +3338,18 @@
// Check to see if any messages are already sitting around
+ const FB_UINT64 org_packets = this->port_snd_packets;
+
+ USHORT count = 0;
bool rc = true;
- while (true)
+ for (; count < max_records; count++)
{
-
// Have we exhausted the cache & reached cursor EOF?
if (statement->rsr_flags.test(Rsr::EOF_SET) && !statement->rsr_msgs_waiting)
{
statement->rsr_flags.clear(Rsr::EOF_SET);
rc = false;
- count2 = 0;
break;
}
@@ -3378,15 +3376,12 @@
rc = statement->rsr_cursor->fetchNext(&status_vector, message->msg_buffer) == IStatus::FB_OK;
statement->rsr_flags.set(Rsr::FETCHED);
+
if (status_vector.getStatus() & Firebird::IStatus::FB_HAS_ERRORS)
- {
return this->send_response(sendL, 0, 0, &status_vector, false);
- }
+
if (!rc)
- {
- count2 = 0;
break;
- }
message->msg_address = message->msg_buffer;
}
@@ -3397,8 +3392,6 @@
statement->rsr_msgs_waiting--;
}
- count--;
-
// There's a buffer waiting -- send it
if (!this->send_partial(sendL))
@@ -3406,9 +3399,11 @@
message->msg_address = NULL;
- // If we've sent the requested amount, break out of loop
+ // If we've hit maximum prefetch size, break out of loop
- if (count <= 0)
+ const USHORT packets = this->port_snd_packets - org_packets;
+
+ if (packets >= MAX_PACKETS_PER_BATCH && count >= MIN_ROWS_PER_BATCH)
break;
}
@@ -3438,7 +3433,9 @@
while (message->msg_address && message->msg_next != statement->rsr_buffer)
message = message->msg_next;
- for (; count2; --count2)
+ USHORT prefetch_count = (rc && !statement->rsr_flags.test(Rsr::NO_BATCH)) ? count : 0;
+
+ for (; prefetch_count; --prefetch_count)
{
if (message->msg_address)
{
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
|