|
From: mason_s <ma...@us...> - 2010-08-23 04:14:09
|
Project "Postgres-XC".
The branch, master has been updated
via b6602543d5dd6dfa4005db41c73d0136f74af13e (commit)
from cfb29183b57e811e0dfcf3641c5cc58458b2584a (commit)
- Log -----------------------------------------------------------------
commit b6602543d5dd6dfa4005db41c73d0136f74af13e
Author: M S <masonsharp@5.105.180.203.e.iijmobile.jp>
Date: Mon Aug 23 13:11:31 2010 +0900
Initial support for multi-step queries, including cross-node joins.
Note that this is a "version 1.0" implementation, borrowing some code
from the SQL/MED patch. This means that all cross-node joins
take place on a Coordinator by pulling up data from the data nodes.
Some queries will therefore execute quite slowly, but they will
at least execute. In this patch, all columns are SELECTed from
the remote table, but at least simple WHERE clauses are pushed
down to the remote nodes. We will optimize query processing
in the future.
Note that the same connections to remote nodes are used in
multiple steps. To get around that problem, we just
add a materialization node above each RemoteQuery node,
and force all results to be fetched first on the
Coordinator.
This patch also allows UNION, EXCEPT and INTERSECT, and other
more complex SELECT statements to run now.
It includes a fix for single-step, multi-node LIMIT and OFFSET.
It also includes EXPLAIN output from the Coordinator's
point of view.
Adding these changes introduced a problem with AVG(),
which is currently not working.
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 950a2f1..aa92917 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -565,6 +565,11 @@ explain_outNode(StringInfo str,
case T_WorkTableScan:
pname = "WorkTable Scan";
break;
+#ifdef PGXC
+ case T_RemoteQuery:
+ pname = "Data Node Scan";
+ break;
+#endif
case T_Material:
pname = "Materialize";
break;
@@ -668,6 +673,9 @@ explain_outNode(StringInfo str,
case T_SeqScan:
case T_BitmapHeapScan:
case T_TidScan:
+#ifdef PGXC
+ case T_RemoteQuery:
+#endif
if (((Scan *) plan)->scanrelid > 0)
{
RangeTblEntry *rte = rt_fetch(((Scan *) plan)->scanrelid,
@@ -686,6 +694,26 @@ explain_outNode(StringInfo str,
appendStringInfo(str, " %s",
quote_identifier(rte->eref->aliasname));
}
+#ifdef PGXC
+ if (IsA(plan, RemoteQuery))
+ {
+ RemoteQuery *remote_query = (RemoteQuery *) plan;
+
+ /* if it is a single-step plan, print out the sql being used */
+ if (remote_query->sql_statement)
+ {
+ char *realsql = NULL;
+ realsql = strcasestr(remote_query->sql_statement, "explain");
+ if (!realsql)
+ realsql = remote_query->sql_statement;
+ else
+ realsql += 8; /* skip "EXPLAIN" */
+
+ appendStringInfo(str, " %s",
+ quote_identifier(realsql));
+ }
+ }
+#endif
break;
case T_BitmapIndexScan:
appendStringInfo(str, " on %s",
@@ -854,6 +882,9 @@ explain_outNode(StringInfo str,
case T_ValuesScan:
case T_CteScan:
case T_WorkTableScan:
+#ifdef PGXC
+ case T_RemoteQuery:
+#endif
show_scan_qual(plan->qual,
"Filter",
((Scan *) plan)->scanrelid,
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 22fd416..c8b1456 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -2925,6 +2925,20 @@ ATRewriteTables(List **wqueue)
}
}
+#ifdef PGXC
+ /*
+ * In PGXC, do not check the FK constraints on the Coordinator, and just return.
+ * That is because a SELECT is generated whose plan will try and use
+ * the data nodes. We (currently) do not want to do that on the Coordinator;
+ * when the command is passed down to the data nodes it will
+ * perform the check locally.
+ * This issue was introduced when we added multi-step handling,
+ * it caused foreign key constraints to fail.
+ * PGXCTODO - issue for pg_catalog or any other cases?
+ */
+ if (IS_PGXC_COORDINATOR)
+ return;
+#endif
/*
* Foreign key constraints are checked in a final pass, since (a) it's
* generally best to examine each one separately, and (b) it's at least
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 25f350f..01e2548 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -44,6 +44,9 @@
#include "executor/nodeWindowAgg.h"
#include "executor/nodeWorktablescan.h"
#include "nodes/nodeFuncs.h"
+#ifdef PGXC
+#include "pgxc/execRemote.h"
+#endif
#include "utils/syscache.h"
@@ -183,6 +186,11 @@ ExecReScan(PlanState *node, ExprContext *exprCtxt)
ExecWorkTableScanReScan((WorkTableScanState *) node, exprCtxt);
break;
+#ifdef PGXC
+ case T_RemoteQueryState:
+ ExecRemoteQueryReScan((RemoteQueryState *) node, exprCtxt);
+ break;
+#endif
case T_NestLoopState:
ExecReScanNestLoop((NestLoopState *) node, exprCtxt);
break;
diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c
index 446b400..2cd3298 100644
--- a/src/backend/executor/nodeMaterial.c
+++ b/src/backend/executor/nodeMaterial.c
@@ -24,6 +24,9 @@
#include "executor/executor.h"
#include "executor/nodeMaterial.h"
#include "miscadmin.h"
+#ifdef PGXC
+#include "pgxc/pgxc.h"
+#endif
/* ----------------------------------------------------------------
* ExecMaterial
@@ -56,9 +59,24 @@ ExecMaterial(MaterialState *node)
/*
* If first time through, and we need a tuplestore, initialize it.
*/
+#ifdef PGXC
+ /*
+ * For PGXC, temporarily always create the storage.
+ * This allows us to easily use the same connection to
+ * in multiple steps of the plan.
+ */
+ if ((IS_PGXC_COORDINATOR && tuplestorestate == NULL)
+ || (IS_PGXC_DATANODE && tuplestorestate == NULL && node->eflags != 0))
+#else
if (tuplestorestate == NULL && node->eflags != 0)
+#endif
{
tuplestorestate = tuplestore_begin_heap(true, false, work_mem);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ /* Note that we will rescan these results */
+ node->eflags |= EXEC_FLAG_REWIND;
+#endif
tuplestore_set_eflags(tuplestorestate, node->eflags);
if (node->eflags & EXEC_FLAG_MARK)
{
@@ -73,6 +91,26 @@ ExecMaterial(MaterialState *node)
Assert(ptrno == 1);
}
node->tuplestorestate = tuplestorestate;
+
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ {
+ TupleTableSlot *outerslot;
+ PlanState *outerNode = outerPlanState(node);
+
+ /* We want to always materialize first temporarily in PG-XC */
+ while (!node->eof_underlying)
+ {
+ outerslot = ExecProcNode(outerNode);
+ if (TupIsNull(outerslot))
+ node->eof_underlying = true;
+ else
+ /* Append a copy of the returned tuple to tuplestore. */
+ tuplestore_puttupleslot(tuplestorestate, outerslot);
+ }
+ tuplestore_rescan(node->tuplestorestate);
+ }
+#endif
}
/*
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index a1aa660..0e9aa43 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -17,6 +17,7 @@
#include <math.h>
+#include "catalog/pg_namespace.h"
#include "nodes/nodeFuncs.h"
#ifdef OPTIMIZER_DEBUG
#include "nodes/print.h"
@@ -33,7 +34,11 @@
#include "optimizer/var.h"
#include "parser/parse_clause.h"
#include "parser/parsetree.h"
+#ifdef PGXC
+#include "pgxc/pgxc.h"
+#endif
#include "rewrite/rewriteManip.h"
+#include "utils/lsyscache.h"
/* These parameters are set by GUC */
@@ -254,6 +259,18 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
* least one dimension of cost or sortedness.
*/
+#ifdef PGXC
+ /*
+ * If we are on the coordinator, we always want to use
+ * the remote query path unless it is a pg_catalog table.
+ */
+ if (IS_PGXC_COORDINATOR
+ && get_rel_namespace(rte->relid) != PG_CATALOG_NAMESPACE)
+ add_path(rel, create_remotequery_path(root, rel));
+ else
+ {
+#endif
+
/* Consider sequential scan */
add_path(rel, create_seqscan_path(root, rel));
@@ -262,6 +279,9 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
/* Consider TID scans */
create_tidscan_paths(root, rel);
+#ifdef PGXC
+ }
+#endif
/* Now find the cheapest of the paths for this rel */
set_cheapest(rel);
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 6e5c251..337f17b 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -32,6 +32,9 @@
#include "optimizer/var.h"
#include "parser/parse_clause.h"
#include "parser/parsetree.h"
+#ifdef PGXC
+#include "pgxc/planner.h"
+#endif
#include "utils/lsyscache.h"
@@ -66,6 +69,10 @@ static CteScan *create_ctescan_plan(PlannerInfo *root, Path *best_path,
List *tlist, List *scan_clauses);
static WorkTableScan *create_worktablescan_plan(PlannerInfo *root, Path *best_path,
List *tlist, List *scan_clauses);
+#ifdef PGXC
+static RemoteQuery *create_remotequery_plan(PlannerInfo *root, Path *best_path,
+ List *tlist, List *scan_clauses);
+#endif
static NestLoop *create_nestloop_plan(PlannerInfo *root, NestPath *best_path,
Plan *outer_plan, Plan *inner_plan);
static MergeJoin *create_mergejoin_plan(PlannerInfo *root, MergePath *best_path,
@@ -101,6 +108,10 @@ static CteScan *make_ctescan(List *qptlist, List *qpqual,
Index scanrelid, int ctePlanId, int cteParam);
static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual,
Index scanrelid, int wtParam);
+#ifdef PGXC
+static RemoteQuery *make_remotequery(List *qptlist, RangeTblEntry *rte,
+ List *qpqual, Index scanrelid);
+#endif
static BitmapAnd *make_bitmap_and(List *bitmapplans);
static BitmapOr *make_bitmap_or(List *bitmapplans);
static NestLoop *make_nestloop(List *tlist,
@@ -162,6 +173,9 @@ create_plan(PlannerInfo *root, Path *best_path)
case T_ValuesScan:
case T_CteScan:
case T_WorkTableScan:
+#ifdef PGXC
+ case T_RemoteQuery:
+#endif
plan = create_scan_plan(root, best_path);
break;
case T_HashJoin:
@@ -207,6 +221,9 @@ create_scan_plan(PlannerInfo *root, Path *best_path)
List *tlist;
List *scan_clauses;
Plan *plan;
+#ifdef PGXC
+ Plan *matplan;
+#endif
/*
* For table scans, rather than using the relation targetlist (which is
@@ -298,6 +315,23 @@ create_scan_plan(PlannerInfo *root, Path *best_path)
scan_clauses);
break;
+#ifdef PGXC
+ case T_RemoteQuery:
+ plan = (Plan *) create_remotequery_plan(root,
+ best_path,
+ tlist,
+ scan_clauses);
+
+ /*
+ * Insert a materialization plan above this temporarily
+ * until we better handle multiple steps using the same connection.
+ */
+ matplan = (Plan *) make_material(plan);
+ copy_plan_costsize(matplan, plan);
+ matplan->total_cost += cpu_tuple_cost * matplan->plan_rows;
+ plan = matplan;
+ break;
+#endif
default:
elog(ERROR, "unrecognized node type: %d",
(int) best_path->pathtype);
@@ -420,6 +454,9 @@ disuse_physical_tlist(Plan *plan, Path *path)
case T_ValuesScan:
case T_CteScan:
case T_WorkTableScan:
+#ifdef PGXC
+ case T_RemoteQuery:
+#endif
plan->targetlist = build_relation_tlist(path->parent);
break;
default:
@@ -1544,6 +1581,46 @@ create_worktablescan_plan(PlannerInfo *root, Path *best_path,
return scan_plan;
}
+#ifdef PGXC
+/*
+ * create_remotequery_plan
+ * Returns a remotequery plan for the base relation scanned by 'best_path'
+ * with restriction clauses 'scan_clauses' and targetlist 'tlist'.
+ */
+static RemoteQuery *
+create_remotequery_plan(PlannerInfo *root, Path *best_path,
+ List *tlist, List *scan_clauses)
+{
+ RemoteQuery *scan_plan;
+ Index scan_relid = best_path->parent->relid;
+ RangeTblEntry *rte;
+
+
+ Assert(scan_relid > 0);
+ rte = planner_rt_fetch(scan_relid, root);
+ Assert(best_path->parent->rtekind == RTE_RELATION);
+ Assert(rte->rtekind == RTE_RELATION);
+
+ /* Sort clauses into best execution order */
+ scan_clauses = order_qual_clauses(root, scan_clauses);
+
+ /* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */
+ scan_clauses = extract_actual_clauses(scan_clauses, false);
+
+ scan_plan = make_remotequery(tlist,
+ rte,
+ scan_clauses,
+ scan_relid);
+
+ copy_path_costsize(&scan_plan->scan.plan, best_path);
+
+ /* PGXCTODO - get better estimates */
+ scan_plan->scan.plan.plan_rows = 1000;
+
+ return scan_plan;
+}
+#endif
+
/*****************************************************************************
*
@@ -2615,6 +2692,28 @@ make_worktablescan(List *qptlist,
return node;
}
+#ifdef PGXC
+static RemoteQuery *
+make_remotequery(List *qptlist,
+ RangeTblEntry *rte,
+ List *qpqual,
+ Index scanrelid)
+{
+ RemoteQuery *node = makeNode(RemoteQuery);
+ Plan *plan = &node->scan.plan;
+
+ /* cost should be inserted by caller */
+ plan->targetlist = qptlist;
+ plan->qual = qpqual;
+ plan->lefttree = NULL;
+ plan->righttree = NULL;
+ node->scan.scanrelid = scanrelid;
+ node->read_only = true;
+
+ return node;
+}
+#endif
+
Append *
make_append(List *appendplans, bool isTarget, List *tlist)
{
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index d839c28..cab7fb4 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -22,6 +22,9 @@
#include "optimizer/clauses.h"
#include "optimizer/planmain.h"
#include "optimizer/tlist.h"
+#ifdef PGXC
+#include "pgxc/planner.h"
+#endif
#include "parser/parsetree.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
@@ -373,6 +376,19 @@ set_plan_refs(PlannerGlobal *glob, Plan *plan, int rtoffset)
fix_scan_list(glob, splan->scan.plan.qual, rtoffset);
}
break;
+#ifdef PGXC
+ case T_RemoteQuery:
+ {
+ RemoteQuery *splan = (RemoteQuery *) plan;
+
+ splan->scan.scanrelid += rtoffset;
+ splan->scan.plan.targetlist =
+ fix_scan_list(glob, splan->scan.plan.targetlist, rtoffset);
+ splan->scan.plan.qual =
+ fix_scan_list(glob, splan->scan.plan.qual, rtoffset);
+ }
+ break;
+#endif
case T_NestLoop:
case T_MergeJoin:
case T_HashJoin:
diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c
index bde351d..e5c6dac 100644
--- a/src/backend/optimizer/plan/subselect.c
+++ b/src/backend/optimizer/plan/subselect.c
@@ -1963,6 +1963,12 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params)
bms_add_member(context.paramids,
((WorkTableScan *) plan)->wtParam);
break;
+#ifdef PGXC
+ case T_RemoteQuery:
+ //PGXCTODO
+ context.paramids = bms_add_members(context.paramids, valid_params);
+ break;
+#endif
case T_Append:
{
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index b7c3d3c..cbf7618 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -1310,6 +1310,28 @@ create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel)
return pathnode;
}
+#ifdef PGXC
+/*
+ * create_remotequery_path
+ * Creates a path corresponding to a scan of a remote query,
+ * returning the pathnode.
+ */
+Path *
+create_remotequery_path(PlannerInfo *root, RelOptInfo *rel)
+{
+ Path *pathnode = makeNode(Path);
+
+ pathnode->pathtype = T_RemoteQuery;
+ pathnode->parent = rel;
+ pathnode->pathkeys = NIL; /* result is always unordered */
+
+ // PGXCTODO - set cost properly
+ cost_seqscan(pathnode, root, rel);
+
+ return pathnode;
+}
+#endif
+
/*
* create_nestloop_path
* Creates a pathnode corresponding to a nestloop join between two
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c
index 1dcfc29..c8911b7 100644
--- a/src/backend/pgxc/plan/planner.c
+++ b/src/backend/pgxc/plan/planner.c
@@ -25,6 +25,7 @@
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "optimizer/clauses.h"
+#include "optimizer/planmain.h"
#include "optimizer/planner.h"
#include "optimizer/tlist.h"
#include "parser/parse_agg.h"
@@ -141,7 +142,7 @@ bool StrictSelectChecking = false;
static Exec_Nodes *get_plan_nodes(Query *query, bool isRead);
static bool get_plan_nodes_walker(Node *query_node, XCWalkerContext *context);
static bool examine_conditions_walker(Node *expr_node, XCWalkerContext *context);
-
+static int handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStmt *plan_stmt);
/*
* True if both lists contain only one node and are the same
@@ -1528,16 +1529,6 @@ get_simple_aggregates(Query * query)
simple_agg_list = lappend(simple_agg_list, simple_agg);
}
- else
- {
- /*
- * PGXCTODO relax this limit after adding GROUP BY support
- * then support expressions of aggregates
- */
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("Query is not yet supported"))));
- }
column_pos++;
}
}
@@ -1629,7 +1620,7 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort,
{
List *context;
bool useprefix;
- List *sub_tlist = step->plan.targetlist;
+ List *sub_tlist = step->scan.plan.targetlist;
ListCell *l;
StringInfo buf = makeStringInfo();
char *sql;
@@ -1737,7 +1728,7 @@ make_simple_sort_from_sortclauses(Query *query, RemoteQuery *step)
{
List *sortcls = query->sortClause;
List *distinctcls = query->distinctClause;
- List *sub_tlist = step->plan.targetlist;
+ List *sub_tlist = step->scan.plan.targetlist;
SimpleSort *sort;
SimpleDistinct *distinct;
ListCell *l;
@@ -1978,6 +1969,100 @@ make_simple_sort_from_sortclauses(Query *query, RemoteQuery *step)
}
/*
+ * Special case optimization.
+ * Handle LIMIT and OFFSET for single-step queries on multiple nodes.
+ *
+ * Return non-zero if we need to fall back to the standard plan.
+ */
+static int
+handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStmt *plan_stmt)
+{
+
+ /* check if no special handling needed */
+ if (query_step && query_step->exec_nodes &&
+ list_length(query_step->exec_nodes->nodelist) <= 1)
+ return 0;
+
+ /* if order by and limit are present, do not optimize yet */
+ if ((query->limitCount || query->limitOffset) && query->sortClause)
+ return 1;
+
+ /*
+ * Note that query_step->is_single_step is set to true, but
+ * it is ok even if we add limit here.
+ * If OFFSET is set, we strip the final offset value and add
+ * it to the LIMIT passed down. If there is an OFFSET and no
+ * LIMIT, we just strip off OFFSET.
+ */
+ if (query->limitOffset)
+ {
+ int64 newLimit = 0;
+ char *newpos;
+ char *pos;
+ char *limitpos;
+ char *newQuery;
+ char *newchar;
+ char *c;
+
+ pos = NULL;
+ newpos = NULL;
+
+ if (query->limitCount)
+ {
+ for (pos = query_step->sql_statement, newpos = pos; newpos != NULL; )
+ {
+ pos = newpos;
+ newpos = strcasestr(pos+1, "LIMIT");
+ }
+ limitpos = pos;
+
+ if (IsA(query->limitCount, Const))
+ newLimit = DatumGetInt64(((Const *) query->limitCount)->constvalue);
+ else
+ return 1;
+ }
+
+ for (pos = query_step->sql_statement, newpos = pos; newpos != NULL; )
+ {
+ pos = newpos;
+ newpos = strcasestr(pos+1, "OFFSET");
+ }
+
+ if (limitpos && limitpos < pos)
+ pos = limitpos;
+
+ if (IsA(query->limitOffset, Const))
+ newLimit += DatumGetInt64(((Const *) query->limitOffset)->constvalue);
+ else
+ return 1;
+
+ if (!pos || pos == query_step->sql_statement)
+ elog(ERROR, "Could not handle LIMIT/OFFSET");
+
+ newQuery = (char *) palloc(strlen(query_step->sql_statement)+1);
+ newchar = newQuery;
+
+ /* copy up until position where we found clause */
+ for (c = &query_step->sql_statement[0]; c != pos && *c != '\0'; *newchar++ = *c++);
+
+ if (query->limitCount)
+ sprintf(newchar, "LIMIT %I64d", newLimit);
+ else
+ *newchar = '\0';
+
+ pfree(query_step->sql_statement);
+ query_step->sql_statement = newQuery;
+ }
+
+ /* Now add a limit execution node at the top of the plan */
+ plan_stmt->planTree = (Plan *) make_limit(plan_stmt->planTree,
+ query->limitOffset, query->limitCount, 0, 0);
+
+ return 0;
+}
+
+
+/*
* Build up a QueryPlan to execute on.
*
* For the prototype, there will only be one step,
@@ -1997,6 +2082,7 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
Plan *standardPlan = result->planTree;
RemoteQuery *query_step = makeNode(RemoteQuery);
+ query_step->is_single_step = false;
query_step->sql_statement = pstrdup(query->sql_statement);
query_step->exec_nodes = NULL;
query_step->combine_type = COMBINE_TYPE_NONE;
@@ -2020,21 +2106,6 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
ereport(ERROR,
(errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
(errmsg("INTO clause not yet supported"))));
-
- if (query->setOperations)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("UNION, INTERSECT and EXCEPT are not yet supported"))));
-
- if (query->hasRecursive)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("WITH RECURSIVE not yet supported"))));
-
- if (query->hasWindowFuncs)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("Window functions not yet supported"))));
/* fallthru */
case T_InsertStmt:
case T_UpdateStmt:
@@ -2043,14 +2114,32 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
if (query_step->exec_nodes == NULL)
{
+ /* Do not yet allow multi-node correlated UPDATE or DELETE */
+ if ((query->nodeTag == T_UpdateStmt || query->nodeTag == T_DeleteStmt))
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
+ (errmsg("Complex and correlated UPDATE and DELETE not yet supported"))));
+ }
+
/*
- * Processing guery against catalog tables, restore
- * standard plan
+ * Processing query against catalog tables, or multi-step command.
+ * Restore standard plan
*/
result->planTree = standardPlan;
return result;
}
+ /* Do not yet allow multi-node correlated UPDATE or DELETE */
+ if ((query->nodeTag == T_UpdateStmt || query->nodeTag == T_DeleteStmt)
+ && !query_step->exec_nodes
+ && list_length(query->rtable) > 1)
+ {
+ result->planTree = standardPlan;
+ return result;
+ }
+
+ query_step->is_single_step = true;
/*
* PGXCTODO
* When Postgres runs insert into t (a) values (1); against table
@@ -2064,7 +2153,7 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
* then call standard planner and take targetList from the plan
* generated by Postgres.
*/
- query_step->plan.targetlist = standardPlan->targetlist;
+ query_step->scan.plan.targetlist = standardPlan->targetlist;
if (query_step->exec_nodes)
query_step->combine_type = get_plan_combine_type(
@@ -2075,39 +2164,36 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
query_step->simple_aggregates = get_simple_aggregates(query);
/*
- * Add sortring to the step
+ * Add sorting to the step
*/
if (list_length(query_step->exec_nodes->nodelist) > 1 &&
(query->sortClause || query->distinctClause))
make_simple_sort_from_sortclauses(query, query_step);
- /*
- * PG-XC cannot yet support some variations of SQL statements.
- * We perform some checks to at least catch common cases
- */
+ /* Handle LIMIT and OFFSET for single-step queries on multiple nodes*/
+ if (handle_limit_offset(query_step, query, result))
+ {
+ /* complicated expressions, just fallback to standard plan */
+ result->planTree = standardPlan;
+ return result;
+ }
+ /*
+ * Use standard plan if we have more than one data node with either
+ * group by, hasWindowFuncs, or hasRecursive
+ */
/*
- * Check if we have multiple nodes and an unsupported clause. This
- * is temporary until we expand supported SQL
+ * PGXCTODO - this could be improved to check if the first
+ * group by expression is the partitioning column, in which
+ * case it is ok to treat as a single step.
*/
- if (query->nodeTag == T_SelectStmt)
+ if (query->nodeTag == T_SelectStmt
+ && query_step->exec_nodes
+ && list_length(query_step->exec_nodes->nodelist) > 1
+ && (query->groupClause || query->hasWindowFuncs || query->hasRecursive))
{
- if (StrictStatementChecking && query_step->exec_nodes
- && list_length(query_step->exec_nodes->nodelist) > 1)
- {
- /*
- * PGXCTODO - this could be improved to check if the first
- * group by expression is the partitioning column
- */
- if (query->groupClause)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("Multi-node GROUP BY not yet supported"))));
- if (query->limitCount && StrictSelectChecking)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("Multi-node LIMIT not yet supported"))));
- }
+ result->planTree = standardPlan;
+ return result;
}
break;
default:
diff --git a/src/backend/pgxc/pool/Makefile b/src/backend/pgxc/pool/Makefile
index e875303..c7e950a 100644
--- a/src/backend/pgxc/pool/Makefile
+++ b/src/backend/pgxc/pool/Makefile
@@ -14,6 +14,6 @@ subdir = src/backend/pgxc/pool
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-OBJS = datanode.o execRemote.o poolmgr.o poolcomm.o
+OBJS = datanode.o execRemote.o poolmgr.o poolcomm.o postgresql_fdw.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index 0f16c51..43569e0 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -30,6 +30,8 @@
#include "utils/tuplesort.h"
#include "utils/snapmgr.h"
+extern char *deparseSql(RemoteQueryState *scanstate);
+
/*
* Buffer size does not affect performance significantly, just do not allow
* connection buffer grows infinitely
@@ -1461,8 +1463,8 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_
{
if (need_tran)
DataNodeCopyFinish(connections, 0, COMBINE_TYPE_NONE);
- else
- if (!PersistentConnections) release_handles();
+ else if (!PersistentConnections)
+ release_handles();
}
pfree(connections);
@@ -1812,21 +1814,44 @@ ExecCountSlotsRemoteQuery(RemoteQuery *node)
RemoteQueryState *
ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags)
{
- RemoteQueryState *remotestate;
+ RemoteQueryState *remotestate;
+ Relation currentRelation;
+
remotestate = CreateResponseCombiner(0, node->combine_type);
remotestate->ss.ps.plan = (Plan *) node;
remotestate->ss.ps.state = estate;
remotestate->simple_aggregates = node->simple_aggregates;
+ remotestate->ss.ps.qual = (List *)
+ ExecInitExpr((Expr *) node->scan.plan.qual,
+ (PlanState *) remotestate);
+
ExecInitResultTupleSlot(estate, &remotestate->ss.ps);
- if (node->plan.targetlist)
+ if (node->scan.plan.targetlist)
{
- TupleDesc typeInfo = ExecCleanTypeFromTL(node->plan.targetlist, false);
+ TupleDesc typeInfo = ExecCleanTypeFromTL(node->scan.plan.targetlist, false);
ExecSetSlotDescriptor(remotestate->ss.ps.ps_ResultTupleSlot, typeInfo);
}
ExecInitScanTupleSlot(estate, &remotestate->ss);
+
+ /*
+ * Initialize scan relation. get the relation object id from the
+ * relid'th entry in the range table, open that relation and acquire
+ * appropriate lock on it.
+ * This is needed for deparseSQL
+ * We should remove these lines once we plan and deparse earlier.
+ */
+ if (!node->is_single_step)
+ {
+ currentRelation = ExecOpenScanRelation(estate, node->scan.scanrelid);
+ remotestate->ss.ss_currentRelation = currentRelation;
+ ExecAssignScanType(&remotestate->ss, RelationGetDescr(currentRelation));
+ }
+
+ remotestate->ss.ps.ps_TupFromTlist = false;
+
/*
* Tuple description for the scan slot will be set on runtime from
* a RowDescription message
@@ -1991,7 +2016,6 @@ TupleTableSlot *
ExecRemoteQuery(RemoteQueryState *node)
{
RemoteQuery *step = (RemoteQuery *) node->ss.ps.plan;
- EState *estate = node->ss.ps.state;
TupleTableSlot *resultslot = node->ss.ps.ps_ResultTupleSlot;
TupleTableSlot *scanslot = node->ss.ss_ScanTupleSlot;
bool have_tuple = false;
@@ -2092,6 +2116,11 @@ ExecRemoteQuery(RemoteQueryState *node)
data_node_begin(new_count, new_connections, gxid);
}
+ /* Get the SQL string */
+ /* only do if not single step */
+ if (!step->is_single_step)
+ step->sql_statement = deparseSql(node);
+
/* See if we have a primary nodes, execute on it first before the others */
if (primaryconnection)
{
@@ -2427,12 +2456,35 @@ ExecEndRemoteQuery(RemoteQueryState *node)
if (outerPlanState(node))
ExecEndNode(outerPlanState(node));
+ if (node->ss.ss_currentRelation)
+ ExecCloseScanRelation(node->ss.ss_currentRelation);
+
if (node->tmp_ctx)
MemoryContextDelete(node->tmp_ctx);
CloseCombiner(node);
}
+
+/* ----------------------------------------------------------------
+ * ExecRemoteQueryReScan
+ *
+ * Rescans the relation.
+ * ----------------------------------------------------------------
+ */
+void
+ExecRemoteQueryReScan(RemoteQueryState *node, ExprContext *exprCtxt)
+{
+ /* At the moment we materialize results for multi-step queries,
+ * so no need to support rescan.
+ // PGXCTODO - rerun Init?
+ //node->routine->ReOpen(node);
+
+ //ExecScanReScan((ScanState *) node);
+ */
+}
+
+
/*
* Execute utility statement on multiple data nodes
* It does approximately the same as
diff --git a/src/backend/pgxc/pool/postgresql_fdw.c b/src/backend/pgxc/pool/postgresql_fdw.c
new file mode 100644
index 0000000..9e418be
--- /dev/null
+++ b/src/backend/pgxc/pool/postgresql_fdw.c
@@ -0,0 +1,335 @@
+/*-------------------------------------------------------------------------
+ *
+ * postgresql_fdw.c
+ * foreign-data wrapper for PostgreSQ...
[truncated message content] |