|
From: <tho...@us...> - 2014-04-21 19:15:20
|
Revision: 8132
http://sourceforge.net/p/bigdata/code/8132
Author: thompsonbry
Date: 2014-04-21 19:15:16 +0000 (Mon, 21 Apr 2014)
Log Message:
-----------
Updated version of the HALoadBalancer.
See #624 (HA Load Balancer).
Modified Paths:
--------------
branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HALoadBalancerServlet.java
branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/IHALoadBalancerPolicy.java
Modified: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HALoadBalancerServlet.java
===================================================================
--- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HALoadBalancerServlet.java 2014-04-21 16:35:35 UTC (rev 8131)
+++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HALoadBalancerServlet.java 2014-04-21 19:15:16 UTC (rev 8132)
@@ -23,10 +23,17 @@
package com.bigdata.rdf.sail.webapp;
import java.io.IOException;
+import java.lang.ref.WeakReference;
import java.net.URI;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletConfig;
@@ -38,8 +45,10 @@
import org.apache.log4j.Logger;
import org.eclipse.jetty.proxy.ProxyServlet;
+import com.bigdata.counters.AbstractStatisticsCollector;
import com.bigdata.ganglia.GangliaService;
import com.bigdata.ganglia.HostReportComparator;
+import com.bigdata.ganglia.IGangliaMetricMessage;
import com.bigdata.ganglia.IHostReport;
import com.bigdata.ha.HAGlue;
import com.bigdata.ha.QuorumService;
@@ -47,46 +56,61 @@
import com.bigdata.journal.IIndexManager;
import com.bigdata.journal.PlatformStatsPlugIn;
import com.bigdata.journal.jini.ha.HAJournal;
+import com.bigdata.journal.jini.ha.HAJournalServer;
+import com.bigdata.quorum.AbstractQuorum;
import com.bigdata.quorum.Quorum;
+import com.bigdata.quorum.QuorumEvent;
+import com.bigdata.quorum.QuorumListener;
+import com.bigdata.util.InnerCause;
+import com.sun.corba.se.impl.orbutil.closure.Future;
/**
- *
- The HA Load Balancer servlet provides a transparent proxy for requests
+ * The HA Load Balancer servlet provides a transparent proxy for requests
* arriving its configured URL pattern (the "external" interface for the load
* balancer) to the root of the web application.
- * <P>
- * The use of the load balancer is entirely optional. If the security rules
- * permit, then clients MAY make requests directly against a specific service.
- * Thus, no specific provision exists to disable the load balancer servlet, but
- * you may choose not to deploy it.
* <p>
* When successfully deployed, requests having prefix corresponding to the URL
* pattern for the load balancer (typically, "/bigdata/LBS/*") are automatically
* redirected to a joined service in the met quorum based on the configured load
* balancer policy.
* <p>
+ * The use of the load balancer is entirely optional. If the load balancer is
+ * not properly configured, then it will simply rewrite itself out of any
+ * request and the request will be handled by the host to which it was directed
+ * (no proxying).
+ * <p>
+ * Note: If the security rules permit, then clients MAY make requests directly
+ * against a specific service.
+ * <p>
* The load balancer policies are "HA aware." They will always redirect update
- * requests to the quorum leader. The default polices will load balance read
- * requests over the leader and followers in a manner that reflects the CPU, IO
- * Wait, and GC Time associated with each service. The PlatformStatsPlugIn and
- * GangliaPlugIn MUST be enabled for the default load balancer policy to
- * operate. It depends on those plugins to maintain a model of the load on the
- * HA replication cluster. The GangliaPlugIn should be run only as a listener if
- * you are are running the real gmond process on the host. If you are not
- * running gmond, then the GangliaPlugIn should be configured as both a listener
- * and a sender.
+ * requests to the quorum leader. Read requests will be directed to one of the
+ * services that is joined with the met quorum.
+ * <p>
*
+ * <h3>Default Load Balancer Policy Configuration</h3>
+ * <p>
+ * The default policy will load balance read requests over the leader and
+ * followers in a manner that reflects the CPU, IO Wait, and GC Time associated
+ * with each service.
+ * <p>
+ * The {@link PlatformStatsPlugIn} and {@link GangliaPlugIn} MUST be enabled
+ * for the default load balancer policy to operate. It depends on those plugins
+ * to maintain a model of the load on the HA replication cluster. The
+ * GangliaPlugIn should be run only as a listener if you are running the
+ * real gmond process on the host. If you are not running gmond, then the
+ * {@link GangliaPlugIn} should be configured as both a listener and a sender.
+ * <p>
+ * <ul>
+ * <li>The {@link PlatformStatsPlugIn} must be enabled.</li>
+ * <li>The {@link GangliaPlugIn} must be enabled. The service does not need to
+ * be enabled for {@link GangliaPlugIn.Options#GANGLIA_REPORT}, but it must be
+ * enabled for {@link GangliaPlugIn.Options#GANGLIA_LISTEN}.
+ * </ul>
+ *
* @author <a href="mailto:tho...@us...">Bryan Thompson</a>
*
* @see <a href="http://trac.bigdata.com/ticket/624"> HA Load Balancer </a>
*
- * TODO Define some interesting load balancer policies. We can start with
- * HA aware round robin and an HA aware policy that is load balanced based
- * on the ganglia reported metrics model.
- *
- * All policies must be HA aware - we only want to send requests to
- * services that are joined with the met quorum.
- *
* TODO If the target service winds up not joined with the met quorum by
* the time we get there, what should it do? Report an error since we are
* already on its internal interface? Will this servlet see that error? If
@@ -127,96 +151,132 @@
*/
String POLICY = "policy";
- String DEFAULT_POLICY = DefaultLBSPolicy.class.getName();
-
/**
- * A {@link Comparator} that places {@link IHostReport}s into a total
- * ordering from the host with the least load to the host with the
- * greatest load (optional).
+ * FIXME The default must be something that we can override from the
+ * test suite in order to test these different policies. I've added some
+ * code to do this based on a System property, but the test suite does
+ * not allow us to set arbitrary system properties on the child
+ * processes so that code is not yet having the desired effect.
*/
- String COMPARATOR = "comparator";
-
- String DEFAULT_COMPARATOR = DefaultHostReportComparator.class.getName();
-
+// String DEFAULT_POLICY = RoundRobinPolicy.class.getName();
+ String DEFAULT_POLICY = NOPLBSPolicy.class.getName();
+// String DEFAULT_POLICY = GangliaLBSPolicy.class.getName();
+
}
public HALoadBalancerServlet() {
super();
}
+// /**
+// * This servlet request attribute is used to mark a request as either an
+// * update or a read-only operation.
+// */
+// protected static final String ATTR_LBS_UPDATE_REQUEST = "lbs-update-request";
+
+ /**
+ * If the LBS is not enabled, then it will strip its prefix from the URL
+ * requestURI and do a servlet forward to the resulting requestURI. This
+ * allows the webapp to start even if the LBS is not correctly configured.
+ */
private boolean enabled = false;
private String prefix = null;
private IHALoadBalancerPolicy policy;
- private Comparator<IHostReport> comparator;
- private GangliaService gangliaService;
- private String[] reportOn;
- @SuppressWarnings("unchecked")
@Override
public void init() throws ServletException {
super.init();
+ // Disabled by default.
+ enabled = false;
+
final ServletConfig servletConfig = getServletConfig();
final ServletContext servletContext = servletConfig.getServletContext();
- prefix = servletConfig.getInitParameter(InitParams.PREFIX);
-
- policy = newInstance(servletConfig, IHALoadBalancerPolicy.class,
- InitParams.POLICY, InitParams.DEFAULT_POLICY);
-
- comparator = newInstance(servletConfig, Comparator.class,
- InitParams.COMPARATOR, InitParams.DEFAULT_COMPARATOR);
-
final IIndexManager indexManager = BigdataServlet
.getIndexManager(servletContext);
if (!(indexManager instanceof HAJournal)) {
- throw new ServletException("Not HA");
+ // This is not an error, but the LBS is only for HA.
+ log.warn("Not HA");
+ return;
}
- final HAJournal journal = (HAJournal) indexManager;
+ prefix = servletConfig.getInitParameter(InitParams.PREFIX);
- if (journal.getPlatformStatisticsCollector() == null) {
- throw new ServletException("LBS requires "
- + PlatformStatsPlugIn.class.getName());
- }
+ policy = newInstance(servletConfig, IHALoadBalancerPolicy.class,
+ InitParams.POLICY, InitParams.DEFAULT_POLICY);
- gangliaService = (GangliaService) journal.getGangliaService();
+ try {
- if (gangliaService == null) {
- throw new ServletException("LBS requires "
- + GangliaPlugIn.class.getName());
+ // Attempt to provision the specified LBS policy.
+ policy.init(servletConfig, indexManager);
+
+ } catch (Throwable t) {
+
+ /*
+ * The specified LBS policy could not be provisioned.
+ */
+
+ if (InnerCause.isInnerCause(t, InterruptedException.class)) {
+ // Interrupted.
+ return;
+ }
+
+ log.error("Could not setup policy: " + policy, t);
+
+ try {
+ policy.destroy();
+ } catch (Throwable t2) {
+ if (InnerCause.isInnerCause(t, InterruptedException.class)) {
+ // Interrupted.
+ return;
+ }
+ log.warn("Problem destroying policy: " + policy, t2);
+ } finally {
+ policy = null;
+ }
+
+ /*
+ * Fall back onto a NOP policy. Each service will handle a
+ * read-request itself. Write requests are proxied to the quorum
+ * leader.
+ */
+
+ policy = new NOPLBSPolicy();
+
+ log.warn("Falling back: policy=" + policy);
+
+ // Initialize the fallback policy.
+ policy.init(servletConfig, indexManager);
+
}
- reportOn = gangliaService.getDefaultHostReportOn();
-
enabled = true;
servletContext.setAttribute(BigdataServlet.ATTRIBUTE_LBS_PREFIX,
prefix);
if (log.isInfoEnabled())
- log.info(servletConfig.getServletName() + " @ " + prefix);
+ log.info(servletConfig.getServletName() + " @ " + prefix
+ + " :: policy=" + policy);
}
@Override
public void destroy() {
-
+
enabled = false;
prefix = null;
- policy = null;
+ if (policy != null) {
+ policy.destroy();
+ policy = null;
+ }
- comparator = null;
-
- reportOn = null;
-
- gangliaService = null;
-
getServletContext().setAttribute(BigdataServlet.ATTRIBUTE_LBS_PREFIX,
null);
@@ -225,7 +285,55 @@
}
/**
+ * Return the configured value of the named parameter. This method checks
+ * the environment variables first for a fully qualified value for the
+ * parameter using <code>HALoadBalancerServlet</code>.<i>name</i>. If no value
+ * is found for that variable, it checks the {@link ServletContext} for
+ * <i>name</i>. If no value is found again, it returns the default value
+ * specified by the caller. This makes it possible to configure the behavior
+ * of the {@link HALoadBalancerServlet} using environment variables.
+ *
+ * @param servletConfig
+ * The {@link ServletConfig}.
+ *
+ * @param iface
+ * The interface that the type must implement.
+ * @param name
+ * The name of the servlet init parameter.
+ * @param def
+ * The default value for the servlet init parameter.
+ * @return
+ */
+ private static String getConfigParam(final ServletConfig servletConfig,
+ final String name, final String def) {
+
+ // Look at environment variables for an override.
+ String s = System.getProperty(HALoadBalancerServlet.class.getName()
+ + "." + name);
+
+ if (s == null || s.trim().length() == 0) {
+
+ // Look at ServletConfig for the configured value.
+ s = servletConfig.getInitParameter(name);
+
+ }
+
+ if (s == null || s.trim().length() == 0) {
+
+ // Use the default value.
+ s = def;
+
+ }
+
+ return s;
+
+ }
+
+ /**
* Create an instance of some type based on the servlet init parameters.
+ * <p>
+ * Note: The configuration parameter MAY also be specified as <code>
+ * com.bigdata.rdf.sail.webapp.HALoadBalancerServlet.<i>name</i></code>.
*
* @param servletConfig
* The {@link ServletConfig}.
@@ -246,16 +354,9 @@
final Class<? extends T> iface, final String name, final String def)
throws ServletException {
+ final String s = getConfigParam(servletConfig, name, def);
+
final T t;
-
- String s = servletConfig.getInitParameter(name);
-
- if (s == null || s.trim().length() == 0) {
-
- s = def;
-
- }
-
final Class<? extends T> cls;
try {
cls = (Class<? extends T>) Class.forName(s);
@@ -285,230 +386,130 @@
IOException {
if (!enabled) {
- // The LBS is not available.
- response.sendError(HttpServletResponse.SC_NOT_FOUND);
+ /*
+ * LBS is disabled. Strip LBS prefix from the requestURI and forward
+ * the request to servlet on this host (NOP LBS).
+ */
+ forwardToThisService(request, response);
+ return;
}
-
- final HostScore[] hosts = hostTable.get();
- if (hosts == null || hosts.length == 0) {
+ /*
+ * Decide whether this is a read-only request or an update request.
+ */
+ final boolean isUpdate = isUpdateRequest(request);
- // Ensure that the host table exists.
- updateHostsTable();
+// // Set the request attribute.
+// request.setAttribute(ATTR_LBS_UPDATE_REQUEST, isUpdate);
+
+ /*
+ * Delegate to policy. This provides a single point during which the
+ * policy can ensure that it is monitoring any necessary information and
+ * also provides an opportunity to override the behavior completely. For
+ * example, as an optimization, the policy can forward the request to a
+ * servlet in this servlet container rather than proxying it to either
+ * itself or another service.
+ */
+ if(policy.service(isUpdate, request, response)) {
- }
-
- final HAGlueScore[] services = serviceTable.get();
-
- if (services == null || services.length == 0) {
-
- /*
- * Ensure that the service table exists (more correctly, attempt to
- * populate it, but we can only do that if the HAQuorumService is
- * running.)
- */
-
- updateServicesTable();
+ // Return immediately if the response was committed.
+ return;
}
-
+
/*
* TODO if rewriteURL() returns null, then the base class (ProxyServlet)
* returns SC_FORBIDDEN. It should return something less ominous, like a
- * 404. With an explanation. Or a RETRY.
+ * 404. With an explanation. Or a RETRY. Or just forward to the local
+ * service and let it report an appropriate error message (e.g.,
+ * NotReady).
*/
super.service(request, response);
}
/**
- * Update the per-host scoring table.
+ * Strip off the <code>/LBS</code> prefix from the requestURI and forward
+ * the request to the servlet at the resulting requestURI. This forwarding
+ * effectively disables the LBS but still allows requests which target the
+ * LBS to succeed against the webapp on the same host.
*
- * @see #hostTable
+ * @param request
+ * The request.
+ * @param response
+ * The response.
*
- * FIXME This MUST be updated on a periodic basis. We can probably
- * query the gangliaService to figure out how often it gets updates, or
- * we can do this every 5 seconds or so (the ganglia updates are not
- * synchronized across a cluster - they just pour in).
- *
- * TODO For scalability on clusters with a lot of ganglia chatter, we
- * should only keep the data from those hosts that are of interest for
- * a given HA replication cluster. The load on other hosts has no
- * impact on our decision when load balancing within an HA replication
- * cluster.
+ * @throws IOException
+ * @throws ServletException
*/
- private void updateHostsTable() {
+ static protected void forwardToThisService(
+ final HttpServletRequest request, //
+ final HttpServletResponse response//
+ ) throws IOException, ServletException {
- /*
- * Note: If there is more than one service on the same host, then we
- * will have one record per host, not per service.
- *
- * Note: The actual metrics that are available depend on the OS and on
- * whether you are running gmond or having the GangliaPlugIn do its own
- * reporting. The policy that ranks the host reports should be robust to
- * these variations.
- */
- final IHostReport[] hostReport = gangliaService.getHostReport(//
- reportOn,// metrics to be reported.
- comparator// imposes order on the host reports.
- );
+ final String path = request.getRequestURI();
- log.warn("hostReport=" + Arrays.toString(hostReport));
+ // The prefix for the LBS servlet.
+ final String prefix = (String) request.getServletContext()
+ .getAttribute(BigdataServlet.ATTRIBUTE_LBS_PREFIX);
- final HostScore[] scores = new HostScore[hostReport.length];
+ if (prefix == null) {
+ // LBS is not running / destroyed.
+ response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ return;
+ }
- for (int i = 0; i < hostReport.length; i++) {
-
- final IHostReport r = hostReport[i];
+ if (!path.startsWith(prefix)) {
+ // Request should not have reached the LBS.
+ response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ return;
+ }
+ // what remains after we strip off the LBS prefix.
+ final String rest = path.substring(prefix.length());
+
+ // build up path w/o LBS prefix.
+ final StringBuilder uri = new StringBuilder();
+
+ if (!rest.startsWith("/")) {
/*
- * TODO This is ignoring the metrics for the host and weighting all
- * hosts equally.
+ * The new path must start with '/' and is relative to this
+ * ServletContext.
*/
- scores[i++] = new HostScore(r.getHostName(), 1.0,
- (double) hostReport.length);
+ uri.append("/");
+ }
- }
+ // append the remainder of the original requestURI
+ uri.append(rest);
- // sort into ascending order (increasing activity).
- Arrays.sort(scores);
+// // append the query parameters (if any).
+// final String query = request.getQueryString();
+// if (query != null)
+// uri.append("?").append(query);
- for (int i = 0; i < scores.length; i++) {
-
- scores[i].rank = i;
-
- scores[i].drank = ((double) i) / scores.length;
-
- }
-
- if (log.isDebugEnabled()) {
-
- log.debug("The most active index was: " + scores[scores.length - 1]);
-
- log.debug("The least active index was: " + scores[0]);
-
- }
-
- this.hostTable.set(scores);
-
- }
-
- /**
- * Update the per-service table.
- *
- * @see #serviceTable
- *
- * FIXME This MUST be maintained by appropriate watchers such that we
- * just consult the as maintained information and act immediately on
- * it. We can not afford any latency for RMI or even figuring out which
- * the host has the least load. That should all be maintained by a
- * scheduled thread and listeners.
- */
- private void updateServicesTable() {
-
- final ServletContext servletContext = getServletContext();
-
- final HAJournal journal = (HAJournal) BigdataServlet
- .getIndexManager(servletContext);
-
- final Quorum<HAGlue, QuorumService<HAGlue>> quorum = journal.getQuorum();
-
+ // The new path.
+ final String newPath = uri.toString();
+
/*
- * Note: This is the *local* HAGlueService.
- *
- * This page must be robust to some new failure modes. The ZooKeeper
- * client can now be associated with an expired session, River discovery
- * can now be disabled, and the HAQuorumService might not be available
- * from quorum.getClient(). All of those things can happen if there is a
- * zookeeper session expiration that forces us to terminate the
- * HAQuorumService. This condition will be cured automatically (unless
- * the service is being shutdown), but only limited status information
- * can be provided while the HAQuorumService is not running.
+ * Forward the request to this servlet container now that we have
+ * stripped off the prefix for the LBS.
*/
- final QuorumService<HAGlue> quorumService;
- {
- QuorumService<HAGlue> t;
- try {
- t = (QuorumService) quorum.getClient();
- } catch (IllegalStateException ex) {
- // Note: Not available (quorum.start() not called).
- return;
- }
- quorumService = t;
- }
- final UUID[] joined = quorum.getJoined();
- final HAGlueScore[] serviceScores = new HAGlueScore[joined.length];
+ if (log.isInfoEnabled())
+ log.info("forward: " + path + " => " + newPath);
- for (int i = 0; i < joined.length; i++) {
- final UUID serviceId = joined[i];
- try {
+ request.getRequestDispatcher(newPath).forward(request, response);
- /*
- * TODO Scan the existing table before doing an RMI to the
- * service. We only need to do the RMI for a new service, not
- * one in the table.
- *
- * TODO A services HashMap<UUID,HAGlueScore> would be much more
- * efficient than a table. If we use a CHM, then we can do this
- * purely asynchronously as the HAGlue services entire the set
- * of joined services.
- */
- serviceScores[i] = new HAGlueScore(servletContext, serviceId);
-
- } catch (RuntimeException ex) {
-
- /*
- * Ignore. Might not be an HAGlue instance.
- */
-
- if (log.isInfoEnabled())
- log.info(ex, ex);
-
- continue;
-
- }
-
- }
-
- this.serviceTable.set(serviceScores);
-
}
-
- /*
- * FIXME Choose among pre-computed and maintained proxy targets based on the
- * LBS policy.
- */
- private static final String _proxyTo = "http://localhost:8091/bigdata";
-
- /**
- * The table of pre-scored hosts.
- *
- * TODO There is an entry for all known hosts, but not all hosts are running
- * service that we care about. So we have to run over the table, filtering
- * for hosts that have services that we care about.
- */
- private final AtomicReference<HostScore[]> hostTable = new AtomicReference<HostScore[]>(
- null);
/**
- * This is the table of known services. We can scan the table for a service
- * {@link UUID} and then forward a request to the pre-computed requestURL
- * associated with that {@link UUID}. If the requestURL is <code>null</code>
- * then we do not know how to reach that service and can not proxy the
- * request.
- */
- private final AtomicReference<HAGlueScore[]> serviceTable = new AtomicReference<HAGlueScore[]>(
- null);
-
- /**
* For update requests, rewrite the requestURL to the service that is the
* quorum leader. For read requests, rewrite the requestURL to the service
* having the least load.
*/
@Override
- protected URI rewriteURI(final HttpServletRequest request)
- {
+ protected URI rewriteURI(final HttpServletRequest request) {
+
final String path = request.getRequestURI();
if (!path.startsWith(prefix))
return null;
@@ -517,10 +518,10 @@
final String proxyTo;
if(isUpdate) {
// Proxy to leader.
- proxyTo = getLeaderURL(request);
+ proxyTo = policy.getLeaderURL(request);
} else {
// Proxy to any joined service.
- proxyTo = getReaderURL(request);
+ proxyTo = policy.getReaderURL(request);
}
if (proxyTo == null) {
// Could not rewrite.
@@ -548,193 +549,1208 @@
}
/**
- * Return <code>true</code> iff this is an UPDATE request that must be
- * proxied to the quorum leader.
- *
- * FIXME How do we identify "UPDATE" requests? DELETE and PUT are update
- * requests, but POST is not always an UPDATE. It can also be used for
- * QUERY. GET is never an UPDATE request, and that is what this is based on
- * right now.
+ * TODO This offers an opportunity to handle a rewrite failure. It could be
+ * used to provide a default status code (e.g., 404 versus forbidden) or to
+ * forward the request to this server rather than proxying to another
+ * server.
*/
- private boolean isUpdateRequest(HttpServletRequest request) {
+ @Override
+ protected void onRewriteFailed(final HttpServletRequest request,
+ final HttpServletResponse response) throws IOException {
- return !request.getMethod().equalsIgnoreCase("GET");
-
+ response.sendError(HttpServletResponse.SC_FORBIDDEN);
+
}
- private String getLeaderURL(final HttpServletRequest request) {
+ /**
+ * Return <code>true</code> iff this is an UPDATE request that must be
+ * proxied to the quorum leader. A SPARQL QUERY (even when submitted using
+ * POST) is not treated as an UPDATE request.
+ */
+ private boolean isUpdateRequest(final HttpServletRequest request) {
- final ServletContext servletContext = getServletContext();
+ final boolean isGet = request.getMethod().equalsIgnoreCase("GET");
- final HAJournal journal = (HAJournal) BigdataServlet
- .getIndexManager(servletContext);
+ if (isGet) {
- final Quorum<HAGlue, QuorumService<HAGlue>> quorum = journal.getQuorum();
-
- final UUID leaderId = quorum.getLeaderId();
+ // GET is never an UPDATE request.
+ return false;
- if (leaderId == null) {
- // No quorum, so no leader. Can not proxy the request.
- return null;
}
+
+ final String requestURI = request.getRequestURI();
- /*
- * Scan the services table to locate the leader and then proxy the
- * request to the pre-computed requestURL for the leader. If that
- * requestURL is null then we do not know about a leader and can not
- * proxy the request at this time.
- */
+ if (requestURI.endsWith("/sparql")) {
- final HAGlueScore[] services = serviceTable.get();
-
- if (services == null) {
+ /*
+ * SPARQL end point.
+ *
+ * @see QueryServlet#doPost()
+ */
- // No services. Can't proxy.
- return null;
+ if ( request.getParameter(QueryServlet.ATTR_QUERY) != null ||
+ RESTServlet.hasMimeType(request,
+ BigdataRDFServlet.MIME_SPARQL_QUERY)
+ ) {
- }
+ /*
+ * QUERY against SPARQL end point using POST for visibility, not
+ * mutability.
+ */
- for (HAGlueScore s : services) {
+ return false; // idempotent query using POST.
- if (s.serviceUUID.equals(leaderId)) {
+ }
- // Found it. Proxy if the serviceURL is defined.
- return s.requestURL;
+ if (request.getParameter(QueryServlet.ATTR_UUID) != null) {
+ return false; // UUID request with caching disabled.
+
+ } else if (request.getParameter(QueryServlet.ATTR_ESTCARD) != null) {
+
+ return false; // ESTCARD with caching defeated.
+
+ } else if (request.getParameter(QueryServlet.ATTR_CONTEXTS) != null) {
+
+ // Request for all contexts in the database.
+ return false;
+
}
-
+
}
- // Not found. Won't proxy.
- return null;
-
+ // Anything else must be proxied to the leader.
+ return true;
+
}
+ /** Place into descending order by load_one. */
+ public static class DefaultHostReportComparator extends
+ HostReportComparator implements Comparator<IHostReport> {
+
+ public DefaultHostReportComparator() {
+ super("load_one", true/* asc */);
+ }
+
+ }
+
/**
- * Return the requestURL to which we will proxy a read request.
+ * Abstract base class establishes a listener for quorum events, tracks the
+ * services that are members of the quorum, and caches metadata about those
+ * services (especially the requestURL at which they will respond).
*
- * @param request
- * The request.
+ * @author <a href="mailto:tho...@us...">Bryan
+ * Thompson</a>
*
- * @return The proxyTo URL -or- <code>null</code> if we could not find a
- * service to which we could proxy this request.
+ * FIXME The {@link QuorumListener} is unregistered by
+ * {@link AbstractQuorum#terminate()}. This happens any time the
+ * {@link HAJournalServer} goes into the error state. When this
+ * occurs, we stop getting {@link QuorumEvent}s and the policy stops
+ * being responsive. We probably need to either NOT clear the quorum
+ * listener and/or add an event type that is sent when
+ * {@link Quorum#terminate()} is called.
*/
- private String getReaderURL(final HttpServletRequest request) {
+ abstract protected static class AbstractLBSPolicy implements
+ IHALoadBalancerPolicy, QuorumListener {
- final HostScore[] hostScores = this.hostTable.get();
-
- if (hostScores == null) {
- // Can't proxy to anything.
- return null;
+ public interface InitParams {
+
}
- // Choose a host : TODO This is just a round robin over the hosts.
- HostScore hostScore = null;
- for (int i = 0; i < hostScores.length; i++) {
+ /**
+ * The {@link ServletContext#getContextPath()} is cached in
+ * {@link #init(ServletConfig, IIndexManager)}.
+ */
+ private final AtomicReference<String> contextPath = new AtomicReference<String>();
+
+ /**
+ * A {@link WeakReference} to the {@link HAJournal} avoids pinning the
+ * {@link HAJournal}.
+ */
+ protected final AtomicReference<WeakReference<HAJournal>> journalRef = new AtomicReference<WeakReference<HAJournal>>();
- final int hostIndex = (i + nextHost) % hostScores.length;
+ /**
+ * This is the table of known services. We can scan the table for a service
+ * {@link UUID} and then forward a request to the pre-computed requestURL
+ * associated with that {@link UUID}. If the requestURL is <code>null</code>
+ * then we do not know how to reach that service and can not proxy the
+ * request.
+ */
+ protected final AtomicReference<HAGlueScore[]> serviceTable = new AtomicReference<HAGlueScore[]>(
+ null);
- hostScore = hostScores[hostIndex];
+ /**
+ * Return the cached reference to the {@link HAJournal}.
+ *
+ * @return The reference or <code>null</code> iff the reference has been
+ * cleared or has not yet been set.
+ */
+ protected HAJournal getJournal() {
- if (hostScore == null)
- continue;
+ final WeakReference<HAJournal> ref = journalRef.get();
- nextHost = hostIndex + 1;
+ if (ref == null)
+ return null;
+
+ return ref.get();
}
+
+ @Override
+ public void destroy() {
- if (hostScore == null) {
+ contextPath.set(null);
- // No hosts. Can't proxy.
- return null;
+ journalRef.set(null);
+
+ serviceTable.set(null);
}
+
+ @Override
+ public void init(final ServletConfig servletConfig,
+ final IIndexManager indexManager) throws ServletException {
- final HAGlueScore[] services = this.serviceTable.get();
+ final ServletContext servletContext = servletConfig
+ .getServletContext();
- if (services == null) {
+ contextPath.set(servletContext.getContextPath());
- // No services. Can't proxy.
- return null;
+ final HAJournal journal = (HAJournal) BigdataServlet
+ .getIndexManager(servletContext);
+ this.journalRef.set(new WeakReference<HAJournal>(journal));
+
+ final Quorum<HAGlue, QuorumService<HAGlue>> quorum = journal
+ .getQuorum();
+
+ quorum.addListener(this);
+
}
-
- /*
- * Find a service on that host.
- *
- * TODO If none found, the try other hosts until we have tried each host
- * once and then give up by returning null. This will require pushing
- * down the service finder into a method that we call from the hosts
- * loop.
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Handles the request locally when either (a) this service is the
+         * quorum leader and the request is an UPDATE, or (b) the quorum is
+         * not met (the local service will then produce the appropriate
+         * error). Otherwise the request is not handled here: the service
+         * table is conditionally (re-)populated and <code>false</code> is
+         * returned so the caller can proxy the request.
+         *
+         * @return <code>true</code> iff the request was fully handled by
+         *         forwarding it to the local service.
+         */
+        @Override
+        public boolean service(final boolean isUpdate,
+                final HttpServletRequest request,
+                final HttpServletResponse response) throws ServletException,
+                IOException {
+
+            /*
+             * Figure out whether the quorum is met and if this is the quorum
+             * leader.
+             */
+            final HAJournal journal = getJournal();
+            Quorum<HAGlue, QuorumService<HAGlue>> quorum = null;
+            QuorumService<HAGlue> quorumService = null;
+            long token = Quorum.NO_QUORUM; // assume no quorum.
+            boolean isLeader = false; // assume false.
+            boolean isQuorumMet = false; // assume false.
+            if (journal != null) {
+                quorum = journal.getQuorum();
+                if (quorum != null) {
+                    try {
+                        // Note: This is the *local* HAGlueService.
+                        quorumService = (QuorumService) quorum.getClient();
+                        token = quorum.token();
+                        isLeader = quorumService.isLeader(token);
+                        isQuorumMet = token != Quorum.NO_QUORUM;
+                    } catch (IllegalStateException ex) {
+                        // Note: Not available (quorum.start() not
+                        // called).
+                    }
+                }
+            }
+
+            if ((isLeader && isUpdate) || !isQuorumMet) {
+
+                /*
+                 * (1) If this service is the leader and the request is an
+                 * UPDATE, then we forward the request to the local service. It
+                 * will handle the UPDATE request.
+                 *
+                 * (2) If the quorum is not met, then we forward the request to
+                 * the local service. It will produce the appropriate error
+                 * message.
+                 *
+                 * FIXME (3) For read-only requests, have a configurable
+                 * preference to forward the request to this service unless
+                 * there is a clear load imbalance; handling the request
+                 * locally will help to reduce its latency. If HAProxy is
+                 * being used to load balance over the readers, then we should
+                 * have a high threshold before we send the request somewhere
+                 * else.
+                 *
+                 * @see #forwardToThisService()
+                 */
+                forwardToThisService(request, response);
+
+                // request was handled.
+                return true;
+
+            }
+
+            /*
+             * Hook the request to update the service/host tables if they are
+             * not yet defined.
+             */
+            conditionallyUpdateServiceTable();
+
+            // request was not handled.
+            return false;
+
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * This implementation rewrites the requestURL such that the request
+         * will be proxied to the quorum leader.
+         *
+         * @return The pre-computed requestURL for the quorum leader -or-
+         *         <code>null</code> if the leader is not (yet) known, in
+         *         which case the request can not be proxied.
         */
-        for(HAGlueScore x : services) {
+        @Override
+        final public String getLeaderURL(final HttpServletRequest request) {
+
+            final ServletContext servletContext = request.getServletContext();
+
+            final HAJournal journal = (HAJournal) BigdataServlet
+                    .getIndexManager(servletContext);
+
+            final Quorum<HAGlue, QuorumService<HAGlue>> quorum = journal
+                    .getQuorum();
+
+            final UUID leaderId = quorum.getLeaderId();
+
+            if (leaderId == null) {
+                // No quorum, so no leader. Can not proxy the request.
+                return null;
+            }
+
+            /*
+             * Scan the services table to locate the leader and then proxy the
+             * request to the pre-computed requestURL for the leader. If that
+             * requestURL is null then we do not know about a leader and can not
+             * proxy the request at this time.
+             */
+
+            final HAGlueScore[] services = serviceTable.get();
-            if (x.hostname == null) {
-                // Can't use if no hostname.
-                continue;
+            if (services == null) {
+
+                // No services. Can't proxy.
+                return null;
+
            }
-            if (x.requestURL == null) {
-                // Can't use if no requestURL.
-                continue;
+            for (HAGlueScore s : services) {
+
+                if (s.serviceUUID.equals(leaderId)) {
+
+                    // Found it. Proxy if the serviceURL is defined.
+                    return s.requestURL;
+
+                }
+
            }
+
+            // Not found. Won't proxy.
+            return null;
-            if (!x.hostname.equals(hostScore.hostname)) {
-                // This service is not on the host we are looking for.
-                continue;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * The services table is updated if a service joins or leaves the
+         * quorum. All other quorum event types are ignored.
+         */
+        @Override
+        public void notify(final QuorumEvent e) {
+            switch(e.getEventType()) {
+            case SERVICE_JOIN:
+            case SERVICE_LEAVE:
+                updateServiceTable();
+                break;
            }
+        }
+
+        /**
+         * Conditionally update the {@link #serviceTable} iff it does not exist
+         * or is empty.
+         * <p>
+         * NOTE(review): the empty-table check happens outside of the
+         * synchronized block, so two threads may both observe an empty table
+         * and invoke {@link #updateServiceTable()} in turn. This looks benign
+         * since the update replaces the table reference atomically - confirm.
+         */
+        protected void conditionallyUpdateServiceTable() {
-        return x.requestURL;
-
+            final HAGlueScore[] services = serviceTable.get();
+
+            if (services == null || services.length == 0) {
+
+                /*
+                 * Ensure that the service table exists (more correctly, attempt
+                 * to populate it, but we can only do that if the
+                 * HAQuorumService is running.)
+                 *
+                 * FIXME This should be robust even when the HAQuorumService is
+                 * not running. We do not want to be unable to proxy to another
+                 * service just because this one is going through an error
+                 * state. Would it make more sense to have a 2nd Quorum object
+                 * for this purpose - one that is not started and stopped by the
+                 * HAJournalServer?
+                 *
+                 * Note: Synchronization here is used to ensure only one thread
+                 * runs this logic if the table does not exist and we get a
+                 * barrage of requests.
+                 */
+                synchronized (serviceTable) {
+
+                    updateServiceTable();
+
+                }
+
+            }
+
        }
+
+        /**
+         * Update the per-service table from the set of service {@link UUID}s
+         * that are currently joined with the quorum. An RMI is performed for
+         * each joined service (see TODO below) in order to construct its
+         * {@link HAGlueScore} entry.
+         *
+         * @see #serviceTable
+         */
+        protected void updateServiceTable() {
-        // No service found on that host.
-        return null;
-
+            final HAJournal journal = getJournal();
+
+            final Quorum<HAGlue, QuorumService<HAGlue>> quorum = journal
+                    .getQuorum();
+
+            final UUID[] joined = quorum.getJoined();
+
+            final HAGlueScore[] serviceScores = new HAGlueScore[joined.length];
+
+            for (int i = 0; i < joined.length; i++) {
+
+                final UUID serviceId = joined[i];
+
+                try {
+
+                    /*
+                     * TODO Scan the existing table before doing an RMI to the
+                     * service. We only need to do the RMI for a new service,
+                     * not one in the table.
+                     *
+                     * TODO A services HashMap<UUID,HAGlueScore> would be much
+                     * more efficient than a table. If we use a CHM, then we can
+                     * do this purely asynchronously as the HAGlue services
+                     * enter the set of joined services.
+                     */
+                    serviceScores[i] = new HAGlueScore(journal,
+                            contextPath.get(), serviceId);
+
+                } catch (RuntimeException ex) {
+
+                    /*
+                     * Ignore. Might not be an HAGlue instance. Note: a failed
+                     * entry leaves a null slot in the table, so consumers of
+                     * the table must skip over null elements.
+                     */
+
+                    if (log.isInfoEnabled())
+                        log.info(ex, ex);
+
+                    continue;
+
+                }
+
+            }
+
+            if (log.isInfoEnabled())
+                log.info("Updated servicesTable: #services="
+                        + serviceScores.length);
+
+            this.serviceTable.set(serviceScores);
+
+        }
+
}
- int nextHost = 0;
+
+ /**
+ * This policy proxies all requests for update operations to the leader but
+ * forwards read requests to the local service. Thus, it does not provide a
+ * load balancing strategy, but it does allow update requests to be directed
+ * to any service in an non-HA aware manner. This policy can be combined
+ * with an external round-robin strategy to load balance the read-requests
+ * over the cluster.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan
+ * Thompson</a>
+ *
+ * TODO A service that is not joined with the met quorum can not
+ * answer a read-request. In order to be generally useful (and not
+ * just as a debugging policy), we need to proxy a read-request when
+ * this service is not joined with the met quorum. If there is no
+ * met quorum, then we can just forward the request to the local
+ * service and it will report the NoQuorum error.
+ */
+    public static class NOPLBSPolicy extends AbstractLBSPolicy {
-    /** Place into descending order by load_one. */
-    public static class DefaultHostReportComparator extends
-            HostReportComparator implements Comparator<IHostReport> {
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Read requests are always handled by the local service. Update
+         * requests are delegated to the base class, which proxies them to
+         * the quorum leader.
+         */
+        @Override
+        public boolean service(final boolean isUpdate,
+                final HttpServletRequest request,
+                final HttpServletResponse response) throws IOException,
+                ServletException {
-        public DefaultHostReportComparator() {
-            super("load_one", true/* asc */);
+            if (!isUpdate) {
+
+                // Always handle read requests locally.
+                forwardToThisService(request, response);
+
+                // Request was handled.
+                return true;
+
+            }
+
+            // Proxy update requests to the quorum leader.
+            return super.service(isUpdate, request, response);
+
        }
+        /**
+         * Note: This method is not invoked: read requests are always
+         * serviced locally under this policy, so a reader URL is never
+         * requested.
+         */
+        @Override
+        public String getReaderURL(final HttpServletRequest req) {
+
+            throw new UnsupportedOperationException();
+
+        }
+
    }
/**
- * Stochastically proxy the request to the services based on their load.
+ * Policy implements a round-robin over the services that are joined with
+ * the met quorum.
*
* @author <a href="mailto:tho...@us...">Bryan
* Thompson</a>
*/
- public static class DefaultLBSPolicy implements IHALoadBalancerPolicy {
+ public static class RoundRobinPolicy extends AbstractLBSPolicy {
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This imposes a round-robin policy over the discovered services. If
+ * the service is discovered and appears to be joined with the met
+ * quorum, then the request can be proxied to that service.
+ */
@Override
- public String proxyTo(HttpServletRequest req) {
- // TODO Auto-generated method stub
- return null;
+ public String getReaderURL(final HttpServletRequest request) {
+
+ final HAGlueScore[] serviceScores = this.serviceTable.get();
+
+ if (serviceScores == null) {
+
+ // Nothing discovered. Can't proxy.
+ return null;
+
+ }
+
+ /*
+ * Choose a service.
+ *
+ * Note: This is a round robin over the services. Any service that
+ * is joined with the met quorum can be selected as a target for the
+ * read request.
+ *
+ * Note: The round-robin is atomic with respect to each request. The
+ * request obtains a starting point in the serviceScores[] and then
+ * finds the next service in that array using a round-robin. The
+ * [nextService] starting point is not updated until the round-robin
+ * is complete - this is necessary in order to avoid skipping over
+ * services that are being checked by a concurrent request.
+ *
+ * The [nextService] is updated only after the round-robin decision
+ * has been made. As soon as it has been updated, a new round-robin
+ * decision will be made with respect to the new value for
+ * [nextService] but any in-flight decisions will be made against
+ * the value of [nextService] that they observed on entry.
+ */
+
+            // The starting offset for the round-robin.
+            final long startIndex = nextService.longValue();
+
+            // The selected service (null until a usable entry is found).
+            HAGlueScore serviceScore = null;
+
+            for (int i = 0; i < serviceScores.length; i++) {
+
+                /*
+                 * Find the next host index.
+                 *
+                 * Note: We need to ensure that the hostIndex stays in the legal
+                 * range, even with concurrent requests and when wrapping around
+                 * MAX_VALUE.
+                 */
+                final int hostIndex = (int) Math
+                        .abs(((i + startIndex) % serviceScores.length));
+
+                final HAGlueScore candidate = serviceScores[hostIndex];
+
+                if (candidate == null) {
+                    // Null slot (e.g., RMI to that service failed).
+                    continue;
+                }
+
+                if (candidate.hostname == null) {
+                    // Can't use if no hostname.
+                    continue;
+                }
+
+                if (candidate.requestURL == null) {
+                    // Can't use if no requestURL.
+                    continue;
+                }
+
+                /*
+                 * Found a usable service. Stop scanning - without this break
+                 * the loop would run to the end of the table and hand back
+                 * whatever entry happened to be examined last, even if that
+                 * entry had failed the validity checks above.
+                 */
+                serviceScore = candidate;
+                break;
+
+            }
+
+            // Bump the nextService counter.
+            nextService.incrementAndGet();
+
+            if (serviceScore == null) {
+
+                // No usable service. Can't proxy.
+                return null;
+
+            }
+
+            return serviceScore.requestURL;
+
}
-
+
+ /**
+ * Note: This could be a hot spot. We can have concurrent requests and
+ * we need to increment this counter for each such request.
+ */
+ private final AtomicLong nextService = new AtomicLong(0L);
+
}
/**
- * Always proxy the request to the local service even if it is not HA ready
- * (this policy defeats the load balancer).
+ * Stochastically proxy the request to the services based on their load.
+ * <p>
+ * Note: This {@link IHALoadBalancerPolicy} has a dependency on the
+ * {@link GangliaPlugIn}. The {@link GangliaPlugIn} must be setup to listen
+ * to the Ganglia protocol and build up an in-memory model of the load on
+ * each host. Ganglia must be reporting metrics for each host running an
+ * {@link HAJournalServer} instance. This can be achieved either using the
+ * <code>gmond</code> utility from the ganglia distribution or using the
+ * {@link GangliaPlugIn}.
*
* @author <a href="mailto:tho...@us...">Bryan
* Thompson</a>
*/
- public static class NOPLBSPolicy implements IHALoadBalancerPolicy {
+ public static class GangliaLBSPolicy extends AbstractLBSPolicy {
+ public interface InitParams extends AbstractLBSPolicy.InitParams {
+
+// /**
+// * A {@link Comparator} that places {@link IHostReport}s into a
+// * total ordering from the host with the least load to the host with
+// * the greatest load (optional).
+// */
+// String COMPARATOR = "comparator";
+//
+// String DEFAULT_COMPARATOR = DefaultHostReportComparator.class
+// .getName();
+
+ /**
+ * The {@link IHostScoringRule} that will be used to score the
+ * {@link IHostReport}s. The {@link IHostReport}s are obtained
+ * periodically from the {@link GangliaPlugIn}. The reports reflect
+ * the best local knowledge of the metrics on each of the hosts. The
+ * hosts will each self-report their metrics periodically using the
+ * ganglia protocol.
+ * <p>
+ * The purpose of the scoring rule is to compute a single workload
+ * number based on those host metrics. The resulting scores are then
+ * normalized. Load balancing decisions are made based on those
+ * normalized scores.
+ */
+ String HOST_SCORING_RULE = "hostScoringRule";
+
+ String DEFAULT_HOST_SCORING_RULE = DefaultHostScoringRule.class
+ .getName();
+
+ }
+
+ /**
+ * Interface for scoring the load on a host.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan
+ * Thompson</a>
+ */
+ public interface IHostScoringRule {
+
+ /**
+ * Return a score for the given {@link IHostReport}.
+ *
+ * @param hostReport
+ * The {@link IHostReport}.
+ *
+ * @return The score.
+ */
+ public double getScore(final IHostReport hostReport);
+
+ }
+
+        /**
+         * Returns ONE for each host (all hosts appear to have an equal
+         * workload). This effectively disables load-based host scoring.
+         *
+         * @author <a href="mailto:tho...@us...">Bryan
+         *         Thompson</a>
+         */
+        public static class NOPHostScoringRule implements IHostScoringRule {
+
+            @Override
+            public double getScore(final IHostReport hostReport) {
+
+                return 1d;
+
+            }
+
+        }
+
+ /**
+ * Best effort computation of a workload score based on CPU Utilization,
+ * IO Wait, and GC time.
+ * <p>
+ * Note: Not all platforms report all metrics. For example, OSX does not
+ * report IO Wait, which is a key metric for the workload of a database.
+ *
+ * @author <a href="mailto:tho...@us...">Bryan
+ * Thompson</a>
+ *
+ * FIXME GC time is a JVM metric. It will only get reported by
+ * the {@link GangliaPlugIn} if it is setup to self-report that
+ * data. And it may not report it correctly if there is more
+ * than one {@link HAJournalService} per host. It is also
+ * available from /counters and could be exposed as a JMX MBean.
+ */
+        public static class DefaultHostScoringRule implements IHostScoringRule {
+
+            @Override
+            public double getScore(final IHostReport hostReport) {
+
+                final Map<String, IGangliaMetricMessage> metrics = hostReport
+                        .getMetrics();
+
+                /*
+                 * TODO Use "load_one" if we can't get both "cpu_system" and
+                 * "cpu_user".
+                 */
+//                final double cpu_system;
+//                {
+//
+//                    final IGangliaMetricMessage m = metrics.get("cpu_system");
+//
+//                    if (m != null)
+//                        cpu_system = m.getNumericValue().doubleValue();
+//                    else
+//                        cpu_system = .25d;
+//
+//                }
+//
+//                final double cpu_user;
+//                {
+//
+//                    final IGangliaMetricMessage m = metrics.get("cpu_user");
+//
+//                    if (m != null)
+//                        cpu_user = m.getNumericValue().doubleValue();
+//                    else
+//                        cpu_user = .25d;
+//
+//                }
+
+                // %CPU idle; defaults to 50% when the metric is not reported.
+                final double cpu_idle;
+                {
+
+                    final IGangliaMetricMessage m = metrics.get("cpu_idle");
+
+                    if (m != null)
+                        cpu_idle = m.getNumericValue().doubleValue();
+                    else
+                        cpu_idle = .5d;
+
+                }
+
+                // %IO Wait; defaults to 5% when the metric is not reported.
+                final double cpu_wio;
+                {
+
+                    final IGangliaMetricMessage m = metrics.get("cpu_wio");
+
+                    if (m != null)
+                        cpu_wio = m.getNumericValue().doubleValue();
+                    else
+                        cpu_wio = .05d;
+
+                }
+
+                /*
+                 * IO Wait is weighted heavily (x100) relative to the inverse
+                 * of idle CPU; the 1d terms guard against a zero numerator
+                 * and division by zero. A higher score means a more heavily
+                 * loaded host.
+                 */
+                final double hostScore = (1d + cpu_wio * 100d)
+                        / (1d + cpu_idle);
+
+                return hostScore;
+
+            }
+
+        }
+
+ /**
+ * Place into descending order by load_one.
+ * <p>
+ * Note: We do not rely on the ordering imposed by this comparator.
+ * Instead, we filter the hosts for those that correspond to the joined
+ * services in the met quorum, compute a score for each such host, and
+ * then normalize those scores.
+ */
+ private final Comparator<IHostReport> comparator = new HostReportComparator(
+ "load_one", false/* asc */);
+
+ /**
+ * The ganglia service - it must be configured at least as a listener.
+ */
+ private GangliaService gangliaService;
+
+ /**
+ * The set of metrics that we are requesting in the ganglia host
+ * reports.
+ */
+ private String[] reportOn;
+
+ /**
+ * The {@link Future} of a task that periodically queries the ganglia
+ * peer for its up to date host counters for each discovered host.
+ */
+ private ScheduledFuture<?> scheduledFuture;
+
+ /**
+ * The table of pre-scored hosts.
+ * <P>
+ * Note: There is an entry for all known hosts, but not all hosts are
+ * running services that we care about. This means that we have to run
+ * over the table, filtering for hosts that have services that we care
+ * about.
+ */
+ private final AtomicReference<HostScore[]> hostTable = new AtomicReference<HostScore[]>(
+ null);
+
+ /**
+ * The most recent score for this host.
+ */
+ private final AtomicReference<HostScore> thisHostScore = new AtomicReference<HostScore>();
+
+ /**
+ * The rule used to score the host reports.
+ */
+ private IHostScoringRule scoringRule;
+
+// @SuppressWarnings("unchecked")
@Override
- public String proxyTo(HttpServletRequest req) {
- // TODO Auto-generated method stub
- return null;
+ public void init(final ServletConfig servletConfig,
+ final IIndexManager indexManager) throws ServletException {
+
+ super.init(servletConfig, indexManager);
+
+// comparator = newInstance(servletConfig, Comparator.class,
+// ...
[truncated message content] |