From: mason_s <ma...@us...> - 2010-06-29 19:39:21
|
Project "Postgres-XC". The branch, master has been updated via 49e836ebf1c86211c342f320838611fc48e6fa1f (commit) via 6899314e5a0aad2ded36959ca8bc6e3d7243a586 (commit) from 592295640039744c89a1f319d87fb34072a10efa (commit) - Log ----------------------------------------------------------------- commit 49e836ebf1c86211c342f320838611fc48e6fa1f Author: Mason S <masonsharp@mason-sharps-macbook.local> Date: Tue Jun 29 21:32:26 2010 +0200 Add support for ORDER BY adn DISTINCT. This is handled on the Coordinator. It will push down the ORDER BY and merge-sort the sorted input streams from the nodes. It converts from DataRow to tuple format as needed. If one of the SELECT clause expressions is not in the ORDER BY, it appends it to the ORDER BY when pushing it down to the data nodes and leaves it off when returning to the client. With DISTINCT, an ORDER BY will be used and pushed down to the data nodes such that a merge-sort can be done and de-duplication can occur. By Andrei Martsinchyk diff --git a/src/backend/access/common/heaptuple.c b/src/backend/access/common/heaptuple.c index a86716e..eab1bd0 100644 --- a/src/backend/access/common/heaptuple.c +++ b/src/backend/access/common/heaptuple.c @@ -57,6 +57,9 @@ #include "postgres.h" +#ifdef PGXC +#include "funcapi.h" +#endif #include "access/heapam.h" #include "access/sysattr.h" #include "access/tuptoaster.h" @@ -1157,6 +1160,80 @@ slot_deform_tuple(TupleTableSlot *slot, int natts) slot->tts_slow = slow; } +#ifdef PGXC +/* + * slot_deform_datarow + * Extract data from the DataRow message into Datum/isnull arrays. + * We always extract all atributes, as specified in tts_tupleDescriptor, + * because there is no easy way to find random attribute in the DataRow. 
+ */ +static void +slot_deform_datarow(TupleTableSlot *slot) +{ + int attnum = slot->tts_tupleDescriptor->natts; + int i; + int col_count; + char *cur = slot->tts_dataRow; + StringInfo buffer; + uint16 n16; + uint32 n32; + + /* fastpath: exit if values already extracted */ + if (slot->tts_nvalid == attnum) + return; + + Assert(slot->tts_dataRow); + + /* 2-byte network-order column count */ + memcpy(&n16, cur, 2); + cur += 2; + col_count = ntohs(n16); + + if (col_count != attnum) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("Tuple does not match the descriptor"))); + + /* + * Build the input-function metadata in the slot's own memory context. + * Callers may be running in a short-lived (e.g. per-tuple) context; if + * the metadata were allocated there, tts_attinmeta would dangle once that + * context is reset while the slot is still in use. + */ + if (slot->tts_attinmeta == NULL) + { + MemoryContext oldcontext = MemoryContextSwitchTo(slot->tts_mcxt); + + slot->tts_attinmeta = TupleDescGetAttInMetadata(slot->tts_tupleDescriptor); + MemoryContextSwitchTo(oldcontext); + } + + /* scratch buffer, reused for each column's text value */ + buffer = makeStringInfo(); + for (i = 0; i < attnum; i++) + { + int len; + + /* get size: 4-byte network-order length, -1 marks a NULL value */ + memcpy(&n32, cur, 4); + cur += 4; + len = ntohl(n32); + + /* get data */ + if (len == -1) + { + slot->tts_values[i] = (Datum) 0; + slot->tts_isnull[i] = true; + } + else + { + appendBinaryStringInfo(buffer, cur, len); + cur += len; + + slot->tts_values[i] = InputFunctionCall(slot->tts_attinmeta->attinfuncs + i, + buffer->data, + slot->tts_attinmeta->attioparams[i], + slot->tts_attinmeta->atttypmods[i]); + slot->tts_isnull[i] = false; + + resetStringInfo(buffer); + } + } + pfree(buffer->data); + pfree(buffer); + + slot->tts_nvalid = attnum; +} +#endif + /* * slot_getattr * This function fetches an attribute of the slot's current tuple. @@ -1250,6 +1327,11 @@ slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull) /* * Extract the attribute, along with any preceding attributes.
*/ +#ifdef PGXC + if (slot->tts_dataRow) + slot_deform_datarow(slot); + else +#endif slot_deform_tuple(slot, attnum); /* @@ -1276,6 +1358,15 @@ slot_getallattrs(TupleTableSlot *slot) if (slot->tts_nvalid == tdesc_natts) return; +#ifdef PGXC + /* Handle the DataRow tuple case */ + if (slot->tts_dataRow) + { + slot_deform_datarow(slot); + return; + } +#endif + /* * otherwise we had better have a physical tuple (tts_nvalid should equal * natts in all virtual-tuple cases) @@ -1319,6 +1410,15 @@ slot_getsomeattrs(TupleTableSlot *slot, int attnum) if (slot->tts_nvalid >= attnum) return; +#ifdef PGXC + /* Handle the DataRow tuple case */ + if (slot->tts_dataRow) + { + slot_deform_datarow(slot); + return; + } +#endif + /* Check for caller error */ if (attnum <= 0 || attnum > slot->tts_tupleDescriptor->natts) elog(ERROR, "invalid attribute number %d", attnum); diff --git a/src/backend/access/common/printtup.c b/src/backend/access/common/printtup.c index ee9c04a..99cd92e 100644 --- a/src/backend/access/common/printtup.c +++ b/src/backend/access/common/printtup.c @@ -292,6 +292,19 @@ printtup(TupleTableSlot *slot, DestReceiver *self) int natts = typeinfo->natts; int i; +#ifdef PGXC + /* + * If we are having DataRow-based tuple we do not have to encode attribute + * values, just send over the DataRow message as we received it from the + * data node + */ + if (slot->tts_dataRow) + { + pq_putmessage('D', slot->tts_dataRow, slot->tts_dataLen); + return; + } +#endif + /* Set or update my derived attribute info, if needed */ if (myState->attrinfo != typeinfo || myState->nattrs != natts) printtup_prepare_info(myState, typeinfo, natts); diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index d641df8..08e35ae 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -35,7 +35,7 @@ #include "parser/parse_relation.h" #ifdef PGXC #include "pgxc/pgxc.h" -#include "pgxc/datanode.h" +#include "pgxc/execRemote.h" #include "pgxc/locator.h" #include 
"pgxc/poolmgr.h" #endif @@ -1511,8 +1511,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString) DataNodeCopyFinish( cstate->connections, primary_data_node, - COMBINE_TYPE_NONE, - whereToSendOutput); + COMBINE_TYPE_NONE); pfree(cstate->connections); pfree(cstate->query_buf.data); FreeRelationLocInfo(cstate->rel_loc); @@ -1526,14 +1525,12 @@ DoCopy(const CopyStmt *stmt, const char *queryString) cstate->processed = DataNodeCopyFinish( cstate->connections, primary_data_node, - COMBINE_TYPE_SAME, - whereToSendOutput); + COMBINE_TYPE_SAME); else cstate->processed = DataNodeCopyFinish( cstate->connections, 0, - COMBINE_TYPE_SUM, - whereToSendOutput); + COMBINE_TYPE_SUM); pfree(cstate->connections); pfree(cstate->query_buf.data); FreeRelationLocInfo(cstate->rel_loc); @@ -1775,10 +1772,10 @@ CopyTo(CopyState cstate) #ifdef PGXC if (IS_PGXC_COORDINATOR && !cstate->on_coord) { - DataNodeCopyOut(GetRelationNodes(cstate->rel_loc, NULL, true), - cstate->connections, - whereToSendOutput, - cstate->copy_file); + cstate->processed = DataNodeCopyOut( + GetRelationNodes(cstate->rel_loc, NULL, true), + cstate->connections, + cstate->copy_file); } else { diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c index 06142c9..53e424b 100644 --- a/src/backend/executor/execTuples.c +++ b/src/backend/executor/execTuples.c @@ -149,6 +149,12 @@ ExecCreateTupleTable(int tableSize) slot->tts_shouldFreeMin = false; slot->tts_tuple = NULL; slot->tts_tupleDescriptor = NULL; +#ifdef PGXC + slot->tts_shouldFreeRow = false; + slot->tts_dataRow = NULL; + slot->tts_dataLen = -1; + slot->tts_attinmeta = NULL; +#endif slot->tts_mcxt = CurrentMemoryContext; slot->tts_buffer = InvalidBuffer; slot->tts_nvalid = 0; @@ -228,6 +234,12 @@ MakeSingleTupleTableSlot(TupleDesc tupdesc) slot->tts_shouldFreeMin = false; slot->tts_tuple = NULL; slot->tts_tupleDescriptor = NULL; +#ifdef PGXC + slot->tts_shouldFreeRow = false; + slot->tts_dataRow = NULL; + slot->tts_dataLen = -1; + 
slot->tts_attinmeta = NULL; +#endif slot->tts_mcxt = CurrentMemoryContext; slot->tts_buffer = InvalidBuffer; slot->tts_nvalid = 0; @@ -334,6 +346,12 @@ ExecSetSlotDescriptor(TupleTableSlot *slot, /* slot to change */ if (slot->tts_tupleDescriptor) ReleaseTupleDesc(slot->tts_tupleDescriptor); +#ifdef PGXC + /* XXX there in no routine to release AttInMetadata instance */ + if (slot->tts_attinmeta) + slot->tts_attinmeta = NULL; +#endif + if (slot->tts_values) pfree(slot->tts_values); if (slot->tts_isnull) @@ -415,6 +433,14 @@ ExecStoreTuple(HeapTuple tuple, heap_freetuple(slot->tts_tuple); if (slot->tts_shouldFreeMin) heap_free_minimal_tuple(slot->tts_mintuple); +#ifdef PGXC + if (slot->tts_shouldFreeRow) + pfree(slot->tts_dataRow); + + slot->tts_shouldFreeRow = false; + slot->tts_dataRow = NULL; + slot->tts_dataLen = -1; +#endif /* * Store the new tuple into the specified slot. @@ -476,6 +502,14 @@ ExecStoreMinimalTuple(MinimalTuple mtup, heap_freetuple(slot->tts_tuple); if (slot->tts_shouldFreeMin) heap_free_minimal_tuple(slot->tts_mintuple); +#ifdef PGXC + if (slot->tts_shouldFreeRow) + pfree(slot->tts_dataRow); + + slot->tts_shouldFreeRow = false; + slot->tts_dataRow = NULL; + slot->tts_dataLen = -1; +#endif /* * Drop the pin on the referenced buffer, if there is one. @@ -504,6 +538,62 @@ ExecStoreMinimalTuple(MinimalTuple mtup, return slot; } +#ifdef PGXC +/* -------------------------------- + * ExecStoreDataRowTuple + * + * Store a buffer in DataRow message format into the slot. + * + * -------------------------------- + */ +TupleTableSlot * +ExecStoreDataRowTuple(char *msg, size_t len, TupleTableSlot *slot, bool shouldFree) +{ + /* + * sanity checks + */ + Assert(msg != NULL); + Assert(len > 0); + Assert(slot != NULL); + Assert(slot->tts_tupleDescriptor != NULL); + + /* + * Free any old physical tuple belonging to the slot. 
+ */ + if (slot->tts_shouldFree) + heap_freetuple(slot->tts_tuple); + if (slot->tts_shouldFreeMin) + heap_free_minimal_tuple(slot->tts_mintuple); + if (slot->tts_shouldFreeRow) + pfree(slot->tts_dataRow); + + /* + * Drop the pin on the referenced buffer, if there is one. + */ + if (BufferIsValid(slot->tts_buffer)) + ReleaseBuffer(slot->tts_buffer); + + slot->tts_buffer = InvalidBuffer; + + /* + * Store the new tuple into the specified slot. + */ + slot->tts_isempty = false; + slot->tts_shouldFree = false; + slot->tts_shouldFreeMin = false; + slot->tts_shouldFreeRow = shouldFree; + slot->tts_tuple = NULL; + slot->tts_mintuple = NULL; + slot->tts_dataRow = msg; + slot->tts_dataLen = len; + + /* Mark extracted state invalid */ + slot->tts_nvalid = 0; + + return slot; +} +#endif + /* -------------------------------- * ExecClearTuple * @@ -527,6 +617,14 @@ ExecClearTuple(TupleTableSlot *slot) /* slot in which to store tuple */ heap_freetuple(slot->tts_tuple); if (slot->tts_shouldFreeMin) heap_free_minimal_tuple(slot->tts_mintuple); +#ifdef PGXC + if (slot->tts_shouldFreeRow) + pfree(slot->tts_dataRow); + + slot->tts_shouldFreeRow = false; + slot->tts_dataRow = NULL; + slot->tts_dataLen = -1; +#endif slot->tts_tuple = NULL; slot->tts_mintuple = NULL; @@ -634,7 +732,13 @@ ExecCopySlotTuple(TupleTableSlot *slot) return heap_copytuple(slot->tts_tuple); if (slot->tts_mintuple) return heap_tuple_from_minimal_tuple(slot->tts_mintuple); - +#ifdef PGXC + /* + * Ensure values are extracted from data row to the Datum array + */ + if (slot->tts_dataRow) + slot_getallattrs(slot); +#endif /* * Otherwise we need to build a tuple from the Datum array. 
*/ @@ -667,7 +771,13 @@ ExecCopySlotMinimalTuple(TupleTableSlot *slot) return heap_copy_minimal_tuple(slot->tts_mintuple); if (slot->tts_tuple) return minimal_tuple_from_heap_tuple(slot->tts_tuple); - +#ifdef PGXC + /* + * Ensure values are extracted from data row to the Datum array + */ + if (slot->tts_dataRow) + slot_getallattrs(slot); +#endif /* * Otherwise we need to build a tuple from the Datum array. */ @@ -861,6 +971,14 @@ ExecMaterializeSlot(TupleTableSlot *slot) if (!slot->tts_shouldFreeMin) slot->tts_mintuple = NULL; +#ifdef PGXC + if (!slot->tts_shouldFreeRow) + { + slot->tts_dataRow = NULL; + slot->tts_dataLen = -1; + } +#endif + return slot->tts_tuple; } diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c index ae537e5..1bbbb75 100644 --- a/src/backend/pgxc/plan/planner.c +++ b/src/backend/pgxc/plan/planner.c @@ -22,8 +22,10 @@ #include "catalog/pg_type.h" #include "lib/stringinfo.h" #include "nodes/nodeFuncs.h" +#include "nodes/nodes.h" #include "nodes/parsenodes.h" #include "optimizer/clauses.h" +#include "optimizer/tlist.h" #include "parser/parse_agg.h" #include "parser/parse_coerce.h" #include "pgxc/locator.h" @@ -123,12 +125,10 @@ typedef struct XCWalkerContext int varno; bool within_or; bool within_not; + List *join_list; /* A list of List*'s, one for each relation. */ } XCWalkerContext; -/* A list of List*'s, one for each relation. 
*/ -List *join_list = NULL; - /* Forbid unsafe SQL statements */ bool StrictStatementChecking = true; @@ -185,12 +185,12 @@ new_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2) * Look up the join struct for a particular join */ static PGXC_Join * -find_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2) +find_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2, XCWalkerContext *context) { ListCell *lc; /* return if list is still empty */ - if (join_list == NULL) + if (context->join_list == NULL) return NULL; /* in the PGXC_Join struct, we always sort with relid1 < relid2 */ @@ -209,7 +209,7 @@ find_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2) * there should be a small number, so we just search linearly, although * long term a hash table would be better. */ - foreach(lc, join_list) + foreach(lc, context->join_list) { PGXC_Join *pgxcjoin = (PGXC_Join *) lfirst(lc); @@ -225,16 +225,16 @@ find_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2) * Find or create a join between 2 relations */ static PGXC_Join * -find_or_create_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2) +find_or_create_pgxc_join(int relid1, char *aliasname1, int relid2, char *aliasname2, XCWalkerContext *context) { PGXC_Join *pgxcjoin; - pgxcjoin = find_pgxc_join(relid1, aliasname1, relid2, aliasname2); + pgxcjoin = find_pgxc_join(relid1, aliasname1, relid2, aliasname2, context); if (pgxcjoin == NULL) { pgxcjoin = new_pgxc_join(relid1, aliasname1, relid2, aliasname2); - join_list = lappend(join_list, pgxcjoin); + context->join_list = lappend(context->join_list, pgxcjoin); } return pgxcjoin; @@ -277,7 +277,7 @@ free_special_relations(Special_Conditions *special_conditions) * frees join_list */ static void -free_join_list(void) +free_join_list(List *join_list) { if (join_list == NULL) return; @@ -368,13 +368,13 @@ get_base_var(Var *var, XCWalkerContext *context) } else if 
(rte->rtekind == RTE_SUBQUERY) { - /* + /* * Handle views like select * from v1 where col1 = 1 * where col1 is partition column of base relation */ /* the varattno corresponds with the subquery's target list (projections) */ TargetEntry *tle = list_nth(rte->subquery->targetList, var->varattno - 1); /* or varno? */ - + if (!IsA(tle->expr, Var)) return NULL; /* not column based expressoin, return */ else @@ -684,7 +684,7 @@ examine_conditions_walker(Node *expr_node, XCWalkerContext *context) /* get data struct about these two relations joining */ pgxc_join = find_or_create_pgxc_join(column_base->relid, column_base->relalias, - column_base2->relid, column_base2->relalias); + column_base2->relid, column_base2->relalias, context); if (rel_loc_info1->locatorType == LOCATOR_TYPE_REPLICATED) { @@ -914,7 +914,7 @@ contains_only_pg_catalog (List *rtable) { if (get_rel_namespace(rte->relid) != PG_CATALOG_NAMESPACE) return false; - } else if (rte->rtekind == RTE_SUBQUERY && + } else if (rte->rtekind == RTE_SUBQUERY && !contains_only_pg_catalog (rte->subquery->rtable)) return false; } @@ -967,7 +967,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) { /* May be complicated. Before giving up, just check for pg_catalog usage */ if (contains_only_pg_catalog (query->rtable)) - { + { /* just pg_catalog tables */ context->exec_nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes)); context->exec_nodes->tableusagetype = TABLE_USAGE_TYPE_PGCATALOG; @@ -1018,7 +1018,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) /* We compare to make sure that the subquery is safe to execute with previous- * we may have multiple ones in the FROM clause. 
- * We handle the simple case of allowing multiple subqueries in the from clause, + * We handle the simple case of allowing multiple subqueries in the from clause, * but only allow one of them to not contain replicated tables */ if (!from_query_nodes) @@ -1028,20 +1028,20 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) /* ok, safe */ if (!from_query_nodes) from_query_nodes = current_nodes; - } + } else { if (from_query_nodes->tableusagetype == TABLE_USAGE_TYPE_USER_REPLICATED) from_query_nodes = current_nodes; else { - /* Allow if they are both using one node, and the same one */ + /* Allow if they are both using one node, and the same one */ if (!same_single_node (from_query_nodes->nodelist, current_nodes->nodelist)) /* Complicated */ return true; } } - } + } else if (rte->rtekind == RTE_RELATION) { /* Look for pg_catalog tables */ @@ -1049,7 +1049,7 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) current_usage_type = TABLE_USAGE_TYPE_PGCATALOG; else current_usage_type = TABLE_USAGE_TYPE_USER; - } + } else if (rte->rtekind == RTE_FUNCTION) { /* See if it is a catalog function */ @@ -1095,9 +1095,9 @@ get_plan_nodes_walker(Node *query_node, XCWalkerContext *context) return true; /* Examine join conditions, see if each join is single-node safe */ - if (join_list != NULL) + if (context->join_list != NULL) { - foreach(lc, join_list) + foreach(lc, context->join_list) { PGXC_Join *pgxcjoin = (PGXC_Join *) lfirst(lc); @@ -1254,22 +1254,28 @@ static Exec_Nodes * get_plan_nodes(Query *query, bool isRead) { Exec_Nodes *result_nodes; - XCWalkerContext *context = palloc0(sizeof(XCWalkerContext)); - - context->query = query; - context->isRead = isRead; - - context->conditions = (Special_Conditions *) palloc0(sizeof(Special_Conditions)); - context->rtables = lappend(context->rtables, query->rtable); - - join_list = NULL; - - if (get_plan_nodes_walker((Node *) query, context)) + XCWalkerContext context; + + + context.query = query; + 
context.isRead = isRead; + context.exec_nodes = NULL; + context.conditions = (Special_Conditions *) palloc0(sizeof(Special_Conditions)); + context.rtables = NIL; + context.rtables = lappend(context.rtables, query->rtable); + context.multilevel_join = false; + context.varno = 0; + context.within_or = false; + context.within_not = false; + context.join_list = NIL; + + if (get_plan_nodes_walker((Node *) query, &context)) result_nodes = NULL; else - result_nodes = context->exec_nodes; + result_nodes = context.exec_nodes; - free_special_relations(context->conditions); + free_special_relations(context.conditions); + free_join_list(context.join_list); return result_nodes; } @@ -1304,7 +1310,6 @@ get_plan_nodes_command(Query *query) return NULL; } - free_join_list(); return exec_nodes; } @@ -1345,17 +1350,17 @@ static List * get_simple_aggregates(Query * query) { List *simple_agg_list = NIL; - + /* Check for simple multi-node aggregate */ if (query->hasAggs) { ListCell *lc; int column_pos = 0; - + foreach (lc, query->targetList) { TargetEntry *tle = (TargetEntry *) lfirst(lc); - + if (IsA(tle->expr, Aggref)) { /*PGXC borrowed this code from nodeAgg.c, see ExecInitAgg()*/ @@ -1422,7 +1427,7 @@ get_simple_aggregates(Query * query) get_func_name(finalfn_oid)); } } - + /* resolve actual type of transition state, if polymorphic */ aggcollecttype = aggform->aggcollecttype; @@ -1468,7 +1473,7 @@ get_simple_aggregates(Query * query) get_typlenbyval(aggcollecttype, &simple_agg->transtypeLen, &simple_agg->transtypeByVal); - + /* * initval is potentially null, so don't try to access it as a struct * field. Must do it the hard way with SysCacheGetAttr. @@ -1534,6 +1539,427 @@ get_simple_aggregates(Query * query) /* + * add_sort_column --- utility subroutine for building sort info arrays + * + * We need this routine because the same column might be selected more than + * once as a sort key column; if so, the extra mentions are redundant. 
+ * + * Caller is assumed to have allocated the arrays large enough for the + * max possible number of columns. Return value is the new column count. + * + * PGXC: copied from optimizer/plan/planner.c + */ +static int +add_sort_column(AttrNumber colIdx, Oid sortOp, bool nulls_first, + int numCols, AttrNumber *sortColIdx, + Oid *sortOperators, bool *nullsFirst) +{ + int i; + + Assert(OidIsValid(sortOp)); + + for (i = 0; i < numCols; i++) + { + /* + * Note: we check sortOp because it's conceivable that "ORDER BY foo + * USING <, foo USING <<<" is not redundant, if <<< distinguishes + * values that < considers equal. We need not check nulls_first + * however because a lower-order column with the same sortop but + * opposite nulls direction is redundant. + */ + if (sortColIdx[i] == colIdx && sortOperators[i] == sortOp) + { + /* Already sorting by this col, so extra sort key is useless */ + return numCols; + } + } + + /* Add the column */ + sortColIdx[numCols] = colIdx; + sortOperators[numCols] = sortOp; + nullsFirst[numCols] = nulls_first; + return numCols + 1; +} + +/* + * add_distinct_column - utility subroutine to remove redundant columns, just + * like add_sort_column + */ +static int +add_distinct_column(AttrNumber colIdx, Oid eqOp, int numCols, + AttrNumber *sortColIdx, Oid *eqOperators) +{ + int i; + + Assert(OidIsValid(eqOp)); + + for (i = 0; i < numCols; i++) + { + if (sortColIdx[i] == colIdx && eqOperators[i] == eqOp) + { + /* Already sorting by this col, so extra sort key is useless */ + return numCols; + } + } + + /* Add the column */ + sortColIdx[numCols] = colIdx; + eqOperators[numCols] = eqOp; + return numCols + 1; +} + + +/* + * Reconstruct the step query + */ +static void +reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort, + RemoteQuery *step) +{ + List *context; + bool useprefix; + List *sub_tlist = step->plan.targetlist; + ListCell *l; + StringInfo buf = makeStringInfo(); + char *sql; + char *cur; + char *sql_from; + + context 
= deparse_context_for_plan((Node *) step, NULL, rtable, NIL); + useprefix = list_length(rtable) > 1; + + foreach(l, sub_tlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(l); + char *exprstr = deparse_expression((Node *) tle->expr, context, + useprefix, false); + + if (buf->len == 0) + { + appendStringInfo(buf, "SELECT "); + if (step->distinct) + appendStringInfo(buf, "DISTINCT "); + } + else + appendStringInfo(buf, ", "); + + appendStringInfoString(buf, exprstr); + } + + /* + * A kind of dummy + * Do not reconstruct remaining query, just search original statement + * for " FROM " and append remainder to the target list we just generated. + * Do not handle the case if " FROM " we found is not a "FROM" keyword, but, + * for example, a part of string constant. + */ + sql = pstrdup(step->sql_statement); /* mutable copy */ + /* string to upper case, for comparing; offsets stay aligned with original */ + for (cur = sql; *cur; cur++) + { + /* replace whitespace with a space */ + if (isspace((unsigned char) *cur)) + *cur = ' '; + /* separate statements: "*cur++ = toupper(*cur)" is undefined behavior */ + *cur = toupper((unsigned char) *cur); + } + + /* find the keyword */ + sql_from = strstr(sql, " FROM "); + if (sql_from) + { + /* the same offset in the original string */ + int offset = sql_from - sql; + char *start = step->sql_statement + offset; + char *end = step->sql_statement + strlen(step->sql_statement); + + /* + * Remove a terminating semicolon, if any. Only look at the tail after + * the FROM offset: a ';' elsewhere (e.g. inside a string constant) must + * not truncate the statement, and a missing ';' must not be dereferenced. + */ + while (end > start && isspace((unsigned char) end[-1])) + end--; + if (end > start && end[-1] == ';') + end--; + *end = '\0'; + + appendStringInfoString(buf, start); + } + + if (extra_sort) + { + foreach(l, extra_sort) + { + TargetEntry *tle = (TargetEntry *) lfirst(l); + char *exprstr = deparse_expression((Node *) tle->expr, context, + useprefix, false); + + if (has_order_by) + appendStringInfo(buf, ", "); + else + { + appendStringInfo(buf, " ORDER BY "); + has_order_by = true; + } + + appendStringInfoString(buf, exprstr); + } + } + + /* do not need the copy */ + pfree(sql); + + /* free previous query */ + pfree(step->sql_statement); + /* get a copy of new query */ + step->sql_statement = pstrdup(buf->data); + /* free the query buffer */ + pfree(buf->data); + pfree(buf); +} + + +/* + * Plan to
sort step tuples + * PGXC: copied and adopted from optimizer/plan/planner.c + */ +static void +make_simple_sort_from_sortclauses(Query *query, RemoteQuery *step) +{ + List *sortcls = query->sortClause; + List *distinctcls = query->distinctClause; + List *sub_tlist = step->plan.targetlist; + SimpleSort *sort; + SimpleDistinct *distinct; + ListCell *l; + int numsortkeys; + int numdistkeys; + AttrNumber *sortColIdx; + AttrNumber *distColIdx; + Oid *sortOperators; + Oid *eqOperators; + bool *nullsFirst; + bool need_reconstruct = false; + /* + * List of target list entries from DISTINCT which are not in the ORDER BY. + * The exressions should be appended to the ORDER BY clause of remote query + */ + List *extra_distincts = NIL; + + Assert(step->sort == NULL); + Assert(step->distinct == NULL); + + /* + * We will need at most list_length(sortcls) sort columns; possibly less + * Also need room for extra distinct expressions if we need to append them + */ + numsortkeys = list_length(sortcls) + list_length(distinctcls); + sortColIdx = (AttrNumber *) palloc(numsortkeys * sizeof(AttrNumber)); + sortOperators = (Oid *) palloc(numsortkeys * sizeof(Oid)); + nullsFirst = (bool *) palloc(numsortkeys * sizeof(bool)); + + numsortkeys = 0; + sort = (SimpleSort *) palloc(sizeof(SimpleSort)); + + if (sortcls) + { + foreach(l, sortcls) + { + SortGroupClause *sortcl = (SortGroupClause *) lfirst(l); + TargetEntry *tle = get_sortgroupclause_tle(sortcl, sub_tlist); + + if (tle->resjunk) + need_reconstruct = true; + + /* + * Check for the possibility of duplicate order-by clauses --- the + * parser should have removed 'em, but no point in sorting + * redundantly. 
+ */ + numsortkeys = add_sort_column(tle->resno, sortcl->sortop, + sortcl->nulls_first, + numsortkeys, + sortColIdx, sortOperators, nullsFirst); + } + } + + if (distinctcls) + { + /* + * Validate distinct clause + * We have to sort tuples to filter duplicates, and if ORDER BY clause + * is already present the sort order specified here may be incompatible + * with order needed for distinct. + * + * To be compatible, all expressions from DISTINCT must appear at the + * beginning of ORDER BY list. If list of DISTINCT expressions is longer + * then ORDER BY we can make ORDER BY compatible we can append remaining + * expressions from DISTINCT to ORDER BY. Obviously ORDER BY must not + * contain expressions not from the DISTINCT list in this case. + * + * For validation purposes we use column indexes (AttrNumber) to + * identify expressions. May be this is not enough and we should revisit + * the algorithm. + * + * We validate compatibility as follow: + * 1. Make working copy of DISTINCT + * 1a. Remove possible duplicates when copying: do not add expression + * 2. If order by is empty they are already compatible, skip 3 + * 3. Iterate over ORDER BY items + * 3a. If the item is in the working copy delete it from the working + * list. If working list is empty after deletion DISTINCT and + * ORDER BY are compatible, so break the loop. If working list is + * not empty continue iterating + * 3b. ORDER BY clause may contain duplicates. So if we can not found + * expression in the remainder of DISTINCT, probably it has already + * been removed because of duplicate ORDER BY entry. Check original + * DISTINCT clause, if expression is there continue iterating. + * 3c. DISTINCT and ORDER BY are not compatible, emit error + * 4. 
DISTINCT and ORDER BY are compatible, if we have remaining items + * in the working copy we should append it to the order by list + */ + /* + * Create the list of unique DISTINCT clause expressions + */ + foreach(l, distinctcls) + { + SortGroupClause *distinctcl = (SortGroupClause *) lfirst(l); + TargetEntry *tle = get_sortgroupclause_tle(distinctcl, sub_tlist); + bool found = false; + + if (extra_distincts) + { + ListCell *xl; + + foreach(xl, extra_distincts) + { + TargetEntry *xtle = (TargetEntry *) lfirst(xl); + if (xtle->resno == tle->resno) + { + found = true; + break; + } + } + } + + if (!found) + extra_distincts = lappend(extra_distincts, tle); + } + + if (sortcls) + { + foreach(l, sortcls) + { + SortGroupClause *sortcl = (SortGroupClause *) lfirst(l); + TargetEntry *tle = get_sortgroupclause_tle(sortcl, sub_tlist); + bool found = false; + ListCell *xl; + ListCell *prev = NULL; + + /* Search for the expression in the DISTINCT clause */ + foreach(xl, extra_distincts) + { + TargetEntry *xtle = (TargetEntry *) lfirst(xl); + if (xtle->resno == tle->resno) + { + extra_distincts = list_delete_cell(extra_distincts, xl, + prev); + found = true; + break; + } + prev = xl; + } + + /* Probably we've done */ + if (found && list_length(extra_distincts) == 0) + break; + + /* Ensure sort expression is not a duplicate */ + if (!found) + { + foreach(xl, distinctcls) + { + SortGroupClause *xcl = (SortGroupClause *) lfirst(xl); + TargetEntry *xtle = get_sortgroupclause_tle(xcl, sub_tlist); + if (xtle->resno == tle->resno) + { + /* it is a duplicate then */ + found = true; + break; + } + } + } + + /* Give up, we do not support it */ + if (!found) + { + ereport(ERROR, + (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), + (errmsg("Such combination of ORDER BY and DISTINCT is not yet supported")))); + } + } + } + /* need to append to the ORDER BY */ + if (list_length(extra_distincts) > 0) + need_reconstruct = true; + + /* + * End of validation, expression to append to ORDER BY are in the + 
* extra_distincts list + */ + + distinct = (SimpleDistinct *) palloc(sizeof(SimpleDistinct)); + + /* + * We will need at most list_length(distinctcls) sort columns + */ + numdistkeys = list_length(distinctcls); + distColIdx = (AttrNumber *) palloc(numdistkeys * sizeof(AttrNumber)); + eqOperators = (Oid *) palloc(numdistkeys * sizeof(Oid)); + + numdistkeys = 0; + + foreach(l, distinctcls) + { + SortGroupClause *distinctcl = (SortGroupClause *) lfirst(l); + TargetEntry *tle = get_sortgroupclause_tle(distinctcl, sub_tlist); + + /* + * Check for the possibility of duplicate order-by clauses --- the + * parser should have removed 'em, but no point in sorting + * redundantly. + */ + numdistkeys = add_distinct_column(tle->resno, + distinctcl->eqop, + numdistkeys, + distColIdx, + eqOperators); + /* append also extra sort operator, if not already there */ + numsortkeys = add_sort_column(tle->resno, + distinctcl->sortop, + distinctcl->nulls_first, + numsortkeys, + sortColIdx, + sortOperators, + nullsFirst); + } + + Assert(numdistkeys > 0); + + distinct->numCols = numdistkeys; + distinct->uniqColIdx = distColIdx; + distinct->eqOperators = eqOperators; + + step->distinct = distinct; + } + + + Assert(numsortkeys > 0); + + sort->numCols = numsortkeys; + sort->sortColIdx = sortColIdx; + sort->sortOperators = sortOperators; + sort->nullsFirst = nullsFirst; + + step->sort = sort; + + if (need_reconstruct) + reconstruct_step_query(query->rtable, sortcls != NULL, extra_distincts, + step); +} + +/* * Build up a QueryPlan to execute on. 
* * For the prototype, there will only be one step, @@ -1543,17 +1969,16 @@ Query_Plan * GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) { Query_Plan *query_plan = palloc(sizeof(Query_Plan)); - Query_Step *query_step = palloc(sizeof(Query_Step)); + RemoteQuery *query_step = makeNode(RemoteQuery); Query *query; - - query_plan->force_autocommit = false; - query_step->sql_statement = (char *) palloc(strlen(sql_statement) + 1); strcpy(query_step->sql_statement, sql_statement); query_step->exec_nodes = NULL; query_step->combine_type = COMBINE_TYPE_NONE; query_step->simple_aggregates = NULL; + query_step->read_only = false; + query_step->force_autocommit = false; query_plan->query_step_list = lappend(NULL, query_step); @@ -1565,11 +1990,16 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) switch (nodeTag(parsetree)) { case T_SelectStmt: + /* Optimize multi-node handling */ + query_step->read_only = true; + /* fallthru */ case T_InsertStmt: case T_UpdateStmt: case T_DeleteStmt: /* just use first one in querytree_list */ query = (Query *) linitial(querytree_list); + /* should copy instead ? */ + query_step->plan.targetlist = query->targetList; /* Perform some checks to make sure we can support the statement */ if (nodeTag(parsetree) == T_SelectStmt) @@ -1633,6 +2063,12 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) } /* + * Add sortring to the step + */ + if (query->sortClause || query->distinctClause) + make_simple_sort_from_sortclauses(query, query_step); + + /* * PG-XC cannot yet support some variations of SQL statements. 
* We perform some checks to at least catch common cases */ @@ -1658,15 +2094,6 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) ereport(ERROR, (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), (errmsg("Multi-node LIMIT not yet supported")))); - if (query->sortClause && StrictSelectChecking) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("Multi-node ORDER BY not yet supported")))); - /* PGXCTODO - check if first column partitioning column */ - if (query->distinctClause) - ereport(ERROR, - (errcode(ERRCODE_STATEMENT_TOO_COMPLEX), - (errmsg("Multi-node DISTINCT`not yet supported")))); } } break; @@ -1686,7 +2113,7 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) case T_DropdbStmt: case T_VacuumStmt: query_plan->exec_loc_type = EXEC_ON_COORD | EXEC_ON_DATA_NODES; - query_plan->force_autocommit = true; + query_step->force_autocommit = true; break; case T_DropPropertyStmt: @@ -1864,7 +2291,7 @@ GetQueryPlan(Node *parsetree, const char *sql_statement, List *querytree_list) * Free Query_Step struct */ static void -free_query_step(Query_Step *query_step) +free_query_step(RemoteQuery *query_step) { if (query_step == NULL) return; @@ -1894,7 +2321,7 @@ FreeQueryPlan(Query_Plan *query_plan) return; foreach(item, query_plan->query_step_list) - free_query_step((Query_Step *) lfirst(item)); + free_query_step((RemoteQuery *) lfirst(item)); pfree(query_plan->query_step_list); pfree(query_plan); diff --git a/src/backend/pgxc/pool/Makefile b/src/backend/pgxc/pool/Makefile index 7143af5..e875303 100644 --- a/src/backend/pgxc/pool/Makefile +++ b/src/backend/pgxc/pool/Makefile @@ -14,6 +14,6 @@ subdir = src/backend/pgxc/pool top_builddir = ../../../.. 
include $(top_builddir)/src/Makefile.global -OBJS = combiner.o datanode.o poolmgr.o poolcomm.o +OBJS = datanode.o execRemote.o poolmgr.o poolcomm.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/pgxc/pool/combiner.c b/src/backend/pgxc/pool/combiner.c deleted file mode 100644 index 53e5dfb..0000000 --- a/src/backend/pgxc/pool/combiner.c +++ /dev/null @@ -1,652 +0,0 @@ -/*------------------------------------------------------------------------- - * - * combiner.c - * - * Combine responses from multiple Data Nodes - * - * - * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group - * Portions Copyright (c) 2010 Nippon Telegraph and Telephone Corporation - * - * IDENTIFICATION - * $$ - * - *------------------------------------------------------------------------- - */ - -#include "postgres.h" -#include "pgxc/combiner.h" -#include "pgxc/planner.h" -#include "catalog/pg_type.h" -#include "libpq/libpq.h" -#include "libpq/pqformat.h" -#include "utils/builtins.h" -#include "utils/datum.h" - - -/* - * Create a structure to store parameters needed to combine responses from - * multiple connections as well as state information - */ -ResponseCombiner -CreateResponseCombiner(int node_count, CombineType combine_type, - CommandDest dest) -{ - ResponseCombiner combiner; - - /* ResponseComber is a typedef for pointer to ResponseCombinerData */ - combiner = (ResponseCombiner) palloc(sizeof(ResponseCombinerData)); - if (combiner == NULL) - { - /* Out of memory */ - return combiner; - } - - combiner->node_count = node_count; - combiner->combine_type = combine_type; - combiner->dest = dest; - combiner->command_complete_count = 0; - combiner->row_count = 0; - combiner->request_type = REQUEST_TYPE_NOT_DEFINED; - combiner->description_count = 0; - combiner->copy_in_count = 0; - combiner->copy_out_count = 0; - combiner->inErrorState = false; - combiner->initAggregates = true; - combiner->simple_aggregates = NULL; - combiner->copy_file = NULL; - - return 
combiner; -} - -/* - * Parse out row count from the command status response and convert it to integer - */ -static int -parse_row_count(const char *message, size_t len, int *rowcount) -{ - int digits = 0; - int pos; - - *rowcount = 0; - /* skip \0 string terminator */ - for (pos = 0; pos < len - 1; pos++) - { - if (message[pos] >= '0' && message[pos] <= '9') - { - *rowcount = *rowcount * 10 + message[pos] - '0'; - digits++; - } - else - { - *rowcount = 0; - digits = 0; - } - } - return digits; -} - -/* - * Extract a transition value from data row. Invoke the Input Function - * associated with the transition data type to represent value as a Datum. - * Output parameters value and val_null, receive extracted value and indicate - * whether it is null. - */ -static void -parse_aggregate_value(SimpleAgg *simple_agg, char *col_data, size_t datalen, Datum *value, bool *val_null) -{ - /* Check NULL */ - if (datalen == -1) - { - *value = (Datum) 0; - *val_null = true; - } - else - { - resetStringInfo(&simple_agg->valuebuf); - appendBinaryStringInfo(&simple_agg->valuebuf, col_data, datalen); - *value = InputFunctionCall(&simple_agg->arginputfn, simple_agg->valuebuf.data, simple_agg->argioparam, -1); - *val_null = false; - } -} - -/* - * Initialize the collection value, when agregation is first set up, or for a - * new group (grouping support is not implemented yet) - */ -static void -initialize_collect_aggregates(SimpleAgg *simple_agg) -{ - if (simple_agg->initValueIsNull) - simple_agg->collectValue = simple_agg->initValue; - else - simple_agg->collectValue = datumCopy(simple_agg->initValue, - simple_agg->transtypeByVal, - simple_agg->transtypeLen); - simple_agg->noCollectValue = simple_agg->initValueIsNull; - simple_agg->collectValueNull = simple_agg->initValueIsNull; -} - -/* - * Finalize the aggregate after current group or entire relation is processed - * (grouping support is not implemented yet) - */ -static void -finalize_collect_aggregates(SimpleAgg *simple_agg, Datum 
*resultVal, bool *resultIsNull) -{ - /* - * Apply the agg's finalfn if one is provided, else return collectValue. - */ - if (OidIsValid(simple_agg->finalfn_oid)) - { - FunctionCallInfoData fcinfo; - - InitFunctionCallInfoData(fcinfo, &(simple_agg->finalfn), 1, - (void *) simple_agg, NULL); - fcinfo.arg[0] = simple_agg->collectValue; - fcinfo.argnull[0] = simple_agg->collectValueNull; - if (fcinfo.flinfo->fn_strict && simple_agg->collectValueNull) - { - /* don't call a strict function with NULL inputs */ - *resultVal = (Datum) 0; - *resultIsNull = true; - } - else - { - *resultVal = FunctionCallInvoke(&fcinfo); - *resultIsNull = fcinfo.isnull; - } - } - else - { - *resultVal = simple_agg->collectValue; - *resultIsNull = simple_agg->collectValueNull; - } -} - -/* - * Given new input value(s), advance the transition function of an aggregate. - * - * The new values (and null flags) have been preloaded into argument positions - * 1 and up in fcinfo, so that we needn't copy them again to pass to the - * collection function. No other fields of fcinfo are assumed valid. - * - * It doesn't matter which memory context this is called in. - */ -static void -advance_collect_function(SimpleAgg *simple_agg, FunctionCallInfoData *fcinfo) -{ - Datum newVal; - - if (simple_agg->transfn.fn_strict) - { - /* - * For a strict transfn, nothing happens when there's a NULL input; we - * just keep the prior transValue. - */ - if (fcinfo->argnull[1]) - return; - if (simple_agg->noCollectValue) - { - /* - * result has not been initialized - * We must copy the datum into result if it is pass-by-ref. We - * do not need to pfree the old result, since it's NULL. - */ - simple_agg->collectValue = datumCopy(fcinfo->arg[1], - simple_agg->transtypeByVal, - simple_agg->transtypeLen); - simple_agg->collectValueNull = false; - simple_agg->noCollectValue = false; - return; - } - if (simple_agg->collectValueNull) - { - /* - * Don't call a strict function with NULL inputs. 
Note it is - * possible to get here despite the above tests, if the transfn is - * strict *and* returned a NULL on a prior cycle. If that happens - * we will propagate the NULL all the way to the end. - */ - return; - } - } - - /* - * OK to call the transition function - */ - InitFunctionCallInfoData(*fcinfo, &(simple_agg->transfn), 2, (void *) simple_agg, NULL); - fcinfo->arg[0] = simple_agg->collectValue; - fcinfo->argnull[0] = simple_agg->collectValueNull; - newVal = FunctionCallInvoke(fcinfo); - - /* - * If pass-by-ref datatype, must copy the new value into aggcontext and - * pfree the prior transValue. But if transfn returned a pointer to its - * first input, we don't need to do anything. - */ - if (!simple_agg->transtypeByVal && - DatumGetPointer(newVal) != DatumGetPointer(simple_agg->collectValue)) - { - if (!fcinfo->isnull) - { - newVal = datumCopy(newVal, - simple_agg->transtypeByVal, - simple_agg->transtypeLen); - } - if (!simple_agg->collectValueNull) - pfree(DatumGetPointer(simple_agg->collectValue)); - } - - simple_agg->collectValue = newVal; - simple_agg->collectValueNull = fcinfo->isnull; -} - -/* - * Handle response message and update combiner's state. 
- * This function contains main combiner logic - */ -int -CombineResponse(ResponseCombiner combiner, char msg_type, char *msg_body, size_t len) -{ - int digits = 0; - - /* Ignore anything if we have encountered error */ - if (combiner->inErrorState) - return EOF; - - switch (msg_type) - { - case 'c': /* CopyOutCommandComplete */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_COPY_OUT; - if (combiner->request_type != REQUEST_TYPE_COPY_OUT) - /* Inconsistent responses */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - /* Just do nothing, close message is managed by the coordinator */ - combiner->copy_out_count++; - break; - case 'C': /* CommandComplete */ - /* - * If we did not receive description we are having rowcount or OK - * response - */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_COMMAND; - /* Extract rowcount */ - if (combiner->combine_type != COMBINE_TYPE_NONE) - { - int rowcount; - digits = parse_row_count(msg_body, len, &rowcount); - if (digits > 0) - { - /* Replicated write, make sure they are the same */ - if (combiner->combine_type == COMBINE_TYPE_SAME) - { - if (combiner->command_complete_count) - { - if (rowcount != combiner->row_count) - /* There is a consistency issue in the database with the replicated table */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Write to replicated table returned different results from the data nodes"))); - } - else - /* first result */ - combiner->row_count = rowcount; - } - else - combiner->row_count += rowcount; - } - else - combiner->combine_type = COMBINE_TYPE_NONE; - } - if (++combiner->command_complete_count == combiner->node_count) - { - - if (combiner->simple_aggregates - /* - * Aggregates has not been initialized - that means - * no rows received from data nodes, nothing to send - * It is possible if HAVING clause is present - */ 
- && !combiner->initAggregates) - { - /* Build up and send a datarow with aggregates */ - StringInfo dataRowBuffer = makeStringInfo(); - ListCell *lc; - - /* Number of fields */ - pq_sendint(dataRowBuffer, list_length(combiner->simple_aggregates), 2); - - foreach (lc, combiner->simple_aggregates) - { - SimpleAgg *simple_agg = (SimpleAgg *) lfirst(lc); - Datum resultVal; - bool resultIsNull; - - finalize_collect_aggregates(simple_agg, &resultVal, &resultIsNull); - /* Aggregation result */ - if (resultIsNull) - { - pq_sendint(dataRowBuffer, -1, 4); - } - else - { - char *text = OutputFunctionCall(&simple_agg->resoutputfn, resultVal); - size_t len = strlen(text); - pq_sendint(dataRowBuffer, len, 4); - pq_sendtext(dataRowBuffer, text, len); - } - } - pq_putmessage('D', dataRowBuffer->data, dataRowBuffer->len); - pfree(dataRowBuffer->data); - pfree(dataRowBuffer); - } - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - { - if (combiner->combine_type == COMBINE_TYPE_NONE) - { - pq_putmessage(msg_type, msg_body, len); - } - else - { - char command_complete_buffer[256]; - - /* Truncate msg_body to get base string */ - msg_body[len - digits - 1] = '\0'; - len = sprintf(command_complete_buffer, "%s%d", msg_body, combiner->row_count) + 1; - pq_putmessage(msg_type, command_complete_buffer, len); - } - } - } - break; - case 'T': /* RowDescription */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_QUERY; - if (combiner->request_type != REQUEST_TYPE_QUERY) - { - /* Inconsistent responses */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - } - /* Proxy first */ - if (combiner->description_count++ == 0) - { - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - pq_putmessage(msg_type, msg_body, len); - } - break; - case 'S': /* ParameterStatus (SET command) */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - 
combiner->request_type = REQUEST_TYPE_QUERY; - if (combiner->request_type != REQUEST_TYPE_QUERY) - { - /* Inconsistent responses */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - } - /* Proxy last */ - if (++combiner->description_count == combiner->node_count) - { - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - pq_putmessage(msg_type, msg_body, len); - } - break; - case 'G': /* CopyInResponse */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_COPY_IN; - if (combiner->request_type != REQUEST_TYPE_COPY_IN) - { - /* Inconsistent responses */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - } - /* Proxy first */ - if (combiner->copy_in_count++ == 0) - { - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - pq_putmessage(msg_type, msg_body, len); - } - break; - case 'H': /* CopyOutResponse */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_COPY_OUT; - if (combiner->request_type != REQUEST_TYPE_COPY_OUT) - { - /* Inconsistent responses */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - } - /* - * The normal PG code will output an H message when it runs in the - * coordinator, so do not proxy message here, just count it. 
- */ - combiner->copy_out_count++; - break; - case 'd': /* CopyOutDataRow */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - combiner->request_type = REQUEST_TYPE_COPY_OUT; - - /* Inconsistent responses */ - if (combiner->request_type != REQUEST_TYPE_COPY_OUT) - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - - /* If there is a copy file, data has to be sent to the local file */ - if (combiner->copy_file) - { - /* write data to the copy file */ - char *data_row; - data_row = (char *) palloc0(len); - memcpy(data_row, msg_body, len); - - fwrite(data_row, 1, len, combiner->copy_file); - break; - } - /* - * In this case data is sent back to the client - */ - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - { - StringInfo data_buffer; - - data_buffer = makeStringInfo(); - - pq_sendtext(data_buffer, msg_body, len); - pq_putmessage(msg_type, - data_buffer->data, - data_buffer->len); - - pfree(data_buffer->data); - pfree(data_buffer); - } - break; - case 'D': /* DataRow */ - if (!combiner->simple_aggregates) - { - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - pq_putmessage(msg_type, msg_body, len); - } - else - { - ListCell *lc; - char **col_values; - int *col_value_len; - uint16 col_count; - int i, cur = 0; - - /* Get values from the data row into array to speed up access */ - memcpy(&col_count, msg_body, 2); - col_count = ntohs(col_count); - cur += 2; - - col_values = (char **) palloc0(col_count * sizeof(char *)); - col_value_len = (int *) palloc0(col_count * sizeof(int)); - for (i = 0; i < col_count; i++) - { - int n32; - - memcpy(&n32, msg_body + cur, 4); - col_value_len[i] = ntohl(n32); - cur += 4; - - if (col_value_len[i] != -1) - { - col_values[i] = msg_body + cur; - cur += col_value_len[i]; - } - } - - if (combiner->initAggregates) - { - foreach (lc, combiner->simple_aggregates) - initialize_collect_aggregates((SimpleAgg *) lfirst(lc)); 
- - combiner->initAggregates = false; - } - - foreach (lc, combiner->simple_aggregates) - { - SimpleAgg *simple_agg = (SimpleAgg *) lfirst(lc); - FunctionCallInfoData fcinfo; - - parse_aggregate_value(simple_agg, - col_values[simple_agg->column_pos], - col_value_len[simple_agg->column_pos], - fcinfo.arg + 1, - fcinfo.argnull + 1); - - advance_collect_function(simple_agg, &fcinfo); - } - pfree(col_values); - pfree(col_value_len); - } - break; - case 'E': /* ErrorResponse */ - combiner->inErrorState = true; - /* fallthru */ - case 'A': /* NotificationResponse */ - case 'N': /* NoticeResponse */ - /* Proxy error message back if specified, - * or if doing internal primary copy - */ - if (combiner->dest == DestRemote - || combiner->dest == DestRemoteExecute) - pq_putmessage(msg_type, msg_body, len); - break; - case 'I': /* EmptyQuery */ - default: - /* Unexpected message */ - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("Unexpected response from the data nodes"))); - } - return 0; -} - -/* - * Examine the specified combiner state and determine if command was completed - * successfully - */ -static bool -validate_combiner(ResponseCombiner combiner) -{ - /* There was error message while combining */ - if (combiner->inErrorState) - return false; - /* Check if state is defined */ - if (combiner->request_type == REQUEST_TYPE_NOT_DEFINED) - return false; - /* Check all nodes completed */ - if ((combiner->request_type == REQUEST_TYPE_COMMAND - || combiner->request_type == REQUEST_TYPE_QUERY) - && combiner->command_complete_count != combiner->node_count) - return false; - - /* Check count of description responses */ - if (combiner->request_type == REQUEST_TYPE_QUERY - && combiner->description_count != combiner->node_count) - return false; - - /* Check count of copy-in responses */ - if (combiner->request_type == REQUEST_TYPE_COPY_IN - && combiner->copy_in_count != combiner->node_count) - return false; - - /* Check count of copy-out responses */ - if 
(combiner->request_type == REQUEST_TYPE_COPY_OUT - && combiner->copy_out_count != combiner->node_count) - return false; - - /* Add other checks here as needed */ - - /* All is good if we are here */ - return true; -} - -/* - * Validate combiner and release storage freeing allocated memory - */ -bool -ValidateAndCloseCombiner(ResponseCombiner combiner) -{ - bool valid = validate_combiner(combiner); - - pfree(combiner); - - return valid; -} - -/* - * Validate combiner and reset storage - */ -bool -ValidateAndResetCombiner(ResponseCombiner combiner) -{ - bool valid = validate_combiner(combiner); - - combiner->command_complete_count = 0; - combiner->row_count = 0; - combiner->request_type = REQUEST_TYPE_NOT_DEFINED; - combiner->description_count = 0; - combiner->copy_in_count = 0; - combiner->copy_out_count = 0; - combiner->inErrorState = false; - combiner->simple_aggregates = NULL; - combiner->copy_file = NULL; - - return valid; -} - -/* - * Close combiner and free allocated memory, if it is not needed - */ -void -CloseCombiner(ResponseCombiner combiner) -{ - if (combiner) - pfree(combiner); -} - -/* - * Assign combiner aggregates - */ -void -AssignCombinerAggregates(ResponseCombiner combiner, List *simple_aggregates) -{ - combiner->simple_aggregates = simple_aggregates; -} diff --git a/src/backend/pgxc/pool/datanode.c b/src/backend/pgxc/pool/datanode.c index 6a1aba8..517b1e4 100644 --- a/src/backend/pgxc/pool/datanode.c +++ b/src/backend/pgxc/pool/datanode.c @@ -15,6 +15,7 @@ *------------------------------------------------------------------------- */ +#include "postgres.h" #include <sys/select.h> #include <sys/time.h> #include <sys/types.h> @@ -22,166 +23,33 @@ #include <string.h> #include <unistd.h> #include <errno.h> -#include "pgxc/poolmgr.h" #include "access/gtm.h" #include "access/transam.h" #include "access/xact.h" -#include "postgres.h" -#include "utils/snapmgr.h" -#include "pgxc/pgxc.h" #include "gtm/gtm_c.h" #include "pgxc/datanode.h" #include 
"pgxc/locator.h" -#include "../interfaces/libpq/libpq-fe.h" +#include "pgxc/pgxc.h" +#include "pgxc/poolmgr.h" +#include "tcop/dest.h" #include "utils/elog.h" #include "utils/memutils.h" - +#include "utils/snapmgr.h" +#include "../interfaces/libpq/libpq-fe.h" #define NO_SOCKET -1 -/* - * Buffer size does not affect performance significantly, just do not allow - * connection buffer grows infinitely - */ -#define COPY_BUFFER_SIZE 8192 -#define PRIMARY_NODE_WRITEAHEAD 1024 * 1024 - static int node_count = 0; static DataNodeHandle *handles = NULL; -static bool autocommit = true; -static DataNodeHandle **write_node_list = NULL; -static int write_node_count = 0; -static DataNodeHandle **get_handles(List *nodelist); -static int get_transaction_nodes(DataNodeHandle **connections); -static void release_handles(void); - -static void data_node_init(DataNodeHandle *handle, int sock); +static void data_node_init(DataNodeHandle *handle, int sock, int nodenum); static void data_node_free(DataNodeHandle *handle); -static int data_node_begin(int conn_count, DataNodeHandle **connections, CommandDest dest, GlobalTransactionId gxid); -static int data_node_commit(int conn_count, DataNodeHandle **connections, CommandDest dest); -static int data_node_rollback(int conn_count, DataNodeHandle **connections, CommandDest dest); - -static int ensure_in_buffer_capacity(size_t bytes_needed, DataNodeHandle *handle); -static int ensure_out_buffer_capacity(size_t bytes_needed, DataNodeHandle *handle); - -static int data_node_send_query(DataNodeHandle *handle, const char *query); -static int data_node_send_gxid(DataNodeHandle *handle, GlobalTransactionId gxid); -static int data_node_send_snapshot(DataNodeHandle *handle, Snapshot snapshot); - -static void add_error_message(DataNodeHandle *handle, const char *message); - -static int data_node_read_data(DataNodeHandle *conn); -static int handle_response(DataNodeHandle *conn, ResponseCombiner combiner); - -static int get_int(DataNodeHandle *conn, size_t 
len, int *out); -static int get_char(DataNodeHandle *conn, char *out); - -static void clear_write_node_list(); - -#define MAX_STATEMENTS_PER_TRAN 10 - -/* Variables to collect statistics */ -static int total_transactions = 0; -static int total_statements = 0; -static int total_autocommit = 0; -static int nonautocommit_2pc = 0; -static int autocommit_2pc = 0; -static int current_tran_statements = 0; -static int *statements_per_transaction = NULL; -static int *nodes_per_transaction = NULL; - -/* - * statistics collection: count a statement - */ -static void -stat_statement() -{ - total_statements++; - current_tran_statements++; -} - -/* - * To collect statistics: count a transaction - */ -static void -stat_transaction(int node_count) -{ - total_transactions++; - if (autocommit) - total_autocommit++; - - if (!statements_per_transaction) - { - statements_per_transaction = (int *) malloc((MAX_STATEMENTS_PER_TRAN + 1) * sizeof(int)); - memset(statements_per_transaction, 0, (MAX_STATEMENTS_PER_TRAN + 1) * sizeof(int)); - } - - if (current_tran_statements > MAX_STATEMENTS_PER_TRAN) - statements_per_transaction[MAX_STATEMENTS_PER_TRAN]++; - else - statements_per_transaction[current_tran_statements]++; - - current_tran_statements = 0; - if (node_count > 0 && node_count <= NumDataNodes) - { - if (!nodes_per_transaction) - { - nodes_per_transaction = (int *) malloc(NumDataNodes * sizeof(int)); - memset(nodes_per_transaction, 0, NumDataNodes * sizeof(int)); - } - nodes_per_transaction[node_count - 1]++; - } -} - - -/* - * To collect statistics: count a two-phase commit on nodes - */ -static void -stat_2pc(void) -{ - if (autocommit) - autocommit_2pc++; - else - nonautocommit_2pc++; -} +static int get_int(DataNodeHandle * conn, size_t len, int *out); +static int get_char(DataNodeHandle * conn, char *out); /* - * Output collected statistics to the log - */ -static void -stat_log(void) -{ - elog(DEBUG1, "Total Transactions: %d Tota... [truncated message content] |