| 
     
      
      
      From: <di...@us...> - 2014-10-11 09:02:18
      
     
   | 
Revision: 60100
          http://sourceforge.net/p/firebird/code/60100
Author:   dimitr
Date:     2014-10-11 09:02:09 +0000 (Sat, 11 Oct 2014)
Log Message:
-----------
Improved the batching (prefetch) logic. This is the second part of CORE-2530.
Modified Paths:
--------------
    firebird/trunk/src/remote/client/interface.cpp
    firebird/trunk/src/remote/inet.cpp
    firebird/trunk/src/remote/os/win32/wnet.cpp
    firebird/trunk/src/remote/os/win32/xnet.cpp
    firebird/trunk/src/remote/remote.cpp
    firebird/trunk/src/remote/remote.h
    firebird/trunk/src/remote/server/server.cpp
Modified: firebird/trunk/src/remote/client/interface.cpp
===================================================================
--- firebird/trunk/src/remote/client/interface.cpp	2014-10-11 01:20:07 UTC (rev 60099)
+++ firebird/trunk/src/remote/client/interface.cpp	2014-10-11 09:02:09 UTC (rev 60100)
@@ -2897,7 +2897,6 @@
 			{
 				sqldata->p_sqldata_messages =
 					REMOTE_compute_batch_size(port, 0, op_fetch_response, statement->rsr_select_format);
-				sqldata->p_sqldata_messages *= 4;
 
 				// Reorder data when the local buffer is half empty
 
@@ -4076,8 +4075,7 @@
 			// could dynamically adjust batching sizes based on fetch patterns
 
 			data->p_data_messages = REMOTE_compute_batch_size(port, 0, op_send, tail->rrq_format);
-			tail->rrq_reorder_level = 2 * data->p_data_messages;
-			data->p_data_messages *= 4;
+			tail->rrq_reorder_level = data->p_data_messages / 2;
 			tail->rrq_rows_pending += data->p_data_messages;
 
 #ifdef DEBUG
Modified: firebird/trunk/src/remote/inet.cpp
===================================================================
--- firebird/trunk/src/remote/inet.cpp	2014-10-11 01:20:07 UTC (rev 60099)
+++ firebird/trunk/src/remote/inet.cpp	2014-10-11 09:02:09 UTC (rev 60100)
@@ -2977,6 +2977,9 @@
 	} // end scope
 #endif
 
+	port->port_rcv_packets++;
+	port->port_rcv_bytes += n;
+
 	*length = n;
 
 	return true;
@@ -3146,6 +3149,9 @@
 	} // end scope
 #endif
 
+	port->port_snd_packets++;
+	port->port_snd_bytes += buffer_length;
+
 	return true;
 }
 
Modified: firebird/trunk/src/remote/os/win32/wnet.cpp
===================================================================
--- firebird/trunk/src/remote/os/win32/wnet.cpp	2014-10-11 01:20:07 UTC (rev 60099)
+++ firebird/trunk/src/remote/os/win32/wnet.cpp	2014-10-11 09:02:09 UTC (rev 60100)
@@ -1342,6 +1342,9 @@
 	packet_print("receive", buffer, n);
 #endif
 
+	port->port_rcv_packets++;
+	port->port_rcv_bytes += n;
+
 	*length = (SSHORT) n;
 
 	return true;
@@ -1400,6 +1403,9 @@
 	packet_print("send", reinterpret_cast<const UCHAR*>(buffer), buffer_length);
 #endif
 
+	port->port_snd_packets++;
+	port->port_snd_bytes += buffer_length;
+
 	return true;
 }
 
Modified: firebird/trunk/src/remote/os/win32/xnet.cpp
===================================================================
--- firebird/trunk/src/remote/os/win32/xnet.cpp	2014-10-11 01:20:07 UTC (rev 60099)
+++ firebird/trunk/src/remote/os/win32/xnet.cpp	2014-10-11 09:02:09 UTC (rev 60100)
@@ -1902,7 +1902,7 @@
  *	Read a buffer full of data.
  *
  **************************************/
-	rem_port* port = (rem_port*)xdrs->x_public;
+	rem_port* port = (rem_port*) xdrs->x_public;
 	const bool portServer = (port->port_flags & PORT_server);
 	XCC xcc = port->port_xcc;
 	XCH xch = xcc->xcc_recv_channel;
@@ -1936,8 +1936,13 @@
 		if (wait_result == WAIT_OBJECT_0)
 		{
 			// Client has written some data for us (server) to read
+
+			port->port_rcv_packets++;
+			port->port_rcv_bytes += xch->xch_length;
+
 			xdrs->x_handy = xch->xch_length;
 			xdrs->x_private = xdrs->x_base;
+
 			return TRUE;
 		}
 		if (wait_result == WAIT_TIMEOUT)
@@ -1981,15 +1986,19 @@
  *  filled and ready for reading.
  *
  **************************************/
-	rem_port* port = (rem_port*)xdrs->x_public;
+	rem_port* port = (rem_port*) xdrs->x_public;
 	XCC xcc = port->port_xcc;
 	XCH xch = xcc->xcc_send_channel;
 
 	xch->xch_length = xdrs->x_private - xdrs->x_base;
 	if (SetEvent(xcc->xcc_event_send_channel_filled))
 	{
+		port->port_snd_packets++;
+		port->port_snd_bytes += xch->xch_length;
+
 		xdrs->x_private = xdrs->x_base;
 		xdrs->x_handy = xch->xch_size;
+
 		return TRUE;
 	}
 
Modified: firebird/trunk/src/remote/remote.cpp
===================================================================
--- firebird/trunk/src/remote/remote.cpp	2014-10-11 01:20:07 UTC (rev 60099)
+++ firebird/trunk/src/remote/remote.cpp	2014-10-11 09:02:09 UTC (rev 60100)
@@ -204,34 +204,12 @@
  * The client calculates this number (n from the list above)
  * and sends it to the server.
  *
- * I asked why it is that the client doesn't just ask for a packet
- * full of records and let the server return however many fits in
- * a packet.  According to Sudesh, this is because of a bug in
- * Superserver which showed up in the WIN_NT 4.2.x kits.  So I
- * imagine once we up the protocol so that we can be sure we're not
- * talking to a 4.2 kit, then we can make this optimization.
- *           - Deej 2/28/97
- *
- * Note: A future optimization can look at setting the packet
- * size to optimize the transfer.
- *
- * Note: This calculation must use worst-case to determine the
- * packing.  Should the data record have VARCHAR data, it is
- * often possible to fit more than the packing specification
- * into each packet.  This is also a candidate for future
- * optimization.
- *
  * The data size is either the XDR data representation, or the
  * actual message size (rounded up) if this is a symmetric
  * architecture connection.
  *
  **************************************/
 
-	const USHORT MAX_PACKETS_PER_BATCH	= 4;	// packets    - picked by SWAG
-	const USHORT MIN_PACKETS_PER_BATCH	= 2;	// packets    - picked by SWAG
-	const USHORT DESIRED_ROWS_PER_BATCH	= 20;	// data rows  - picked by SWAG
-	const USHORT MIN_ROWS_PER_BATCH		= 10;	// data rows  - picked by SWAG
-
 	const USHORT op_overhead = (USHORT) xdr_protocol_overhead(op_code);
 
 #ifdef DEBUG
@@ -241,52 +219,22 @@
 			   format->fmt_length, op_overhead);
 #endif
 
-	ULONG row_size;
-	if (port->port_flags & PORT_symmetric)
-	{
-		// Same architecture connection
-		row_size = (ROUNDUP(format->fmt_length, 4) + op_overhead);
-	}
-	else
-	{
-		// Using XDR for data transfer
-		row_size = (ROUNDUP(format->fmt_net_length, 4) + op_overhead);
-	}
+	const ULONG row_size = op_overhead +
+		(port->port_flags & PORT_symmetric) ?
+			ROUNDUP(format->fmt_length, 4) : 	// Same architecture connection
+			ROUNDUP(format->fmt_net_length, 4);	// Using XDR for data transfer
 
-	USHORT num_packets = (USHORT) (((DESIRED_ROWS_PER_BATCH * row_size)	// data set
-							 + buffer_used	// used in 1st pkt
-							 + (port->port_buff_size - 1))	// to round up
-							/ port->port_buff_size);
-	if (num_packets > MAX_PACKETS_PER_BATCH)
-	{
-		num_packets = (USHORT) (((MIN_ROWS_PER_BATCH * row_size)	// data set
-								 + buffer_used	// used in 1st pkt
-								 + (port->port_buff_size - 1))	// to round up
-								/ port->port_buff_size);
-	}
-	num_packets = MAX(num_packets, MIN_PACKETS_PER_BATCH);
+	ULONG result = (port->port_protocol >= PROTOCOL_VERSION13) ?
+		MAX_ROWS_PER_BATCH : (MAX_PACKETS_PER_BATCH * port->port_buff_size - buffer_used) / row_size;
 
-	// Now that we've picked the number of packets in a batch,
-	// pack as many rows as we can into the set of packets
+	// Don't ask for more records than we can cache
 
-	ULONG result = (num_packets * port->port_buff_size - buffer_used) / row_size;
+	result = MIN(result, MAX_BATCH_CACHE_SIZE / format->fmt_length);
 
-	// Must always send some messages, even if message size is more
-	// than packet size.
+	// Must always send some messages, even if message is larger than packet
 
 	result = MAX(result, MIN_ROWS_PER_BATCH);
 
-#ifdef DEBUG
-	{
-		// CVC: I don't see the point in replacing this with fb_utils::readenv().
-		const char* p = getenv("DEBUG_BATCH_SIZE");
-		if (p)
-			result = atoi(p);
-		fprintf(stderr, "row_size = %lu num_packets = %d\n", row_size, num_packets);
-		fprintf(stderr, "result = %lu\n", result);
-	}
-#endif
-
 	fb_assert(result <= MAX_USHORT);
 	return static_cast<USHORT>(result);
 }
Modified: firebird/trunk/src/remote/remote.h
===================================================================
--- firebird/trunk/src/remote/remote.h	2014-10-11 01:20:07 UTC (rev 60099)
+++ firebird/trunk/src/remote/remote.h	2014-10-11 09:02:09 UTC (rev 60100)
@@ -80,7 +80,15 @@
 #include "fb_blk.h"
 #include "firebird/Interface.h"
 
+// Prefetch constants
 
+const ULONG MAX_PACKETS_PER_BATCH = 8;
+
+const ULONG MIN_ROWS_PER_BATCH = 10;
+const ULONG MAX_ROWS_PER_BATCH = 1000;
+
+const ULONG MAX_BATCH_CACHE_SIZE = 1024 * 1024; // 1 MB
+
 // fwd. decl.
 namespace Firebird {
 	class Exception;
@@ -910,6 +918,11 @@
 
 	UCharArrayAutoPtr	port_buffer;
 
+	FB_UINT64 port_snd_packets;
+	FB_UINT64 port_rcv_packets;
+	FB_UINT64 port_snd_bytes;
+	FB_UINT64 port_rcv_bytes;
+
 public:
 	rem_port(rem_port_t t, size_t rpt) :
 		port_sync(FB_NEW(getPool()) Firebird::RefMutex()),
@@ -941,7 +954,8 @@
 		port_required_encryption(true),		// safe default
 		port_known_server_keys(getPool()), port_crypt_plugin(NULL),
 		port_client_crypt_callback(NULL), port_server_crypt_callback(NULL),
-		port_buffer(FB_NEW(getPool()) UCHAR[rpt])
+		port_buffer(FB_NEW(getPool()) UCHAR[rpt]),
+		port_snd_packets(0), port_rcv_packets(0), port_snd_bytes(0), port_rcv_bytes(0)
 	{
 		addRef();
 		memset(&port_linger, 0, sizeof port_linger);
Modified: firebird/trunk/src/remote/server/server.cpp
===================================================================
--- firebird/trunk/src/remote/server/server.cpp	2014-10-11 01:20:07 UTC (rev 60099)
+++ firebird/trunk/src/remote/server/server.cpp	2014-10-11 09:02:09 UTC (rev 60100)
@@ -3282,11 +3282,6 @@
 	Rsr* statement;
 	getHandle(statement, sqldata->p_sqldata_statement);
 
-	const ULONG msg_length = statement->rsr_format ? statement->rsr_format->fmt_length : 0;
-
-	USHORT count = statement->rsr_flags.test(Rsr::NO_BATCH) ? 1 : sqldata->p_sqldata_messages;
-	USHORT count2 = statement->rsr_flags.test(Rsr::NO_BATCH) ? 0 : count;
-
 	// On first fetch, clear the end-of-stream flag & reset the message buffers
 
 	if (!statement->rsr_flags.test(Rsr::FETCHED))
@@ -3311,6 +3306,7 @@
 		}
 	}
 
+	const ULONG msg_length = statement->rsr_format ? statement->rsr_format->fmt_length : 0;
 
 	// If required, call setDelayedOutputFormat()
 
@@ -3329,9 +3325,10 @@
 		statement->rsr_delayed_format = false;
 	}
 
-
 	// Get ready to ship the data out
 
+	const USHORT max_records = statement->rsr_flags.test(Rsr::NO_BATCH) ? 1 : sqldata->p_sqldata_messages;
+
 	P_SQLDATA* response = &sendL->p_sqldata;
 	sendL->p_operation = op_fetch_response;
 	response->p_sqldata_statement = sqldata->p_sqldata_statement;
@@ -3341,17 +3338,18 @@
 
 	// Check to see if any messages are already sitting around
 
+	const FB_UINT64 org_packets = this->port_snd_packets;
+
+	USHORT count = 0;
 	bool rc = true;
 
-	while (true)
+	for (; count < max_records; count++)
 	{
-
 		// Have we exhausted the cache & reached cursor EOF?
 		if (statement->rsr_flags.test(Rsr::EOF_SET) && !statement->rsr_msgs_waiting)
 		{
 			statement->rsr_flags.clear(Rsr::EOF_SET);
 			rc = false;
-			count2 = 0;
 			break;
 		}
 
@@ -3378,15 +3376,12 @@
 			rc = statement->rsr_cursor->fetchNext(&status_vector, message->msg_buffer) == IStatus::FB_OK;
 
 			statement->rsr_flags.set(Rsr::FETCHED);
+
 			if (status_vector.getStatus() & Firebird::IStatus::FB_HAS_ERRORS)
-			{
 				return this->send_response(sendL, 0, 0, &status_vector, false);
-			}
+
 			if (!rc)
-			{
-				count2 = 0;
 				break;
-			}
 
 			message->msg_address = message->msg_buffer;
 		}
@@ -3397,8 +3392,6 @@
 			statement->rsr_msgs_waiting--;
 		}
 
-		count--;
-
 		// There's a buffer waiting -- send it
 
 		if (!this->send_partial(sendL))
@@ -3406,9 +3399,11 @@
 
 		message->msg_address = NULL;
 
-		// If we've sent the requested amount, break out of loop
+		// If we've hit maximum prefetch size, break out of loop
 
-		if (count <= 0)
+		const USHORT packets = this->port_snd_packets - org_packets;
+
+		if (packets >= MAX_PACKETS_PER_BATCH && count >= MIN_ROWS_PER_BATCH)
 			break;
 	}
 
@@ -3438,7 +3433,9 @@
 	while (message->msg_address && message->msg_next != statement->rsr_buffer)
 		message = message->msg_next;
 
-	for (; count2; --count2)
+	USHORT prefetch_count = (rc && !statement->rsr_flags.test(Rsr::NO_BATCH)) ? count : 0;
+
+	for (; prefetch_count; --prefetch_count)
 	{
 		if (message->msg_address)
 		{
This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site.
 |