|
From: mason_s <ma...@us...> - 2010-08-05 18:49:16
|
Project "Postgres-XC".
The branch, master has been updated
via c7476b9cf075aba2dd2ed11ea57c632c1ad6721a (commit)
from 086c5c6be32d4ca9232523cd64caf6d29aaac42c (commit)
- Log -----------------------------------------------------------------
commit c7476b9cf075aba2dd2ed11ea57c632c1ad6721a
Author: Mason S <mas...@ma...>
Date: Thu Aug 5 14:36:37 2010 -0400
Added more handling to deal with data node connection failures.
This includes forcing the release of connections in an unexpected
state and bug fixes.
This was written by Andrei Martsinchyk, with some additional
handling added by Mason.
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c
index e390f8a..82eca8c 100644
--- a/contrib/pgbench/pgbench.c
+++ b/contrib/pgbench/pgbench.c
@@ -205,9 +205,9 @@ static char *tpc_b_bid = {
"\\setrandom tid 1 :ntellers\n"
"\\setrandom delta -5000 5000\n"
"BEGIN;\n"
- "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid AND bid = :bid;\n"
- "SELECT abalance FROM pgbench_accounts WHERE aid = :aid AND bid = :bid\n"
- "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid AND bid = :bid;\n"
+ "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
+ "SELECT abalance FROM pgbench_accounts WHERE aid = :aid\n"
+ "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
"UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
"END;\n"
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 491d0d5..673aad1 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -135,7 +135,7 @@ typedef struct TransactionStateData
{
TransactionId transactionId; /* my XID, or Invalid if none */
#ifdef PGXC /* PGXC_COORD */
- GlobalTransactionId globalTransactionId; /* my GXID, or Invalid if none */
+ GlobalTransactionId globalTransactionId; /* my GXID, or Invalid if none */
#endif
SubTransactionId subTransactionId; /* my subxact ID */
char *name; /* savepoint name, if any */
@@ -314,7 +314,7 @@ GetCurrentGlobalTransactionId(void)
* GetGlobalTransactionId
*
* This will return the GXID of the specified transaction,
- * getting one from the GTM if it's not yet set.
+ * getting one from the GTM if it's not yet set.
*/
static GlobalTransactionId
GetGlobalTransactionId(TransactionState s)
@@ -469,7 +469,7 @@ AssignTransactionId(TransactionState s)
if (IS_PGXC_COORDINATOR)
{
s->transactionId = (TransactionId) GetGlobalTransactionId(s);
- elog(DEBUG1, "New transaction id assigned = %d, isSubXact = %s",
+ elog(DEBUG1, "New transaction id assigned = %d, isSubXact = %s",
s->transactionId, isSubXact ? "true" : "false");
}
else
@@ -1679,6 +1679,14 @@ CommitTransaction(void)
*/
AtEOXact_UpdateFlatFiles(true);
+#ifdef PGXC
+ /*
+ * There can be error on the data nodes. So go to data nodes before
+ * changing transaction state and local clean up
+ */
+ DataNodeCommit();
+#endif
+
/* Prevent cancel/die interrupt while cleaning up */
HOLD_INTERRUPTS();
@@ -1694,13 +1702,13 @@ CommitTransaction(void)
latestXid = RecordTransactionCommit();
TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid);
+
#ifdef PGXC
+ /*
+ * Now we can let GTM know about transaction commit
+ */
if (IS_PGXC_COORDINATOR)
{
- /* Make sure this committed on the DataNodes,
- * if so it will just return
- */
- DataNodeCommit(DestNone);
CommitTranGTM(s->globalTransactionId);
latestXid = s->globalTransactionId;
}
@@ -1712,7 +1720,7 @@ CommitTransaction(void)
CommitTranGTM((GlobalTransactionId) latestXid);
}
#endif
-
+
/*
* Let others know about no transaction in progress by me. Note that this
* must be done _before_ releasing locks we hold and _after_
@@ -1808,7 +1816,7 @@ CommitTransaction(void)
s->nChildXids = 0;
s->maxChildXids = 0;
-#ifdef PGXC
+#ifdef PGXC
if (IS_PGXC_COORDINATOR)
s->globalTransactionId = InvalidGlobalTransactionId;
else if (IS_PGXC_DATANODE)
@@ -2143,10 +2151,10 @@ AbortTransaction(void)
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
{
- /* Make sure this is rolled back on the DataNodes,
- * if so it will just return
+ /* Make sure this is rolled back on the DataNodes,
+ * if so it will just return
*/
- DataNodeRollback(DestNone);
+ DataNodeRollback();
RollbackTranGTM(s->globalTransactionId);
latestXid = s->globalTransactionId;
}
diff --git a/src/backend/pgxc/pool/datanode.c b/src/backend/pgxc/pool/datanode.c
index 517b1e4..0f4072d 100644
--- a/src/backend/pgxc/pool/datanode.c
+++ b/src/backend/pgxc/pool/datanode.c
@@ -199,6 +199,9 @@ data_node_init(DataNodeHandle *handle, int sock, int nodenum)
handle->sock = sock;
handle->transaction_status = 'I';
handle->state = DN_CONNECTION_STATE_IDLE;
+#ifdef DN_CONNECTION_DEBUG
+ handle->have_row_desc = false;
+#endif
handle->error = NULL;
handle->outEnd = 0;
handle->inStart = 0;
@@ -211,7 +214,7 @@ data_node_init(DataNodeHandle *handle, int sock, int nodenum)
* Wait while at least one of specified connections has data available and read
* the data into the buffer
*/
-void
+int
data_node_receive(const int conn_count,
DataNodeHandle ** connections, struct timeval * timeout)
{
@@ -239,7 +242,7 @@ data_node_receive(const int conn_count,
* Return if we do not have connections to receive input
*/
if (nfds == 0)
- return;
+ return 0;
retry:
res_select = select(nfds + 1, &readfds, NULL, NULL, timeout);
@@ -249,27 +252,19 @@ retry:
if (errno == EINTR || errno == EAGAIN)
goto retry;
- /*
- * PGXCTODO - we may want to close the connections and notify the
- * pooler that these are invalid.
- */
if (errno == EBADF)
{
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("select() bad file descriptor set")));
+ elog(WARNING, "select() bad file descriptor set");
}
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("select() error: %d", errno)));
+ elog(WARNING, "select() error: %d", errno);
+ return errno;
}
if (res_select == 0)
{
/* Handle timeout */
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("timeout while waiting for response")));
+ elog(WARNING, "timeout while waiting for response");
+ return EOF;
}
/* read data */
@@ -283,10 +278,9 @@ retry:
if (read_status == EOF || read_status < 0)
{
- /* PGXCTODO - we should notify the pooler to destroy the connections */
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("unexpected EOF on datanode connection")));
+ add_error_message(conn, "unexpected EOF on datanode connection");
+ elog(WARNING, "unexpected EOF on datanode connection");
+ return EOF;
}
else
{
@@ -294,6 +288,7 @@ retry:
}
}
}
+ return 0;
}
@@ -522,7 +517,7 @@ get_message(DataNodeHandle *conn, int *len, char **msg)
* ensure_in_buffer_capacity() will immediately return
*/
ensure_in_buffer_capacity(5 + (size_t) *len, conn);
- conn->state == DN_CONNECTION_STATE_QUERY;
+ conn->state = DN_CONNECTION_STATE_QUERY;
conn->inCursor = conn->inStart;
return '\0';
}
@@ -539,19 +534,27 @@ void
release_handles(void)
{
int i;
+ int discard[NumDataNodes];
+ int ndisc = 0;
if (node_count == 0)
return;
- PoolManagerReleaseConnections();
for (i = 0; i < NumDataNodes; i++)
{
DataNodeHandle *handle = &handles[i];
if (handle->sock != NO_SOCKET)
+ {
+ if (handle->state != DN_CONNECTION_STATE_IDLE)
+ {
+ elog(WARNING, "Connection to data node %d has unexpected state %d and will be dropped", handle->nodenum, handle->state);
+ discard[ndisc++] = handle->nodenum;
+ }
data_node_free(handle);
+ }
}
-
+ PoolManagerReleaseConnections(ndisc, discard);
node_count = 0;
}
@@ -897,7 +900,7 @@ void
add_error_message(DataNodeHandle *handle, const char *message)
{
handle->transaction_status = 'E';
- handle->state = DN_CONNECTION_STATE_IDLE;
+ handle->state = DN_CONNECTION_STATE_ERROR_NOT_READY;
if (handle->error)
{
/* PGXCTODO append */
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index c6f9042..bbedef0 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -24,6 +24,7 @@
#include "miscadmin.h"
#include "pgxc/execRemote.h"
#include "pgxc/poolmgr.h"
+#include "storage/ipc.h"
#include "utils/datum.h"
#include "utils/memutils.h"
#include "utils/tuplesort.h"
@@ -40,9 +41,10 @@ static bool autocommit = true;
static DataNodeHandle **write_node_list = NULL;
static int write_node_count = 0;
-static int data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, GlobalTransactionId gxid);
-static int data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest);
-static int data_node_rollback(int conn_count, DataNodeHandle ** connections, CommandDest dest);
+static int data_node_begin(int conn_count, DataNodeHandle ** connections,
+ GlobalTransactionId gxid);
+static int data_node_commit(int conn_count, DataNodeHandle ** connections);
+static int data_node_rollback(int conn_count, DataNodeHandle ** connections);
static void clear_write_node_list();
@@ -920,7 +922,7 @@ FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot)
/*
* Handle responses from the Data node connections
*/
-static void
+static int
data_node_receive_responses(const int conn_count, DataNodeHandle ** connections,
struct timeval * timeout, RemoteQueryState *combiner)
{
@@ -940,7 +942,8 @@ data_node_receive_responses(const int conn_count, DataNodeHandle ** connections,
{
int i = 0;
- data_node_receive(count, to_receive, timeout);
+ if (data_node_receive(count, to_receive, timeout))
+ return EOF;
while (i < count)
{
int result = handle_response(to_receive[i], combiner);
@@ -959,12 +962,17 @@ data_node_receive_responses(const int conn_count, DataNodeHandle ** connections,
break;
default:
/* Inconsistent responses */
- ereport(ERROR,
- (errcode(ERRCODE_INTERNAL_ERROR),
- errmsg("Unexpected response from the data nodes, result = %d, request type %d", result, combiner->request_type)));
+ add_error_message(to_receive[i], "Unexpected response from the data nodes");
+ elog(WARNING, "Unexpected response from the data nodes, result = %d, request type %d", result, combiner->request_type);
+ /* Stop tracking and move last connection in place */
+ count--;
+ if (i < count)
+ to_receive[i] = to_receive[count];
}
}
}
+
+ return 0;
}
/*
@@ -990,6 +998,18 @@ handle_response(DataNodeHandle * conn, RemoteQueryState *combiner)
if (conn->state == DN_CONNECTION_STATE_QUERY)
return RESPONSE_EOF;
+ /*
+ * If we are in the process of shutting down, we
+ * may be rolling back, and the buffer may contain other messages.
+ * We want to avoid a procarray exception
+ * as well as an error stack overflow.
+ */
+ if (proc_exit_inprogress)
+ {
+ conn->state = DN_CONNECTION_STATE_ERROR_FATAL;
+ return RESPONSE_EOF;
+ }
+
/* TODO handle other possible responses */
switch (get_message(conn, &msg_len, &msg))
{
@@ -1005,10 +1025,17 @@ handle_response(DataNodeHandle * conn, RemoteQueryState *combiner)
HandleCommandComplete(combiner, msg, msg_len);
break;
case 'T': /* RowDescription */
+#ifdef DN_CONNECTION_DEBUG
+ Assert(!conn->have_row_desc);
+ conn->have_row_desc = true;
+#endif
if (HandleRowDescription(combiner, msg, msg_len))
return RESPONSE_TUPDESC;
break;
case 'D': /* DataRow */
+#ifdef DN_CONNECTION_DEBUG
+ Assert(conn->have_row_desc);
+#endif
HandleDataRow(combiner, msg, msg_len);
return RESPONSE_DATAROW;
case 'G': /* CopyInResponse */
@@ -1042,6 +1069,9 @@ handle_response(DataNodeHandle * conn, RemoteQueryState *combiner)
case 'Z': /* ReadyForQuery */
conn->transaction_status = msg[0];
conn->state = DN_CONNECTION_STATE_IDLE;
+#ifdef DN_CONNECTION_DEBUG
+ conn->have_row_desc = false;
+#endif
return RESPONSE_COMPLETE;
case 'I': /* EmptyQuery */
default:
@@ -1058,7 +1088,8 @@ handle_response(DataNodeHandle * conn, RemoteQueryState *combiner)
* Send BEGIN command to the Data nodes and receive responses
*/
static int
-data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest, GlobalTransactionId gxid)
+data_node_begin(int conn_count, DataNodeHandle ** connections,
+ GlobalTransactionId gxid)
{
int i;
struct timeval *timeout = NULL;
@@ -1078,7 +1109,8 @@ data_node_begin(int conn_count, DataNodeHandle ** connections, CommandDest dest,
combiner->dest = None_Receiver;
/* Receive responses */
- data_node_receive_responses(conn_count, connections, timeout, combiner);
+ if (data_node_receive_responses(conn_count, connections, timeout, combiner))
+ return EOF;
/* Verify status */
return ValidateAndCloseCombiner(combiner) ? 0 : EOF;
@@ -1109,12 +1141,12 @@ DataNodeBegin(void)
/*
- * Commit current transaction, use two-phase commit if necessary
+ * Commit current transaction on data nodes where it has been started
*/
-int
-DataNodeCommit(CommandDest dest)
+void
+DataNodeCommit(void)
{
- int res;
+ int res = 0;
int tran_count;
DataNodeHandle *connections[NumDataNodes];
@@ -1128,7 +1160,7 @@ DataNodeCommit(CommandDest dest)
if (tran_count == 0)
goto finish;
- res = data_node_commit(tran_count, connections, dest);
+ res = data_node_commit(tran_count, connections);
finish:
/* In autocommit mode statistics is collected in DataNodeExec */
@@ -1138,15 +1170,19 @@ finish:
release_handles();
autocommit = true;
clear_write_node_list();
- return res;
+ if (res != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Could not commit connection on data nodes")));
}
/*
- * Send COMMIT or PREPARE/COMMIT PREPARED down to the Data nodes and handle responses
+ * Commit transaction on specified data node connections, use two-phase commit
+ * if data have been modified on more than one node during the transaction.
*/
static int
-data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest)
+data_node_commit(int conn_count, DataNodeHandle ** connections)
{
int i;
struct timeval *timeout = NULL;
@@ -1191,13 +1227,12 @@ data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest
combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
combiner->dest = None_Receiver;
/* Receive responses */
- data_node_receive_responses(conn_count, connections, timeout, combiner);
+ if (data_node_receive_responses(conn_count, connections, timeout, combiner))
+ result = EOF;
/* Reset combiner */
if (!ValidateAndResetCombiner(combiner))
- {
result = EOF;
- }
}
if (!do2PC)
@@ -1238,7 +1273,8 @@ data_node_commit(int conn_count, DataNodeHandle ** connections, CommandDest dest
combiner->dest = None_Receiver;
}
/* Receive responses */
- data_node_receive_responses(conn_count, connections, timeout, combiner);
+ if (data_node_receive_responses(conn_count, connections, timeout, combiner))
+ result = EOF;
result = ValidateAndCloseCombiner(combiner) ? result : EOF;
finish:
@@ -1253,7 +1289,7 @@ finish:
* Rollback current transaction
*/
int
-DataNodeRollback(CommandDest dest)
+DataNodeRollback(void)
{
int res = 0;
int tran_count;
@@ -1269,7 +1305,7 @@ DataNodeRollback(CommandDest dest)
if (tran_count == 0)
goto finish;
- res = data_node_rollback(tran_count, connections, dest);
+ res = data_node_rollback(tran_count, connections);
finish:
/* In autocommit mode statistics is collected in DataNodeExec */
@@ -1287,24 +1323,23 @@ finish:
* Send ROLLBACK command down to the Data nodes and handle responses
*/
static int
-data_node_rollback(int conn_count, DataNodeHandle ** connections, CommandDest dest)
+data_node_rollback(int conn_count, DataNodeHandle ** connections)
{
int i;
struct timeval *timeout = NULL;
- int result = 0;
RemoteQueryState *combiner;
/* Send ROLLBACK - */
for (i = 0; i < conn_count; i++)
{
- if (data_node_send_query(connections[i], "ROLLBACK"))
- result = EOF;
+ data_node_send_query(connections[i], "ROLLBACK");
}
combiner = CreateResponseCombiner(conn_count, COMBINE_TYPE_NONE);
combiner->dest = None_Receiver;
/* Receive responses */
- data_node_receive_responses(conn_count, connections, timeout, combiner);
+ if (data_node_receive_responses(conn_count, connections, timeout, combiner))
+ return EOF;
/* Verify status */
return ValidateAndCloseCombiner(combiner) ? 0 : EOF;
@@ -1404,7 +1439,7 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_
if (new_count > 0 && need_tran)
{
/* Start transaction on connections where it is not started */
- if (data_node_begin(new_count, newConnections, DestNone, gxid))
+ if (data_node_begin(new_count, newConnections, gxid))
{
pfree(connections);
pfree(copy_connections);
@@ -1448,8 +1483,8 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_
combiner->dest = None_Receiver;
/* Receive responses */
- data_node_receive_responses(conn_count, connections, timeout, combiner);
- if (!ValidateAndCloseCombiner(combiner))
+ if (data_node_receive_responses(conn_count, connections, timeout, combiner)
+ || !ValidateAndCloseCombiner(combiner))
{
if (autocommit)
{
@@ -1665,6 +1700,12 @@ DataNodeCopyOut(Exec_Nodes *exec_nodes, DataNodeHandle** copy_connections, FILE*
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("unexpected EOF on datanode connection")));
+ else
+ /*
+ * Set proper connection status - handle_response
+ * has changed it to DN_CONNECTION_STATE_QUERY
+ */
+ handle->state = DN_CONNECTION_STATE_COPY_OUT;
}
/* There is no more data that can be read from connection */
}
@@ -1746,7 +1787,7 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
combiner = CreateResponseCombiner(conn_count + 1, combine_type);
combiner->dest = None_Receiver;
- data_node_receive_responses(1, &primary_handle, timeout, combiner);
+ error = data_node_receive_responses(1, &primary_handle, timeout, combiner) || error;
}
for (i = 0; i < conn_count; i++)
@@ -1786,30 +1827,14 @@ DataNodeCopyFinish(DataNodeHandle** copy_connections, int primary_data_node,
combiner = CreateResponseCombiner(conn_count, combine_type);
combiner->dest = None_Receiver;
}
- data_node_receive_responses(conn_count, connections, timeout, combiner);
+ error = (data_node_receive_responses(conn_count, connections, timeout, combiner) != 0) || error;
processed = combiner->row_count;
if (!ValidateAndCloseCombiner(combiner) || error)
- {
- if (autocommit)
- {
- if (need_tran)
- DataNodeRollback(DestNone);
- else
- if (!PersistentConnections) release_handles();
- }
-
- return 0;
- }
-
- if (autocommit)
- {
- if (need_tran)
- DataNodeCommit(DestNone);
- else
- if (!PersistentConnections) release_handles();
- }
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Error while running COPY")));
return processed;
}
@@ -1882,9 +1907,17 @@ copy_slot(RemoteQueryState *node, TupleTableSlot *src, TupleTableSlot *dst)
else
{
int i;
+
+ /*
+ * Data node may be sending junk columns which are always at the end,
+ * but it must not be shorter than the result slot.
+ */
+ Assert(dst->tts_tupleDescriptor->natts <= src->tts_tupleDescriptor->natts);
ExecClearTuple(dst);
slot_getallattrs(src);
- /* PGXCTODO revisit: probably incorrect */
+ /*
+ * PGXCTODO revisit: is it correct to copy Datums using plain assignment?
+ */
for (i = 0; i < dst->tts_tupleDescriptor->natts; i++)
{
dst->tts_values[i] = src->tts_values[i];
@@ -1911,6 +1944,8 @@ ExecRemoteQuery(RemoteQueryState *node)
EState *estate = node->ss.ps.state;
TupleTableSlot *resultslot = node->ss.ps.ps_ResultTupleSlot;
TupleTableSlot *scanslot = node->ss.ss_ScanTupleSlot;
+ bool have_tuple = false;
+
if (!node->query_Done)
{
@@ -1974,7 +2009,7 @@ ExecRemoteQuery(RemoteQueryState *node)
else
need_tran = !autocommit || total_conn_count > 1;
- elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, need_tran = %s", autocommit ? "true" : "false", primarynode ? "true" : "false", regular_conn_count, need_tran ? "true" : "false");
+ elog(DEBUG1, "autocommit = %s, has primary = %s, regular_conn_count = %d, statement_need_tran = %s", autocommit ? "true" : "false", primarynode ? "true" : "false", regular_conn_count, need_tran ? "true" : "false");
stat_statement();
if (autocommit)
@@ -2052,7 +2087,7 @@ ExecRemoteQuery(RemoteQueryState *node)
new_connections[new_count++] = connections[i];
if (new_count)
- data_node_begin(new_count, new_connections, DestNone, gxid);
+ data_node_begin(new_count, new_connections, gxid);
}
/* See if we have a primary nodes, execute on it first before the others */
@@ -2088,36 +2123,22 @@ ExecRemoteQuery(RemoteQueryState *node)
while (node->command_complete_count < 1)
{
- PG_TRY();
- {
- data_node_receive(1, primaryconnection, NULL);
- while (handle_response(primaryconnection[0], node) == RESPONSE_EOF)
- data_node_receive(1, primaryconnection, NULL);
- if (node->errorMessage)
- {
- char *code = node->errorCode;
+ if (data_node_receive(1, primaryconnection, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to read response from data nodes")));
+ while (handle_response(primaryconnection[0], node) == RESPONSE_EOF)
+ if (data_node_receive(1, primaryconnection, NULL))
ereport(ERROR,
- (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])),
- errmsg("%s", node->errorMessage)));
- }
- }
- /* If we got an error response return immediately */
- PG_CATCH();
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to read response from data nodes")));
+ if (node->errorMessage)
{
- /* We are going to exit, so release combiner */
- if (autocommit)
- {
- if (need_tran)
- DataNodeRollback(DestNone);
- else if (!PersistentConnections)
- release_handles();
- }
-
- pfree(primaryconnection);
- pfree(connections);
- PG_RE_THROW();
+ char *code = node->errorCode;
+ ereport(ERROR,
+ (errcode(MAKE_SQLSTATE(code[0], code[1], code[2], code[3], code[4])),
+ errmsg("%s", node->errorMessage)));
}
- PG_END_TRY();
}
pfree(primaryconnection);
}
@@ -2148,8 +2169,6 @@ ExecRemoteQuery(RemoteQueryState *node)
}
}
- PG_TRY();
- {
/*
* Stop if all commands are completed or we got a data row and
* initialized state node for subsequent invocations
@@ -2158,7 +2177,10 @@ ExecRemoteQuery(RemoteQueryState *node)
{
int i = 0;
- data_node_receive(regular_conn_count, connections, NULL);
+ if (data_node_receive(regular_conn_count, connections, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to read response from data nodes")));
/*
* Handle input from the data nodes.
* If we got a RESPONSE_DATAROW we can break handling to wrap
@@ -2234,185 +2256,148 @@ ExecRemoteQuery(RemoteQueryState *node)
}
}
}
- }
- /* If we got an error response return immediately */
- PG_CATCH();
- {
- /* We are going to exit, so release combiner */
- if (autocommit)
- {
- if (need_tran)
- DataNodeRollback(DestNone);
- else if (!PersistentConnections)
- release_handles();
- }
- PG_RE_THROW();
- }
- PG_END_TRY();
+
node->query_Done = true;
- node->need_tran = need_tran;
}
- PG_TRY();
+ if (node->tuplesortstate)
{
- bool have_tuple = false;
-
- if (node->tuplesortstate)
+ while (tuplesort_gettupleslot((Tuplesortstate *) node->tuplesortstate,
+ true, scanslot))
{
- while (tuplesort_gettupleslot((Tuplesortstate *) node->tuplesortstate,
- true, scanslot))
+ have_tuple = true;
+ /*
+ * If DISTINCT is specified and current tuple matches to
+ * previous skip it and get next one.
+ * Otherwise return current tuple
+ */
+ if (step->distinct)
{
- have_tuple = true;
/*
- * If DISTINCT is specified and current tuple matches to
- * previous skip it and get next one.
- * Othervise return current tuple
+ * Always receive very first tuple and
+ * skip to next if scan slot match to previous (result slot)
*/
- if (step->distinct)
+ if (!TupIsNull(resultslot) &&
+ execTuplesMatch(scanslot,
+ resultslot,
+ step->distinct->numCols,
+ step->distinct->uniqColIdx,
+ node->eqfunctions,
+ node->tmp_ctx))
{
- /*
- * Always receive very first tuple and
- * skip to next if scan slot match to previous (result slot)
- */
- if (!TupIsNull(resultslot) &&
- execTuplesMatch(scanslot,
- resultslot,
- step->distinct->numCols,
- step->distinct->uniqColIdx,
- node->eqfunctions,
- node->tmp_ctx))
- {
- have_tuple = false;
- continue;
- }
+ have_tuple = false;
+ continue;
}
- copy_slot(node, scanslot, resultslot);
- (*node->dest->receiveSlot) (resultslot, node->dest);
- break;
}
- if (!have_tuple)
- ExecClearTuple(resultslot);
+ copy_slot(node, scanslot, resultslot);
+ (*node->dest->receiveSlot) (resultslot, node->dest);
+ break;
}
- else
+ if (!have_tuple)
+ ExecClearTuple(resultslot);
+ }
+ else
+ {
+ while (node->conn_count > 0 && !have_tuple)
{
- while (node->conn_count > 0 && !have_tuple)
- {
- int i;
+ int i;
- /*
- * If combiner already has tuple go ahead and return it
- * otherwise tuple will be cleared
- */
- if (FetchTuple(node, scanslot) && !TupIsNull(scanslot))
+ /*
+ * If combiner already has tuple go ahead and return it
+ * otherwise tuple will be cleared
+ */
+ if (FetchTuple(node, scanslot) && !TupIsNull(scanslot))
+ {
+ if (node->simple_aggregates)
{
- if (node->simple_aggregates)
- {
- /*
- * Advance aggregate functions and allow to read up next
- * data row message and get tuple in the same slot on
- * next iteration
- */
- exec_simple_aggregates(node, scanslot);
- }
- else
- {
- /*
- * Receive current slot and read up next data row
- * message before exiting the loop. Next time when this
- * function is invoked we will have either data row
- * message ready or EOF
- */
- copy_slot(node, scanslot, resultslot);
- (*node->dest->receiveSlot) (resultslot, node->dest);
- have_tuple = true;
- }
+ /*
+ * Advance aggregate functions and allow to read up next
+ * data row message and get tuple in the same slot on
+ * next iteration
+ */
+ exec_simple_aggregates(node, scanslot);
}
-
- /*
- * Handle input to get next row or ensure command is completed,
- * starting from connection next after current. If connection
- * does not
- */
- if ((i = node->current_conn + 1) == node->conn_count)
- i = 0;
-
- for (;;)
+ else
{
- int res = handle_response(node->connections[i], node);
- if (res == RESPONSE_EOF)
- {
- /* go to next connection */
- if (++i == node->conn_count)
- i = 0;
- /* if we cycled over all connections we need to receive more */
- if (i == node->current_conn)
- data_node_receive(node->conn_count, node->connections, NULL);
- }
- else if (res == RESPONSE_COMPLETE)
- {
- if (--node->conn_count == 0)
- break;
- if (i == node->conn_count)
- i = 0;
- else
- node->connections[i] = node->connections[node->conn_count];
- if (node->current_conn == node->conn_count)
- node->current_conn = i;
- }
- else if (res == RESPONSE_DATAROW)
- {
- node->current_conn = i;
- break;
- }
+ /*
+ * Receive current slot and read up next data row
+ * message before exiting the loop. Next time when this
+ * function is invoked we will have either data row
+ * message ready or EOF
+ */
+ copy_slot(node, scanslot, resultslot);
+ (*node->dest->receiveSlot) (resultslot, node->dest);
+ have_tuple = true;
}
}
/*
- * We may need to finalize aggregates
+ * Handle input to get next row or ensure command is completed,
+ * starting from connection next after current. If connection
+ * does not
*/
- if (!have_tuple && node->simple_aggregates)
+ if ((i = node->current_conn + 1) == node->conn_count)
+ i = 0;
+
+ for (;;)
{
- finish_simple_aggregates(node, resultslot);
- if (!TupIsNull(resultslot))
+ int res = handle_response(node->connections[i], node);
+ if (res == RESPONSE_EOF)
{
- (*node->dest->receiveSlot) (resultslot, node->dest);
- have_tuple = true;
+ /* go to next connection */
+ if (++i == node->conn_count)
+ i = 0;
+ /* if we cycled over all connections we need to receive more */
+ if (i == node->current_conn)
+ if (data_node_receive(node->conn_count, node->connections, NULL))
+ ereport(ERROR,
+ (errcode(ERRCODE_INTERNAL_ERROR),
+ errmsg("Failed to read response from data nodes")));
+ }
+ else if (res == RESPONSE_COMPLETE)
+ {
+ if (--node->conn_count == 0)
+ break;
+ if (i == node->conn_count)
+ i = 0;
+ else
+ node->connections[i] = node->connections[node->conn_count];
+ if (node->current_conn == node->conn_count)
+ node->curren...
[truncated message content] |