From: <di...@us...> - 2014-10-11 09:02:18
|
Revision: 60100 http://sourceforge.net/p/firebird/code/60100 Author: dimitr Date: 2014-10-11 09:02:09 +0000 (Sat, 11 Oct 2014) Log Message: ----------- Improved the batching (prefetch) logic. This is the second part of CORE-2530. Modified Paths: -------------- firebird/trunk/src/remote/client/interface.cpp firebird/trunk/src/remote/inet.cpp firebird/trunk/src/remote/os/win32/wnet.cpp firebird/trunk/src/remote/os/win32/xnet.cpp firebird/trunk/src/remote/remote.cpp firebird/trunk/src/remote/remote.h firebird/trunk/src/remote/server/server.cpp Modified: firebird/trunk/src/remote/client/interface.cpp =================================================================== --- firebird/trunk/src/remote/client/interface.cpp 2014-10-11 01:20:07 UTC (rev 60099) +++ firebird/trunk/src/remote/client/interface.cpp 2014-10-11 09:02:09 UTC (rev 60100) @@ -2897,7 +2897,6 @@ { sqldata->p_sqldata_messages = REMOTE_compute_batch_size(port, 0, op_fetch_response, statement->rsr_select_format); - sqldata->p_sqldata_messages *= 4; // Reorder data when the local buffer is half empty @@ -4076,8 +4075,7 @@ // could dynamically adjust batching sizes based on fetch patterns data->p_data_messages = REMOTE_compute_batch_size(port, 0, op_send, tail->rrq_format); - tail->rrq_reorder_level = 2 * data->p_data_messages; - data->p_data_messages *= 4; + tail->rrq_reorder_level = data->p_data_messages / 2; tail->rrq_rows_pending += data->p_data_messages; #ifdef DEBUG Modified: firebird/trunk/src/remote/inet.cpp =================================================================== --- firebird/trunk/src/remote/inet.cpp 2014-10-11 01:20:07 UTC (rev 60099) +++ firebird/trunk/src/remote/inet.cpp 2014-10-11 09:02:09 UTC (rev 60100) @@ -2977,6 +2977,9 @@ } // end scope #endif + port->port_rcv_packets++; + port->port_rcv_bytes += n; + *length = n; return true; @@ -3146,6 +3149,9 @@ } // end scope #endif + port->port_snd_packets++; + port->port_snd_bytes += buffer_length; + return true; } Modified: firebird/trunk/src/remote/os/win32/wnet.cpp =================================================================== --- firebird/trunk/src/remote/os/win32/wnet.cpp 2014-10-11 01:20:07 UTC (rev 60099) +++ firebird/trunk/src/remote/os/win32/wnet.cpp 2014-10-11 09:02:09 UTC (rev 60100) @@ -1342,6 +1342,9 @@ packet_print("receive", buffer, n); #endif + port->port_rcv_packets++; + port->port_rcv_bytes += n; + *length = (SSHORT) n; return true; @@ -1400,6 +1403,9 @@ packet_print("send", reinterpret_cast<const UCHAR*>(buffer), buffer_length); #endif + port->port_snd_packets++; + port->port_snd_bytes += buffer_length; + return true; } Modified: firebird/trunk/src/remote/os/win32/xnet.cpp =================================================================== --- firebird/trunk/src/remote/os/win32/xnet.cpp 2014-10-11 01:20:07 UTC (rev 60099) +++ firebird/trunk/src/remote/os/win32/xnet.cpp 2014-10-11 09:02:09 UTC (rev 60100) @@ -1902,7 +1902,7 @@ * Read a buffer full of data. * **************************************/ - rem_port* port = (rem_port*)xdrs->x_public; + rem_port* port = (rem_port*) xdrs->x_public; const bool portServer = (port->port_flags & PORT_server); XCC xcc = port->port_xcc; XCH xch = xcc->xcc_recv_channel; @@ -1936,8 +1936,13 @@ if (wait_result == WAIT_OBJECT_0) { // Client has written some data for us (server) to read + + port->port_rcv_packets++; + port->port_rcv_bytes += xch->xch_length; + xdrs->x_handy = xch->xch_length; xdrs->x_private = xdrs->x_base; + return TRUE; } if (wait_result == WAIT_TIMEOUT) @@ -1981,15 +1986,19 @@ * filled and ready for reading. * **************************************/ - rem_port* port = (rem_port*)xdrs->x_public; + rem_port* port = (rem_port*) xdrs->x_public; XCC xcc = port->port_xcc; XCH xch = xcc->xcc_send_channel; xch->xch_length = xdrs->x_private - xdrs->x_base; if (SetEvent(xcc->xcc_event_send_channel_filled)) { + port->port_snd_packets++; + port->port_snd_bytes += xch->xch_length; + xdrs->x_private = xdrs->x_base; xdrs->x_handy = xch->xch_size; + return TRUE; } Modified: firebird/trunk/src/remote/remote.cpp =================================================================== --- firebird/trunk/src/remote/remote.cpp 2014-10-11 01:20:07 UTC (rev 60099) +++ firebird/trunk/src/remote/remote.cpp 2014-10-11 09:02:09 UTC (rev 60100) @@ -204,34 +204,12 @@ * The client calculates this number (n from the list above) * and sends it to the server. * - * I asked why it is that the client doesn't just ask for a packet - * full of records and let the server return however many fits in - * a packet. According to Sudesh, this is because of a bug in - * Superserver which showed up in the WIN_NT 4.2.x kits. So I - * imagine once we up the protocol so that we can be sure we're not - * talking to a 4.2 kit, then we can make this optimization. - * - Deej 2/28/97 - * - * Note: A future optimization can look at setting the packet - * size to optimize the transfer. - * - * Note: This calculation must use worst-case to determine the - * packing. Should the data record have VARCHAR data, it is - * often possible to fit more than the packing specification - * into each packet. This is also a candidate for future - * optimization. - * * The data size is either the XDR data representation, or the * actual message size (rounded up) if this is a symmetric * architecture connection. * **************************************/ - const USHORT MAX_PACKETS_PER_BATCH = 4; // packets - picked by SWAG - const USHORT MIN_PACKETS_PER_BATCH = 2; // packets - picked by SWAG - const USHORT DESIRED_ROWS_PER_BATCH = 20; // data rows - picked by SWAG - const USHORT MIN_ROWS_PER_BATCH = 10; // data rows - picked by SWAG - const USHORT op_overhead = (USHORT) xdr_protocol_overhead(op_code); #ifdef DEBUG @@ -241,52 +219,22 @@ format->fmt_length, op_overhead); #endif - ULONG row_size; - if (port->port_flags & PORT_symmetric) - { - // Same architecture connection - row_size = (ROUNDUP(format->fmt_length, 4) + op_overhead); - } - else - { - // Using XDR for data transfer - row_size = (ROUNDUP(format->fmt_net_length, 4) + op_overhead); - } + const ULONG row_size = op_overhead + + (port->port_flags & PORT_symmetric) ? + ROUNDUP(format->fmt_length, 4) : // Same architecture connection + ROUNDUP(format->fmt_net_length, 4); // Using XDR for data transfer - USHORT num_packets = (USHORT) (((DESIRED_ROWS_PER_BATCH * row_size) // data set - + buffer_used // used in 1st pkt - + (port->port_buff_size - 1)) // to round up - / port->port_buff_size); - if (num_packets > MAX_PACKETS_PER_BATCH) - { - num_packets = (USHORT) (((MIN_ROWS_PER_BATCH * row_size) // data set - + buffer_used // used in 1st pkt - + (port->port_buff_size - 1)) // to round up - / port->port_buff_size); - } - num_packets = MAX(num_packets, MIN_PACKETS_PER_BATCH); + ULONG result = (port->port_protocol >= PROTOCOL_VERSION13) ? + MAX_ROWS_PER_BATCH : (MAX_PACKETS_PER_BATCH * port->port_buff_size - buffer_used) / row_size; - // Now that we've picked the number of packets in a batch, - // pack as many rows as we can into the set of packets + // Don't ask for more records than we can cache - ULONG result = (num_packets * port->port_buff_size - buffer_used) / row_size; + result = MIN(result, MAX_BATCH_CACHE_SIZE / format->fmt_length); - // Must always send some messages, even if message size is more - // than packet size. + // Must always send some messages, even if message is larger than packet result = MAX(result, MIN_ROWS_PER_BATCH); -#ifdef DEBUG - { - // CVC: I don't see the point in replacing this with fb_utils::readenv(). - const char* p = getenv("DEBUG_BATCH_SIZE"); - if (p) - result = atoi(p); - fprintf(stderr, "row_size = %lu num_packets = %d\n", row_size, num_packets); - fprintf(stderr, "result = %lu\n", result); - } -#endif - fb_assert(result <= MAX_USHORT); return static_cast<USHORT>(result); } Modified: firebird/trunk/src/remote/remote.h =================================================================== --- firebird/trunk/src/remote/remote.h 2014-10-11 01:20:07 UTC (rev 60099) +++ firebird/trunk/src/remote/remote.h 2014-10-11 09:02:09 UTC (rev 60100) @@ -80,7 +80,15 @@ #include "fb_blk.h" #include "firebird/Interface.h" +// Prefetch constants +const ULONG MAX_PACKETS_PER_BATCH = 8; + +const ULONG MIN_ROWS_PER_BATCH = 10; +const ULONG MAX_ROWS_PER_BATCH = 1000; + +const ULONG MAX_BATCH_CACHE_SIZE = 1024 * 1024; // 1 MB + // fwd. decl. namespace Firebird { class Exception; @@ -910,6 +918,11 @@ UCharArrayAutoPtr port_buffer; + FB_UINT64 port_snd_packets; + FB_UINT64 port_rcv_packets; + FB_UINT64 port_snd_bytes; + FB_UINT64 port_rcv_bytes; + public: rem_port(rem_port_t t, size_t rpt) : port_sync(FB_NEW(getPool()) Firebird::RefMutex()), @@ -941,7 +954,8 @@ port_required_encryption(true), // safe default port_known_server_keys(getPool()), port_crypt_plugin(NULL), port_client_crypt_callback(NULL), port_server_crypt_callback(NULL), - port_buffer(FB_NEW(getPool()) UCHAR[rpt]) + port_buffer(FB_NEW(getPool()) UCHAR[rpt]), + port_snd_packets(0), port_rcv_packets(0), port_snd_bytes(0), port_rcv_bytes(0) { addRef(); memset(&port_linger, 0, sizeof port_linger); Modified: firebird/trunk/src/remote/server/server.cpp =================================================================== --- firebird/trunk/src/remote/server/server.cpp 2014-10-11 01:20:07 UTC (rev 60099) +++ firebird/trunk/src/remote/server/server.cpp 2014-10-11 09:02:09 UTC (rev 60100) @@ -3282,11 +3282,6 @@ Rsr* statement; getHandle(statement, sqldata->p_sqldata_statement); - const ULONG msg_length = statement->rsr_format ? statement->rsr_format->fmt_length : 0; - - USHORT count = statement->rsr_flags.test(Rsr::NO_BATCH) ? 1 : sqldata->p_sqldata_messages; - USHORT count2 = statement->rsr_flags.test(Rsr::NO_BATCH) ? 0 : count; - // On first fetch, clear the end-of-stream flag & reset the message buffers if (!statement->rsr_flags.test(Rsr::FETCHED)) @@ -3311,6 +3306,7 @@ } } + const ULONG msg_length = statement->rsr_format ? statement->rsr_format->fmt_length : 0; // If required, call setDelayedOutputFormat() @@ -3329,9 +3325,10 @@ statement->rsr_delayed_format = false; } - // Get ready to ship the data out + const USHORT max_records = statement->rsr_flags.test(Rsr::NO_BATCH) ? 1 : sqldata->p_sqldata_messages; + P_SQLDATA* response = &sendL->p_sqldata; sendL->p_operation = op_fetch_response; response->p_sqldata_statement = sqldata->p_sqldata_statement; @@ -3341,17 +3338,18 @@ // Check to see if any messages are already sitting around + const FB_UINT64 org_packets = this->port_snd_packets; + + USHORT count = 0; bool rc = true; - while (true) + for (; count < max_records; count++) { - // Have we exhausted the cache & reached cursor EOF? if (statement->rsr_flags.test(Rsr::EOF_SET) && !statement->rsr_msgs_waiting) { statement->rsr_flags.clear(Rsr::EOF_SET); rc = false; - count2 = 0; break; } @@ -3378,15 +3376,12 @@ rc = statement->rsr_cursor->fetchNext(&status_vector, message->msg_buffer) == IStatus::FB_OK; statement->rsr_flags.set(Rsr::FETCHED); + if (status_vector.getStatus() & Firebird::IStatus::FB_HAS_ERRORS) - { return this->send_response(sendL, 0, 0, &status_vector, false); - } + if (!rc) - { - count2 = 0; break; - } message->msg_address = message->msg_buffer; } @@ -3397,8 +3392,6 @@ statement->rsr_msgs_waiting--; } - count--; - // There's a buffer waiting -- send it if (!this->send_partial(sendL)) @@ -3406,9 +3399,11 @@ message->msg_address = NULL; - // If we've sent the requested amount, break out of loop + // If we've hit maximum prefetch size, break out of loop - if (count <= 0) + const USHORT packets = this->port_snd_packets - org_packets; + + if (packets >= MAX_PACKETS_PER_BATCH && count >= MIN_ROWS_PER_BATCH) break; } @@ -3438,7 +3433,9 @@ while (message->msg_address && message->msg_next != statement->rsr_buffer) message = message->msg_next; - for (; count2; --count2) + USHORT prefetch_count = (rc && !statement->rsr_flags.test(Rsr::NO_BATCH)) ? count : 0; + + for (; prefetch_count; --prefetch_count) { if (message->msg_address) { This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |