|
From: mason_s <ma...@us...> - 2010-08-23 04:14:09
|
Project "Postgres-XC".
The branch, master has been updated
via b6602543d5dd6dfa4005db41c73d0136f74af13e (commit)
from cfb29183b57e811e0dfcf3641c5cc58458b2584a (commit)
- Log -----------------------------------------------------------------
commit b6602543d5dd6dfa4005db41c73d0136f74af13e
Author: M S <masonsharp@5.105.180.203.e.iijmobile.jp>
Date: Mon Aug 23 13:11:31 2010 +0900
Initial support for multi-step queries, including cross-node joins.
Note that this is a "version 1.0" implementation, borrowing some code
from the SQL/MED patch. This means that all cross-node joins
take place on a Coordinator by pulling up data from the data nodes.
Some queries will therefore execute quite slowly, but they will
at least execute. In this patch, all columns are SELECTed from
the remote table, but at least simple WHERE clauses are pushed
down to the remote nodes. We will optimize query processing
in the future.
Note that the same connections to remote nodes are reused across
multiple steps. To keep steps that share a connection from interfering,
we add a materialization node above each RemoteQuery node
and force all of its results to be fetched onto the
Coordinator first.
This patch also allows UNION, EXCEPT, INTERSECT and other
more complex SELECT statements to run.
It includes a fix for single-step, multi-node LIMIT and OFFSET.
It also includes EXPLAIN output from the Coordinator's
point of view.
These changes introduced a regression: AVG()
currently does not work.
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 950a2f1..aa92917 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -565,6 +565,11 @@ explain_outNode(StringInfo str,
case T_WorkTableScan:
pname = "WorkTable Scan";
break;
+#ifdef PGXC
+ case T_RemoteQuery:
+ pname = "Data Node Scan";
+ break;
+#endif
case T_Material:
pname = "Materialize";
break;
@@ -668,6 +673,9 @@ explain_outNode(StringInfo str,
case T_SeqScan:
case T_BitmapHeapScan:
case T_TidScan:
+#ifdef PGXC
+ case T_RemoteQuery:
+#endif
if (((Scan *) plan)->scanrelid > 0)
{
RangeTblEntry *rte = rt_fetch(((Scan *) plan)->scanrelid,
@@ -686,6 +694,26 @@ explain_outNode(StringInfo str,
appendStringInfo(str, " %s",
quote_identifier(rte->eref->aliasname));
}
+#ifdef PGXC
+ if (IsA(plan, RemoteQuery))
+ {
+ RemoteQuery *remote_query = (RemoteQuery *) plan;
+
+ /* if it is a single-step plan, print out the sql being used */
+ if (remote_query->sql_statement)
+ {
+ char *realsql = NULL;
+ realsql = strcasestr(remote_query->sql_statement, "explain");
+ if (!realsql)
+ realsql = remote_query->sql_statement;
+ else
+ realsql += 8; /* skip "EXPLAIN" */
+
+ appendStringInfo(str, " %s",
+ quote_identifier(realsql));
+ }
+ }
+#endif
break;
case T_BitmapIndexScan:
appendStringInfo(str, " on %s",
@@ -854,6 +882,9 @@ explain_outNode(StringInfo str,
case T_ValuesScan:
case T_CteScan:
case T_WorkTableScan:
+#ifdef PGXC
+ case T_RemoteQuery:
+#endif
show_scan_qual(plan->qual,
"Filter",
((Scan *) plan)->scanrelid,
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 22fd416..c8b1456 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -2925,6 +2925,20 @@ ATRewriteTables(List **wqueue)
}
}
+#ifdef PGXC
+ /*
+ * In PGXC, do not check the FK constraints on the Coordinator, and just return
+ * That is because a SELECT is generated whose plan will try and use
+ * the data nodes. We (currently) do not want to do that on the Coordinator,
+ * when the command is passed down to the data nodes it will
+ * peform the check locally.
+ * This issue was introduced when we added multi-step handling,
+ * it caused foreign key constraints to fail.
+ * PGXCTODO - issue for pg_catalog or any other cases?
+ */
+ if (IS_PGXC_COORDINATOR)
+ return;
+#endif
/*
* Foreign key constraints are checked in a final pass, since (a) it's
* generally best to examine each one separately, and (b) it's at least
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 25f350f..01e2548 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -44,6 +44,9 @@
#include "executor/nodeWindowAgg.h"
#include "executor/nodeWorktablescan.h"
#include "nodes/nodeFuncs.h"
+#ifdef PGXC
+#include "pgxc/execRemote.h"
+#endif
#include "utils/syscache.h"
@@ -183,6 +186,11 @@ ExecReScan(PlanState *node, ExprContext *exprCtxt)
ExecWorkTableScanReScan((WorkTableScanState *) node, exprCtxt);
break;
+#ifdef PGXC
+ case T_RemoteQueryState:
+ ExecRemoteQueryReScan((RemoteQueryState *) node, exprCtxt);
+ break;
+#endif
case T_NestLoopState:
ExecReScanNestLoop((NestLoopState *) node, exprCtxt);
break;
diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c
index 446b400..2cd3298 100644
--- a/src/backend/executor/nodeMaterial.c
+++ b/src/backend/executor/nodeMaterial.c
@@ -24,6 +24,9 @@
#include "executor/executor.h"
#include "executor/nodeMaterial.h"
#include "miscadmin.h"
+#ifdef PGXC
+#include "pgxc/pgxc.h"
+#endif
/* ----------------------------------------------------------------
* ExecMaterial
@@ -56,9 +59,24 @@ ExecMaterial(MaterialState *node)
/*
* If first time through, and we need a tuplestore, initialize it.
*/
+#ifdef PGXC
+ /*
+ * For PGXC, temporarily always create the storage.
+ * This allows us to easily use the same connection to
+ * in multiple steps of the plan.
+ */
+ if ((IS_PGXC_COORDINATOR && tuplestorestate == NULL)
+ || (IS_PGXC_DATANODE && tuplestorestate == NULL && node->eflags != 0))
+#else
if (tuplestorestate == NULL && node->eflags != 0)
+#endif
{
tuplestorestate = tuplestore_begin_heap(true, false, work_mem);
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ /* Note that we will rescan these results */
+ node->eflags |= EXEC_FLAG_REWIND;
+#endif
tuplestore_set_eflags(tuplestorestate, node->eflags);
if (node->eflags & EXEC_FLAG_MARK)
{
@@ -73,6 +91,26 @@ ExecMaterial(MaterialState *node)
Assert(ptrno == 1);
}
node->tuplestorestate = tuplestorestate;
+
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR)
+ {
+ TupleTableSlot *outerslot;
+ PlanState *outerNode = outerPlanState(node);
+
+ /* We want to always materialize first temporarily in PG-XC */
+ while (!node->eof_underlying)
+ {
+ outerslot = ExecProcNode(outerNode);
+ if (TupIsNull(outerslot))
+ node->eof_underlying = true;
+ else
+ /* Append a copy of the returned tuple to tuplestore. */
+ tuplestore_puttupleslot(tuplestorestate, outerslot);
+ }
+ tuplestore_rescan(node->tuplestorestate);
+ }
+#endif
}
/*
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index a1aa660..0e9aa43 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -17,6 +17,7 @@
#include <math.h>
+#include "catalog/pg_namespace.h"
#include "nodes/nodeFuncs.h"
#ifdef OPTIMIZER_DEBUG
#include "nodes/print.h"
@@ -33,7 +34,11 @@
#include "optimizer/var.h"
#include "parser/parse_clause.h"
#include "parser/parsetree.h"
+#ifdef PGXC
+#include "pgxc/pgxc.h"
+#endif
#include "rewrite/rewriteManip.h"
+#include "utils/lsyscache.h"
/* These parameters are set by GUC */
@@ -254,6 +259,18 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
* least one dimension of cost or sortedness.
*/
+#ifdef PGXC
+ /*
+ * If we are on the coordinator, we always want to use
+ * the remote query path unless it is a pg_catalog table.
+ */
+ if (IS_PGXC_COORDINATOR
+ && get_rel_namespace(rte->relid) != PG_CATALOG_NAMESPACE)
+ add_path(rel, create_remotequery_path(root, rel));
+ else
+ {
+#endif
+
/* Consider sequential scan */
add_path(rel, create_seqscan_path(root, rel));
@@ -262,6 +279,9 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
/* Consider TID scans */
create_tidscan_paths(root, rel);
+#ifdef PGXC
+ }
+#endif
/* Now find the cheapest of the paths for this rel */
set_cheapest(rel);
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 6e5c251..337f17b 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -32,6 +32,9 @@
#include "optimizer/var.h"
#include "parser/parse_clause.h"
#include "parser/parsetree.h"
+#ifdef PGXC
+#include "pgxc/planner.h"
+#endif
#include "utils/lsyscache.h"
@@ -66,6 +69,10 @@ static CteScan *create_ctescan_plan(PlannerInfo *root, Path *best_path,
List *tlist, List *scan_clauses);
static WorkTableScan *create_worktablescan_plan(PlannerInfo *root, Path *best_path,
List *tlist, List *scan_clauses);
+#ifdef PGXC
+static RemoteQuery *create_remotequery_plan(PlannerInfo *root, Path *best_path,
+ List *tlist, List *scan_clauses);
+#endif
static NestLoop *create_nestloop_plan(PlannerInfo *root, NestPath *best_path,
Plan *outer_plan, Plan *inner_plan);
static MergeJoin *create_mergejoin_plan(PlannerInfo *root, MergePath *best_path,
@@ -101,6 +108,10 @@ static CteScan *make_ctescan(List *qptlist, List *qpqual,
Index scanrelid, int ctePlanId, int cteParam);
static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual,
Index scanrelid, int wtParam);
+#ifdef PGXC
+static RemoteQuery *make_remotequery(List *qptlist, RangeTblEntry *rte,
+ List *qpqual, Index scanrelid);
+#endif
static BitmapAnd *make_bitmap_and(List *bitmapplans);
static BitmapOr *make_bitmap_or(List *bitmapplans);
static NestLoop *make_nestloop(List *tlist,
@@ -162,6 +173,9 @@ create_plan(PlannerInfo *root, Path *best_path)
case T_ValuesScan:
case T_CteScan:
case T_WorkTableScan:
+#ifdef PGXC
+ case T_RemoteQuery:
+#endif
plan = create_scan_plan(root, best_path);
break;
case T_HashJoin:
@@ -207,6 +221,9 @@ create_scan_plan(PlannerInfo *root, Path *best_path)
List *tlist;
List *scan_clauses;
Plan *plan;
+#ifdef PGXC
+ Plan *matplan;
+#endif
/*
* For table scans, rather than using the relation targetlist (which is
@@ -298,6 +315,23 @@ create_scan_plan(PlannerInfo *root, Path *best_path)
scan_clauses);
break;
+#ifdef PGXC
+ case T_RemoteQuery:
+ plan = (Plan *) create_remotequery_plan(root,
+ best_path,
+ tlist,
+ scan_clauses);
+
+ /*
+ * Insert a materialization plan above this temporarily
+ * until we better handle multiple steps using the same connection.
+ */
+ matplan = (Plan *) make_material(plan);
+ copy_plan_costsize(matplan, plan);
+ matplan->total_cost += cpu_tuple_cost * matplan->plan_rows;
+ plan = matplan;
+ break;
+#endif
default:
elog(ERROR, "unrecognized node type: %d",
(int) best_path->pathtype);
@@ -420,6 +454,9 @@ disuse_physical_tlist(Plan *plan, Path *path)
case T_ValuesScan:
case T_CteScan:
case T_WorkTableScan:
+#ifdef PGXC
+ case T_RemoteQuery:
+#endif
plan->targetlist = build_relation_tlist(path->parent);
break;
default:
@@ -1544,6 +1581,46 @@ create_worktablescan_plan(PlannerInfo *root, Path *best_path,
return scan_plan;
}
+#ifdef PGXC
+/*
+ * create_remotequery_plan
+ * Returns a RemoteQuery scan plan for the base relation scanned by 'best_path'
+ * with targetlist 'tlist'; 'scan_clauses' become the plan node's qual.
+ */
+static RemoteQuery *
+create_remotequery_plan(PlannerInfo *root, Path *best_path,
+ List *tlist, List *scan_clauses)
+{
+ RemoteQuery *scan_plan;
+ Index scan_relid = best_path->parent->relid;
+ RangeTblEntry *rte;
+
+ /* only plain base relations (RTE_RELATION) are handled here */
+ Assert(scan_relid > 0);
+ rte = planner_rt_fetch(scan_relid, root);
+ Assert(best_path->parent->rtekind == RTE_RELATION);
+ Assert(rte->rtekind == RTE_RELATION);
+
+ /* Sort clauses into best execution order */
+ scan_clauses = order_qual_clauses(root, scan_clauses);
+
+ /* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */
+ scan_clauses = extract_actual_clauses(scan_clauses, false);
+
+ scan_plan = make_remotequery(tlist,
+ rte,
+ scan_clauses,
+ scan_relid);
+
+ copy_path_costsize(&scan_plan->scan.plan, best_path);
+
+ /* PGXCTODO - get better estimates; this placeholder replaces the row count */
+ scan_plan->scan.plan.plan_rows = 1000;
+
+ return scan_plan;
+}
+#endif /* PGXC */
+
/*****************************************************************************
*
@@ -2615,6 +2692,28 @@ make_worktablescan(List *qptlist,
return node;
}
+#ifdef PGXC
+static RemoteQuery *
+make_remotequery(List *qptlist,
+ RangeTblEntry *rte,
+ List *qpqual,
+ Index scanrelid)
+{
+ RemoteQuery *node = makeNode(RemoteQuery);
+ Plan *plan = &node->scan.plan;
+ /* make_remotequery: build a read-only RemoteQuery leaf scan node */
+ /* 'rte' is currently unused; cost should be inserted by caller */
+ plan->targetlist = qptlist;
+ plan->qual = qpqual;
+ plan->lefttree = NULL;
+ plan->righttree = NULL;
+ node->scan.scanrelid = scanrelid;
+ node->read_only = true;
+
+ return node;
+}
+#endif /* PGXC */
+
Append *
make_append(List *appendplans, bool isTarget, List *tlist)
{
diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c
index d839c28..cab7fb4 100644
--- a/src/backend/optimizer/plan/setrefs.c
+++ b/src/backend/optimizer/plan/setrefs.c
@@ -22,6 +22,9 @@
#include "optimizer/clauses.h"
#include "optimizer/planmain.h"
#include "optimizer/tlist.h"
+#ifdef PGXC
+#include "pgxc/planner.h"
+#endif
#include "parser/parsetree.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
@@ -373,6 +376,19 @@ set_plan_refs(PlannerGlobal *glob, Plan *plan, int rtoffset)
fix_scan_list(glob, splan->scan.plan.qual, rtoffset);
}
break;
+#ifdef PGXC
+ case T_RemoteQuery:
+ {
+ RemoteQuery *splan = (RemoteQuery *) plan;
+
+ splan->scan.scanrelid += rtoffset;
+ splan->scan.plan.targetlist =
+ fix_scan_list(glob, splan->scan.plan.targetlist, rtoffset);
+ splan->scan.plan.qual =
+ fix_scan_list(glob, splan->scan.plan.qual, rtoffset);
+ }
+ break;
+#endif
case T_NestLoop:
case T_MergeJoin:
case T_HashJoin:
diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c
index bde351d..e5c6dac 100644
--- a/src/backend/optimizer/plan/subselect.c
+++ b/src/backend/optimizer/plan/subselect.c
@@ -1963,6 +1963,12 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params)
bms_add_member(context.paramids,
((WorkTableScan *) plan)->wtParam);
break;
+#ifdef PGXC
+ case T_RemoteQuery:
+ //PGXCTODO
+ context.paramids = bms_add_members(context.paramids, valid_params);
+ break;
+#endif
case T_Append:
{
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index b7c3d3c..cbf7618 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -1310,6 +1310,28 @@ create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel)
return pathnode;
}
+#ifdef PGXC
+/*
+ * create_remotequery_path
+ * Creates a RemoteQuery path for scanning base relation 'rel' remotely,
+ * returning the pathnode. Costs are borrowed from cost_seqscan for now.
+ */
+Path *
+create_remotequery_path(PlannerInfo *root, RelOptInfo *rel)
+{
+ Path *pathnode = makeNode(Path);
+
+ pathnode->pathtype = T_RemoteQuery;
+ pathnode->parent = rel;
+ pathnode->pathkeys = NIL; /* result is always unordered */
+
+ /* PGXCTODO - set cost properly; costed like a local seqscan for now */
+ cost_seqscan(pathnode, root, rel);
+
+ return pathnode;
+}
+#endif /* PGXC */
+
/*
* create_nestloop_path
* Creates a pathnode corresponding to a nestloop join between two
diff --git a/src/backend/pgxc/plan/planner.c b/src/backend/pgxc/plan/planner.c
index 1dcfc29..c8911b7 100644
--- a/src/backend/pgxc/plan/planner.c
+++ b/src/backend/pgxc/plan/planner.c
@@ -25,6 +25,7 @@
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "optimizer/clauses.h"
+#include "optimizer/planmain.h"
#include "optimizer/planner.h"
#include "optimizer/tlist.h"
#include "parser/parse_agg.h"
@@ -141,7 +142,7 @@ bool StrictSelectChecking = false;
static Exec_Nodes *get_plan_nodes(Query *query, bool isRead);
static bool get_plan_nodes_walker(Node *query_node, XCWalkerContext *context);
static bool examine_conditions_walker(Node *expr_node, XCWalkerContext *context);
-
+static int handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStmt *plan_stmt);
/*
* True if both lists contain only one node and are the same
@@ -1528,16 +1529,6 @@ get_simple_aggregates(Query * query)
simple_agg_list = lappend(simple_agg_list, simple_agg);
}
- else
- {
- /*
- * PGXCTODO relax this limit after adding GROUP BY support
- * then support expressions of aggregates
- */
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("Query is not yet supported"))));
- }
column_pos++;
}
}
@@ -1629,7 +1620,7 @@ reconstruct_step_query(List *rtable, bool has_order_by, List *extra_sort,
{
List *context;
bool useprefix;
- List *sub_tlist = step->plan.targetlist;
+ List *sub_tlist = step->scan.plan.targetlist;
ListCell *l;
StringInfo buf = makeStringInfo();
char *sql;
@@ -1737,7 +1728,7 @@ make_simple_sort_from_sortclauses(Query *query, RemoteQuery *step)
{
List *sortcls = query->sortClause;
List *distinctcls = query->distinctClause;
- List *sub_tlist = step->plan.targetlist;
+ List *sub_tlist = step->scan.plan.targetlist;
SimpleSort *sort;
SimpleDistinct *distinct;
ListCell *l;
@@ -1978,6 +1969,100 @@ make_simple_sort_from_sortclauses(Query *query, RemoteQuery *step)
}
/*
+ * Special case optimization: handle LIMIT and OFFSET for single-step queries
+ * on multiple nodes.  OFFSET is folded into the LIMIT sent to the data nodes,
+ * and a Limit node on top of the plan applies the real clause.
+ * Return non-zero if we need to fall back to the standard plan.
+ */
+static int
+handle_limit_offset(RemoteQuery *query_step, Query *query, PlannedStmt *plan_stmt)
+{
+	/* single node, or neither LIMIT nor OFFSET: no special handling needed */
+	if ((query_step && query_step->exec_nodes &&
+		 list_length(query_step->exec_nodes->nodelist) <= 1) ||
+		(!query->limitCount && !query->limitOffset))
+		return 0;
+
+	/* if order by and limit are present, do not optimize yet */
+	if (query->sortClause)
+		return 1;
+
+	/*
+	 * Note that query_step->is_single_step is set to true, but
+	 * it is ok even if we add limit here.
+	 * If OFFSET is set, we strip the final offset value and add
+	 * it to the LIMIT passed down. If there is an OFFSET and no
+	 * LIMIT, we just strip off OFFSET.  Note that any text after
+	 * the stripped clause (e.g. FOR UPDATE) is dropped too.
+	 */
+	if (query->limitOffset)
+	{
+		int64		newLimit = 0;
+		char	   *sql = query_step->sql_statement;
+		char	   *pos = NULL;
+		char	   *newpos = NULL;
+		char	   *limitpos = NULL;	/* stays NULL when there is no LIMIT */
+		char	   *newQuery;
+		char	   *newchar;
+		char	   *c;
+
+		/* only non-NULL constants can be folded (not LIMIT ALL, not $n) */
+		if (!IsA(query->limitOffset, Const) ||
+			((Const *) query->limitOffset)->constisnull)
+			return 1;
+		if (query->limitCount)
+		{
+			if (!IsA(query->limitCount, Const) ||
+				((Const *) query->limitCount)->constisnull)
+				return 1;
+			newLimit = DatumGetInt64(((Const *) query->limitCount)->constvalue);
+			/* find the last occurrence of LIMIT */
+			for (pos = sql, newpos = pos; newpos != NULL; )
+			{
+				pos = newpos;
+				newpos = strcasestr(pos + 1, "LIMIT");
+			}
+			limitpos = pos;
+		}
+
+		/* find the last occurrence of OFFSET */
+		for (pos = sql, newpos = pos; newpos != NULL; )
+		{
+			pos = newpos;
+			newpos = strcasestr(pos + 1, "OFFSET");
+		}
+		/* strip from whichever clause appears first */
+		if (limitpos && limitpos < pos)
+			pos = limitpos;
+		newLimit += DatumGetInt64(((Const *) query->limitOffset)->constvalue);
+
+		/* clause not found in the SQL text: let the standard plan handle it */
+		if (!pos || pos == sql)
+			return 1;
+		/* room for the kept prefix, "LIMIT ", a 64-bit integer and NUL */
+		newQuery = (char *) palloc((pos - sql) + 32);
+		newchar = newQuery;
+
+		/* copy up until position where we found clause */
+		for (c = sql; c != pos && *c != '\0'; *newchar++ = *c++);
+		if (query->limitCount)
+			sprintf(newchar, "LIMIT " INT64_FORMAT, newLimit);
+		else
+			*newchar = '\0';
+
+		pfree(query_step->sql_statement);
+		query_step->sql_statement = newQuery;
+	}
+
+	/* Now add a limit execution node at the top of the plan */
+	plan_stmt->planTree = (Plan *) make_limit(plan_stmt->planTree,
+							query->limitOffset, query->limitCount, 0, 0);
+
+	return 0;
+}
+
+
+/*
* Build up a QueryPlan to execute on.
*
* For the prototype, there will only be one step,
@@ -1997,6 +2082,7 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
Plan *standardPlan = result->planTree;
RemoteQuery *query_step = makeNode(RemoteQuery);
+ query_step->is_single_step = false;
query_step->sql_statement = pstrdup(query->sql_statement);
query_step->exec_nodes = NULL;
query_step->combine_type = COMBINE_TYPE_NONE;
@@ -2020,21 +2106,6 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
ereport(ERROR,
(errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
(errmsg("INTO clause not yet supported"))));
-
- if (query->setOperations)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("UNION, INTERSECT and EXCEPT are not yet supported"))));
-
- if (query->hasRecursive)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("WITH RECURSIVE not yet supported"))));
-
- if (query->hasWindowFuncs)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("Window functions not yet supported"))));
/* fallthru */
case T_InsertStmt:
case T_UpdateStmt:
@@ -2043,14 +2114,32 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
if (query_step->exec_nodes == NULL)
{
+ /* Do not yet allow multi-node correlated UPDATE or DELETE */
+ if ((query->nodeTag == T_UpdateStmt || query->nodeTag == T_DeleteStmt))
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
+ (errmsg("Complex and correlated UPDATE and DELETE not yet supported"))));
+ }
+
/*
- * Processing guery against catalog tables, restore
- * standard plan
+ * Processing guery against catalog tables, or multi-step command.
+ * Restore standard plan
*/
result->planTree = standardPlan;
return result;
}
+ /* Do not yet allow multi-node correlated UPDATE or DELETE */
+ if ((query->nodeTag == T_UpdateStmt || query->nodeTag == T_DeleteStmt)
+ && !query_step->exec_nodes
+ && list_length(query->rtable) > 1)
+ {
+ result->planTree = standardPlan;
+ return result;
+ }
+
+ query_step->is_single_step = true;
/*
* PGXCTODO
* When Postgres runs insert into t (a) values (1); against table
@@ -2064,7 +2153,7 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
* then call standard planner and take targetList from the plan
* generated by Postgres.
*/
- query_step->plan.targetlist = standardPlan->targetlist;
+ query_step->scan.plan.targetlist = standardPlan->targetlist;
if (query_step->exec_nodes)
query_step->combine_type = get_plan_combine_type(
@@ -2075,39 +2164,36 @@ pgxc_planner(Query *query, int cursorOptions, ParamListInfo boundParams)
query_step->simple_aggregates = get_simple_aggregates(query);
/*
- * Add sortring to the step
+ * Add sorting to the step
*/
if (list_length(query_step->exec_nodes->nodelist) > 1 &&
(query->sortClause || query->distinctClause))
make_simple_sort_from_sortclauses(query, query_step);
- /*
- * PG-XC cannot yet support some variations of SQL statements.
- * We perform some checks to at least catch common cases
- */
+ /* Handle LIMIT and OFFSET for single-step queries on multiple nodes*/
+ if (handle_limit_offset(query_step, query, result))
+ {
+ /* complicated expressions, just fallback to standard plan */
+ result->planTree = standardPlan;
+ return result;
+ }
+ /*
+ * Use standard plan if we have more than one data node with either
+ * group by, hasWindowFuncs, or hasRecursive
+ */
/*
- * Check if we have multiple nodes and an unsupported clause. This
- * is temporary until we expand supported SQL
+ * PGXCTODO - this could be improved to check if the first
+ * group by expression is the partitioning column, in which
+ * case it is ok to treat as a single step.
*/
- if (query->nodeTag == T_SelectStmt)
+ if (query->nodeTag == T_SelectStmt
+ && query_step->exec_nodes
+ && list_length(query_step->exec_nodes->nodelist) > 1
+ && (query->groupClause || query->hasWindowFuncs || query->hasRecursive))
{
- if (StrictStatementChecking && query_step->exec_nodes
- && list_length(query_step->exec_nodes->nodelist) > 1)
- {
- /*
- * PGXCTODO - this could be improved to check if the first
- * group by expression is the partitioning column
- */
- if (query->groupClause)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("Multi-node GROUP BY not yet supported"))));
- if (query->limitCount && StrictSelectChecking)
- ereport(ERROR,
- (errcode(ERRCODE_STATEMENT_TOO_COMPLEX),
- (errmsg("Multi-node LIMIT not yet supported"))));
- }
+ result->planTree = standardPlan;
+ return result;
}
break;
default:
diff --git a/src/backend/pgxc/pool/Makefile b/src/backend/pgxc/pool/Makefile
index e875303..c7e950a 100644
--- a/src/backend/pgxc/pool/Makefile
+++ b/src/backend/pgxc/pool/Makefile
@@ -14,6 +14,6 @@ subdir = src/backend/pgxc/pool
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-OBJS = datanode.o execRemote.o poolmgr.o poolcomm.o
+OBJS = datanode.o execRemote.o poolmgr.o poolcomm.o postgresql_fdw.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/pgxc/pool/execRemote.c b/src/backend/pgxc/pool/execRemote.c
index 0f16c51..43569e0 100644
--- a/src/backend/pgxc/pool/execRemote.c
+++ b/src/backend/pgxc/pool/execRemote.c
@@ -30,6 +30,8 @@
#include "utils/tuplesort.h"
#include "utils/snapmgr.h"
+extern char *deparseSql(RemoteQueryState *scanstate);
+
/*
* Buffer size does not affect performance significantly, just do not allow
* connection buffer grows infinitely
@@ -1461,8 +1463,8 @@ DataNodeCopyBegin(const char *query, List *nodelist, Snapshot snapshot, bool is_
{
if (need_tran)
DataNodeCopyFinish(connections, 0, COMBINE_TYPE_NONE);
- else
- if (!PersistentConnections) release_handles();
+ else if (!PersistentConnections)
+ release_handles();
}
pfree(connections);
@@ -1812,21 +1814,44 @@ ExecCountSlotsRemoteQuery(RemoteQuery *node)
RemoteQueryState *
ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags)
{
- RemoteQueryState *remotestate;
+ RemoteQueryState *remotestate;
+ Relation currentRelation;
+
remotestate = CreateResponseCombiner(0, node->combine_type);
remotestate->ss.ps.plan = (Plan *) node;
remotestate->ss.ps.state = estate;
remotestate->simple_aggregates = node->simple_aggregates;
+ remotestate->ss.ps.qual = (List *)
+ ExecInitExpr((Expr *) node->scan.plan.qual,
+ (PlanState *) remotestate);
+
ExecInitResultTupleSlot(estate, &remotestate->ss.ps);
- if (node->plan.targetlist)
+ if (node->scan.plan.targetlist)
{
- TupleDesc typeInfo = ExecCleanTypeFromTL(node->plan.targetlist, false);
+ TupleDesc typeInfo = ExecCleanTypeFromTL(node->scan.plan.targetlist, false);
ExecSetSlotDescriptor(remotestate->ss.ps.ps_ResultTupleSlot, typeInfo);
}
ExecInitScanTupleSlot(estate, &remotestate->ss);
+
+ /*
+ * Initialize scan relation. get the relation object id from the
+ * relid'th entry in the range table, open that relation and acquire
+ * appropriate lock on it.
+ * This is needed for deparseSQL
+ * We should remove these lines once we plan and deparse earlier.
+ */
+ if (!node->is_single_step)
+ {
+ currentRelation = ExecOpenScanRelation(estate, node->scan.scanrelid);
+ remotestate->ss.ss_currentRelation = currentRelation;
+ ExecAssignScanType(&remotestate->ss, RelationGetDescr(currentRelation));
+ }
+
+ remotestate->ss.ps.ps_TupFromTlist = false;
+
/*
* Tuple description for the scan slot will be set on runtime from
* a RowDescription message
@@ -1991,7 +2016,6 @@ TupleTableSlot *
ExecRemoteQuery(RemoteQueryState *node)
{
RemoteQuery *step = (RemoteQuery *) node->ss.ps.plan;
- EState *estate = node->ss.ps.state;
TupleTableSlot *resultslot = node->ss.ps.ps_ResultTupleSlot;
TupleTableSlot *scanslot = node->ss.ss_ScanTupleSlot;
bool have_tuple = false;
@@ -2092,6 +2116,11 @@ ExecRemoteQuery(RemoteQueryState *node)
data_node_begin(new_count, new_connections, gxid);
}
+ /* Get the SQL string */
+ /* only do if not single step */
+ if (!step->is_single_step)
+ step->sql_statement = deparseSql(node);
+
/* See if we have a primary nodes, execute on it first before the others */
if (primaryconnection)
{
@@ -2427,12 +2456,35 @@ ExecEndRemoteQuery(RemoteQueryState *node)
if (outerPlanState(node))
ExecEndNode(outerPlanState(node));
+ if (node->ss.ss_currentRelation)
+ ExecCloseScanRelation(node->ss.ss_currentRelation);
+
if (node->tmp_ctx)
MemoryContextDelete(node->tmp_ctx);
CloseCombiner(node);
}
+
+/* ----------------------------------------------------------------
+ * ExecRemoteQueryReScan
+ *
+ * Rescan hook for a RemoteQuery node; currently does nothing.
+ * ----------------------------------------------------------------
+ */
+void
+ExecRemoteQueryReScan(RemoteQueryState *node, ExprContext *exprCtxt)
+{
+ /*
+ * Results for multi-step queries are materialized (a Material node above
+ * each RemoteQuery fetches all rows first), so no rescan is done here.
+ * NOTE(review): nothing is re-sent to the data nodes, so a rescan after
+ * parameters change may not see fresh rows -- confirm.
+ * PGXCTODO - rerun Init / re-execute the remote query when needed.
+ */
+}
+
+
/*
* Execute utility statement on multiple data nodes
* It does approximately the same as
diff --git a/src/backend/pgxc/pool/postgresql_fdw.c b/src/backend/pgxc/pool/postgresql_fdw.c
new file mode 100644
index 0000000..9e418be
--- /dev/null
+++ b/src/backend/pgxc/pool/postgresql_fdw.c
@@ -0,0 +1,335 @@
+/*-------------------------------------------------------------------------
+ *
+ * postgresql_fdw.c
+ * foreign-data wrapper for PostgreSQL
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "catalog/pg_operator.h"
+#include "catalog/pg_proc.h"
+#include "funcapi.h"
+//#include "libpq-fe.h"
+#include "mb/pg_wchar.h"
+#include "miscadmin.h"
+#include "nodes/nodeFuncs.h"
+#include "nodes/makefuncs.h"
+#include "optimizer/clauses.h"
+#include "parser/scansup.h"
+#include "pgxc/execRemote.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+
+//#include "dblink.h"
+
+/*
+ * Enables extra DEBUG1 logging in this file; e.g. is_immutable_func()
+ * reports the name and immutability of each function it examines.
+ */
+#define DEBUG_FDW
+
+/*
+ * WHERE clause optimization level: controls how the quals of a RemoteQuery
+ * scan are split between the data node ("foreign") and the Coordinator
+ * ("local").  These must remain preprocessor macros because deparseSql()
+ * tests them with #if.
+ */
+#define EVAL_QUAL_LOCAL 0 /* evaluate none in foreign, all in local */
+#define EVAL_QUAL_BOTH 1 /* evaluate some in foreign, all in local */
+#define EVAL_QUAL_FOREIGN 2 /* evaluate some in foreign, rest in local */
+
+/*
+ * Level used by deparseSql().  With EVAL_QUAL_FOREIGN, quals shipped to the
+ * data node are removed from the node's local qual list.
+ */
+#define OPTIMIZE_WHERE_CLAUSE EVAL_QUAL_FOREIGN
+
+
+
+/* Forward declarations for deparsing SQL from the RemoteQuery node */
+static bool is_immutable_func(Oid funcid);
+static bool is_foreign_qual(ExprState *state);
+static bool foreign_qual_walker(Node *node, void *context);
+/*
+ * Not static: called from ExecRemoteQuery() in execRemote.c.
+ * NOTE(review): this patch adds no deparseSql prototype to execRemote.h;
+ * confirm the caller sees a declaration, otherwise the char * result would
+ * be implicitly declared as int there.
+ */
+char *deparseSql(RemoteQueryState *scanstate);
+
+
+/*
+ * is_immutable_func
+ *     Return true if the function with OID "funcid" is marked IMMUTABLE,
+ *     i.e. its pg_proc.provolatile is PROVOLATILE_IMMUTABLE.
+ *
+ * Raises ERROR ("cache lookup failed") if funcid has no pg_proc entry.
+ */
+static bool
+is_immutable_func(Oid funcid)
+{
+    HeapTuple   tp;
+    Datum       datum;
+    bool        isnull;
+    char        provolatile;
+
+    tp = SearchSysCache(PROCOID, ObjectIdGetDatum(funcid), 0, 0, 0);
+    if (!HeapTupleIsValid(tp))
+        elog(ERROR, "cache lookup failed for function %u", funcid);
+
+    /* Fetch the volatility first so the debug output below can report it. */
+    datum = SysCacheGetAttr(PROCOID, tp, Anum_pg_proc_provolatile, &isnull);
+    provolatile = DatumGetChar(datum);
+
+#ifdef DEBUG_FDW
+    /*
+     * Print function name and its immutability.  The name is read directly
+     * from the cached tuple, which stays valid until ReleaseSysCache().
+     */
+    datum = SysCacheGetAttr(PROCOID, tp, Anum_pg_proc_proname, &isnull);
+    elog(DEBUG1, "func %s(%u) is%s immutable",
+         NameStr(*DatumGetName(datum)), funcid,
+         (provolatile == PROVOLATILE_IMMUTABLE) ? "" : " not");
+#endif
+
+    ReleaseSysCache(tp);
+
+    return (provolatile == PROVOLATILE_IMMUTABLE);
+}
+
+/*
+ * is_foreign_qual
+ *     Decide whether one qual (an ExprState from the node's qual list) may
+ *     be shipped to the data node as part of the remote WHERE clause.
+ *
+ * The intent is that an expression built only from the following can be
+ * evaluated on the foreign server:
+ *   - constant values
+ *   - variables (columns of the remote table)
+ *   - external parameters (parameters of a prepared statement)
+ *   - arrays
+ *   - boolean expressions (AND/OR/NOT)
+ *   - NULL tests (IS [NOT] NULL)
+ *   - operators and functions, IMMUTABLE only; the operator or function
+ *     must have the same meaning on the foreign server as locally
+ *   - scalar array operators (ANY/ALL), under the same restriction
+ *
+ * The node-by-node decision is made by foreign_qual_walker(), which returns
+ * true as soon as it meets something that has to be evaluated locally.
+ */
+static bool
+is_foreign_qual(ExprState *state)
+{
+    Node       *expr = (Node *) state->expr;
+    bool        must_stay_local = foreign_qual_walker(expr, NULL);
+
+    return !must_stay_local;
+}
+
+/*
+ * foreign_qual_walker
+ *     expression_tree_walker() callback used by is_foreign_qual().
+ *
+ * Returns true if the expression rooted at "node" cannot be evaluated on the
+ * foreign server (returning true also stops the walk), false otherwise.
+ * "context" is passed through unchanged to the recursive walk.
+ */
+static bool
+foreign_qual_walker(Node *node, void *context)
+{
+    if (node == NULL)
+        return false;
+
+    switch (nodeTag(node))
+    {
+        case T_Param:
+            /* TODO: pass internal parameters to the foreign server */
+            if (((Param *) node)->paramkind != PARAM_EXTERN)
+                return true;
+            break;
+        case T_DistinctExpr:
+        case T_NullIfExpr:
+        case T_OpExpr:
+            /*
+             * An operator whose underlying function is IMMUTABLE can be
+             * evaluated on the foreign server.  It is not necessary to worry
+             * about oprrest and oprjoin here because they are invoked by the
+             * planner, not the executor.  DistinctExpr and NullIfExpr are
+             * typedefs of OpExpr, so the cast below is valid for all three.
+             */
+            if (!is_immutable_func(((OpExpr *) node)->opfuncid))
+                return true;
+            break;
+        case T_ScalarArrayOpExpr:
+            if (!is_immutable_func(((ScalarArrayOpExpr *) node)->opfuncid))
+                return true;
+            break;
+        case T_FuncExpr:
+            /* IMMUTABLE function can be evaluated in foreign server */
+            if (!is_immutable_func(((FuncExpr *) node)->funcid))
+                return true;
+            break;
+        case T_SubLink:
+        case T_SubPlan:
+        case T_AlternativeSubPlan:
+            /*
+             * Subqueries are planned and run on the Coordinator.  Their
+             * deparsed form (e.g. "(SubPlan 1)") is not valid SQL for the
+             * data node, so any qual containing one must stay local.
+             */
+            return true;
+        case T_TargetEntry:
+        case T_PlaceHolderVar:
+        case T_AppendRelInfo:
+        case T_PlaceHolderInfo:
+            /* TODO: research whether those complex nodes are evaluable. */
+            return true;
+        default:
+            break;
+    }
+
+    return expression_tree_walker(node, foreign_qual_walker, context);
+}
+
+/*
+ * deparseSql
+ *     Build the SQL statement sent to the data nodes for a RemoteQuery scan.
+ *
+ * The result has the form
+ *     SELECT col, ... FROM schema.relation alias [ WHERE quals ]
+ * where the target list holds every non-dropped column of the scanned
+ * relation and the WHERE clause holds those quals of the node that satisfy
+ * is_foreign_qual().  Schema, relation, alias and column names are passed
+ * through quote_identifier().
+ *
+ * When OPTIMIZE_WHERE_CLAUSE is EVAL_QUAL_FOREIGN, the shipped quals are
+ * removed from scanstate->ss.ps.qual to avoid evaluating them a second time
+ * in ExecScan().  Plan.qual itself is never changed, so multiple use of the
+ * plan with PREPARE/EXECUTE works properly.
+ *
+ * NOTE(review): since ps.qual is narrowed in place, a second call for the
+ * same scanstate would no longer see the shipped quals and would omit them
+ * from the generated WHERE clause -- confirm callers call this once per node.
+ *
+ * Returns a palloc'd string.
+ */
+char *
+deparseSql(RemoteQueryState *scanstate)
+{
+    EState         *estate = scanstate->ss.ps.state;
+    RemoteQuery    *scan = (RemoteQuery *) scanstate->ss.ps.plan;
+    RangeTblEntry  *rte;
+    List           *context;
+    StringInfoData  sql;
+    bool            prefix;
+    Oid             nspid;
+    char           *nspname;
+    char           *relname;
+    const char     *nspname_q;
+    const char     *relname_q;
+    const char     *aliasname_q;
+    TupleDesc       tupdesc;
+    bool            first;
+    int             i;
+
+    elog(DEBUG2, "%s(%d) called", __FUNCTION__, __LINE__);
+
+    /* extract the RangeTblEntry of the relation scanned by this node */
+    rte = list_nth(estate->es_range_table, scan->scan.scanrelid - 1);
+
+    /* prepare to deparse plan */
+    initStringInfo(&sql);
+    context = deparse_context_for_plan((Node *) scan, NULL,
+                                       estate->es_range_table, NULL);
+
+    /*
+     * Scanning multiple relations in a RemoteQuery node is not supported, so
+     * column references are never qualified with the relation alias.
+     */
+    prefix = false;
+
+    /* Get quoted names of schema, table and alias */
+    nspid = get_rel_namespace(rte->relid);
+    nspname = get_namespace_name(nspid);
+    relname = get_rel_name(rte->relid);
+    nspname_q = quote_identifier(nspname);
+    relname_q = quote_identifier(relname);
+    aliasname_q = quote_identifier(rte->eref->aliasname);
+
+    /* deparse SELECT clause */
+    appendStringInfoString(&sql, "SELECT ");
+
+    /*
+     * TODO: omit (deparse to "NULL") columns which are not used in the
+     * original SQL.
+     *
+     * We must parse nodes parents of this RemoteQuery node to determine unused
+     * columns because some columns may be used only in parent Sort/Agg/Limit
+     * nodes.
+     */
+    tupdesc = scanstate->ss.ss_currentRelation->rd_att;
+    first = true;
+    for (i = 0; i < tupdesc->natts; i++)
+    {
+        Form_pg_attribute attr = tupdesc->attrs[i];
+        const char *attname_q;
+
+        /* skip dropped attributes */
+        if (attr->attisdropped)
+            continue;
+
+        /*
+         * Quote the column name: an unquoted mixed-case or reserved-word
+         * name would be case-folded or rejected by the data node.
+         */
+        attname_q = quote_identifier(NameStr(attr->attname));
+
+        if (!first)
+            appendStringInfoString(&sql, ", ");
+        if (prefix)
+            appendStringInfo(&sql, "%s.%s", aliasname_q, attname_q);
+        else
+            appendStringInfoString(&sql, attname_q);
+        first = false;
+
+        if (attname_q != NameStr(attr->attname))
+            pfree((char *) attname_q);
+    }
+
+    /* if no column was emitted, select a dummy column so the SQL is valid */
+    if (first)
+        appendStringInfoString(&sql, "NULL");
+
+    /*
+     * deparse FROM clause
+     *
+     * XXX: should use GENERIC OPTIONS like 'foreign_relname' or something for
+     * the foreign table name instead of the local name ?
+     */
+    appendStringInfo(&sql, " FROM %s.%s %s", nspname_q, relname_q, aliasname_q);
+
+    /*
+     * quote_identifier() returns its argument itself when no quoting is
+     * needed, so free only the copies that are separate allocations.  The
+     * comparisons are done before the original strings are freed.
+     */
+    if (nspname_q != nspname)
+        pfree((char *) nspname_q);
+    if (relname_q != relname)
+        pfree((char *) relname_q);
+    if (aliasname_q != rte->eref->aliasname)
+        pfree((char *) aliasname_q);
+    pfree(nspname);
+    pfree(relname);
+
+    /*
+     * deparse WHERE clause
+     *
+     * The expressions which satisfy is_foreign_qual() are deparsed into the
+     * WHERE clause of the result SQL string, and they could be removed from
+     * the qual of the PlanState to avoid duplicate evaluation at ExecScan().
+     */
+#if OPTIMIZE_WHERE_CLAUSE > EVAL_QUAL_LOCAL
+    if (scanstate->ss.ps.plan->qual)
+    {
+        List       *local_qual = NIL;
+        List       *foreign_expr = NIL;
+        ListCell   *lc;
+
+        /*
+         * Divide the qual of the PlanState into two lists: ExprStates for
+         * local evaluation, and bare expressions for foreign evaluation.
+         */
+        foreach(lc, scanstate->ss.ps.qual)
+        {
+            ExprState  *state = lfirst(lc);
+
+            if (is_foreign_qual(state))
+            {
+#ifdef DEBUG_FDW
+                elog(DEBUG1, "foreign qual: %s", nodeToString(state->expr));
+#endif
+                foreign_expr = lappend(foreign_expr, state->expr);
+            }
+            else
+            {
+#ifdef DEBUG_FDW
+                elog(DEBUG1, "local qual: %s", nodeToString(state->expr));
+#endif
+                local_qual = lappend(local_qual, state);
+            }
+        }
+#if OPTIMIZE_WHERE_CLAUSE == EVAL_QUAL_FOREIGN
+        /*
+         * If the optimization level is EVAL_QUAL_FOREIGN, replace the original
+         * qual with the list of ExprStates which should be evaluated in the
+         * local server.
+         */
+        scanstate->ss.ps.qual = local_qual;
+#endif
+
+        /*
+         * Deparse quals to be evaluated in the foreign server if any.
+         * TODO: modify deparse_expression() to deparse conditions which use
+         * internal parameters.
+         */
+        if (foreign_expr != NIL)
+        {
+            Node       *node = (Node *) make_ands_explicit(foreign_expr);
+
+            appendStringInfoString(&sql, " WHERE ");
+
+            /*
+             * Append the deparsed text verbatim: it must never be used as a
+             * format string, since it can contain '%' (e.g. a LIKE pattern).
+             */
+            appendStringInfoString(&sql,
+                                   deparse_expression(node, context, prefix, false));
+
+            /*
+             * Free only the list cells: the expressions themselves are shared
+             * with Plan.qual and MUST NOT be freed.
+             */
+            list_free(foreign_expr);
+        }
+    }
+#endif
+
+    elog(DEBUG1, "deparsed SQL is \"%s\"", sql.data);
+
+    return sql.data;
+}
+
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 608755f..a6f4767 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -198,8 +198,6 @@ static void log_disconnections(int code, Datum arg);
#ifdef PGXC /* PGXC_DATANODE */
-static void pgxc_transaction_stmt (Node *parsetree);
-
/* ----------------------------------------------------------------
* PG-XC routines
* ----------------------------------------------------------------
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index d1c01da..0c6208c 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -306,7 +306,6 @@ ProcessUtility(Node *parsetree,
case TRANS_STMT_START:
{
ListCell *lc;
-
#ifdef PGXC
if (IS_PGXC_COORDINATOR)
DataNodeBegin();
@@ -329,10 +328,6 @@ ProcessUtility(Node *parsetree,
break;
case TRANS_STMT_COMMIT:
-#ifdef PGXC
- if (IS_PGXC_COORDINATOR)
- DataNodeCommit();
-#endif
if (!EndTransactionBlock())
{
/* report unsuccessful commit in completionTag */
@@ -361,10 +356,6 @@ ProcessUtility(Node *parsetree,
break;
case TRANS_STMT_ROLLBACK:
-#ifdef PGXC
- if (IS_PGXC_COORDINATOR)
- DataNodeBegin();
-#endif
UserAbortTransactionBlock();
break;
@@ -1055,21 +1046,16 @@ ProcessUtility(Node *parsetree,
case T_ExplainStmt:
ExplainQuery((ExplainStmt *) parsetree, queryString, params, dest);
-#ifdef PGXC
- if (IS_PGXC_COORDINATOR)
- {
- Exec_Nodes *nodes = (Exec_Nodes *) palloc0(sizeof(Exec_Nodes));
- nodes->nodelist = GetAnyDataNode();
- ExecUtilityStmtOnNodes(queryString, nodes, false);
- }
-#endif
break;
case T_VariableSetStmt:
ExecSetVariableStmt((VariableSetStmt *) parsetree);
#ifdef PGXC
+/* PGXCTODO - this currently causes an assertion failure.
+ We should change when we add SET handling properly
if (IS_PGXC_COORDINATOR)
ExecUtilityStmtOnNodes(queryString, NULL, false);
+*/
#endif
break;
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 11be226..29aed38 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -79,6 +79,9 @@ extern void cost_functionscan(Path *path, PlannerInfo *root,
RelOptInfo *baserel);
extern void cost_valuesscan(Path *path, PlannerInfo *root,
RelOptInfo *baserel);
+#ifdef PGXC
+extern void cost_remotequery(Path *path, PlannerInfo *root, RelOptInfo *baserel);
+#endif
extern void cost_ctescan(Path *path, PlannerInfo *root, RelOptInfo *baserel);
extern void cost_recursive_union(Plan *runion, Plan *nrterm, Plan *rterm);
extern void cost_sort(Path *path, PlannerInfo *root,
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index 0f4c52e..05efcaf 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -56,6 +56,9 @@ extern Path *create_functionscan_path(PlannerInfo *root, RelOptInfo *rel);
extern Path *create_valuesscan_path(PlannerInfo *root, RelOptInfo *rel);
extern Path *create_ctescan_path(PlannerInfo *root, RelOptInfo *rel);
extern Path *create_worktablescan_path(PlannerInfo *root, RelOptInfo *rel);
+#ifdef PGXC
+extern Path *create_remotequery_path(PlannerInfo *root, RelOptInfo *rel);
+#endif
extern NestPath *create_nestloop_path(PlannerInfo *root,
RelOptInfo *joinrel,
diff --git a/src/include/pgxc/execRemote.h b/src/include/pgxc/execRemote.h
index b7faa7d..143c8fa 100644
--- a/src/include/pgxc/execRemote.h
+++ b/src/include/pgxc/execRemote.h
@@ -95,6 +95,7 @@ extern void ExecRemoteUtility(RemoteQuery *node);
extern int handle_response(DataNodeHandle * conn, RemoteQueryState *combiner);
extern bool FetchTuple(RemoteQueryState *combiner, TupleTableSlot *slot);
-extern int primary_data_node;
+extern void ExecRemoteQueryReScan(RemoteQueryState *node, ExprContext *exprCtxt);
-#endif
\ No newline at end of file
+extern int primary_data_node;
+#endif
diff --git a/src/include/pgxc/planner.h b/src/include/pgxc/planner.h
index bf8f224..346dd65 100644
--- a/src/include/pgxc/planner.h
+++ b/src/include/pgxc/planner.h
@@ -58,7 +58,8 @@ typedef struct
*/
typedef struct
{
- Plan plan;
+ Scan scan;
+ bool is_single_step; /* special case, skip extra work */
char *sql_statement;
Exec_Nodes *exec_nodes;
CombineType combine_type;
-----------------------------------------------------------------------
Summary of changes:
src/backend/commands/explain.c | 31 +++
src/backend/commands/tablecmds.c | 14 ++
src/backend/executor/execAmi.c | 8 +
src/backend/executor/nodeMaterial.c | 38 ++++
src/backend/optimizer/path/allpaths.c | 20 ++
src/backend/optimizer/plan/createplan.c | 99 +++++++++
src/backend/optimizer/plan/setrefs.c | 16 ++
src/backend/optimizer/plan/subselect.c | 6 +
src/backend/optimizer/util/pathnode.c | 22 ++
src/backend/pgxc/plan/planner.c | 196 +++++++++++++-----
src/backend/pgxc/pool/Makefile | 2 +-
src/backend/pgxc/pool/execRemote.c | 64 ++++++-
src/backend/pgxc/pool/postgresql_fdw.c | 335 +++++++++++++++++++++++++++++++
src/backend/tcop/postgres.c | 2 -
src/backend/tcop/utility.c | 20 +--
src/include/optimizer/cost.h | 3 +
src/include/optimizer/pathnode.h | 3 +
src/include/pgxc/execRemote.h | 5 +-
src/include/pgxc/planner.h | 3 +-
19 files changed, 803 insertions(+), 84 deletions(-)
create mode 100644 src/backend/pgxc/pool/postgresql_fdw.c
hooks/post-receive
--
Postgres-XC
|