From: <tob...@us...> - 2014-03-07 19:20:51
|
Revision: 7918 http://sourceforge.net/p/bigdata/code/7918 Author: tobycraig Date: 2014-03-07 19:20:47 +0000 (Fri, 07 Mar 2014) Log Message: ----------- #840 - Incremental truth maintenance servlet. Not yet fully working/tested. doClosure causes a hang after the first run. If truth maintenance is not configured, toggle should probably provide a response other than a 500. Modified Paths: -------------- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/NanoSparqlServer.java Added Paths: ----------- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/InferenceServlet.java Added: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/InferenceServlet.java =================================================================== --- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/InferenceServlet.java (rev 0) +++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/InferenceServlet.java 2014-03-07 19:20:47 UTC (rev 7918) @@ -0,0 +1,128 @@ +/** +Copyright (C) SYSTAP, LLC 2006-2014. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.rdf.sail.webapp; + +import java.io.IOException; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.log4j.Logger; + +import com.bigdata.rdf.sail.BigdataSail.BigdataSailConnection; +import com.bigdata.rdf.sail.BigdataSailRepositoryConnection; + +/** + * Incremental truth maintenance servlet. Turns incremental truth maintenance + * on and off, and allows database at once closure. + * + * @see <a href="http://trac.bigdata.com/ticket/840">ticket</a> + * + * @author tobycraig + */ +public class InferenceServlet extends BigdataRDFServlet { + + /** + * + */ + private static final long serialVersionUID = 1L; + + static private final transient Logger log = Logger.getLogger(MultiTenancyServlet.class); + + /** + * Set to true or false, this parameter indicates the state that incremental + * truth maintenance should be set to. + */ + private static final String TOGGLE = "toggle"; + + /** + * If present, compute closure. + */ + private static final String DO_CLOSURE= "doClosure"; + + public InferenceServlet() { + + } + + /** + * Handle incremental truth maintenance. 
+ */ + @Override + protected void doPost(final HttpServletRequest req, + final HttpServletResponse resp) throws IOException { + + final boolean toggle = req.getParameter(TOGGLE) != null; + final boolean doClosure= req.getParameter(DO_CLOSURE) != null; + + if(toggle) { + doToggle(req, resp); + } + + if(doClosure) { + doClosure(req, resp); + } + } + + /* + * Set incremental truth maintenance according to request + */ + private void doToggle(final HttpServletRequest req, + final HttpServletResponse resp) throws IOException { + + final String toggle = req.getParameter(TOGGLE); + final String namespace = getNamespace(req); + try { + BigdataSailRepositoryConnection conn = getBigdataRDFContext() + .getUnisolatedConnection(namespace); + BigdataSailConnection sailConn = conn.getSailConnection(); + + if(toggle.equals("true")) { + sailConn.setTruthMaintenance(true); + } else if(toggle.equals("false")) { + sailConn.setTruthMaintenance(false); + } + } catch (Exception e) { + throw new RuntimeException(); + } + + } + + /* + * Set off database at once closure. 
+ */ + private void doClosure(final HttpServletRequest req, + final HttpServletResponse resp) throws IOException { + + final String namespace = getNamespace(req); + try { + BigdataSailRepositoryConnection conn = getBigdataRDFContext() + .getUnisolatedConnection(namespace); + BigdataSailConnection sailConn = conn.getSailConnection(); + sailConn.computeClosure(); + } catch (Exception e) { + throw new RuntimeException(); + } + + } + +} Property changes on: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/InferenceServlet.java ___________________________________________________________________ Added: svn:mime-type ## -0,0 +1 ## +text/plain \ No newline at end of property Added: svn:keywords ## -0,0 +1 ## +Id Date Revision Author HeadURL \ No newline at end of property Modified: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/NanoSparqlServer.java =================================================================== --- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/NanoSparqlServer.java 2014-03-07 17:42:40 UTC (rev 7917) +++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/NanoSparqlServer.java 2014-03-07 19:20:47 UTC (rev 7918) @@ -652,6 +652,10 @@ context.addServlet(new ServletHolder(new MultiTenancyServlet()), "/namespace/*"); + // Incremental truth maintenance + context.addServlet(new ServletHolder(new InferenceServlet()), + "/inference"); + /** * Note: JSP pages for the servlet 2.5 specification add the following * dependencies: This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2014-03-16 23:25:32
|
Revision: 7989 http://sourceforge.net/p/bigdata/code/7989 Author: thompsonbry Date: 2014-03-16 23:25:29 +0000 (Sun, 16 Mar 2014) Log Message: ----------- Workaround for the ability to compile the code in CI. I still need to figure out the underlying problem since we can not write test suites to the servlet 3.0 API until this is resolved and I do not think it will run against the servlet 2.0 JAR. See #624 (HA load balancer) Modified Paths: -------------- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServlet.java branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/DeleteServlet.java branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HALoadBalancerServlet.java branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/InsertServlet.java branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/MultiTenancyServlet.java branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/QueryServlet.java branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/UpdateServlet.java Modified: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServlet.java =================================================================== --- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServlet.java 2014-03-16 23:09:34 UTC (rev 7988) +++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataRDFServlet.java 2014-03-16 23:25:29 UTC (rev 7989) @@ -246,7 +246,7 @@ if (timestamp == null) { - return getConfig(req.getServletContext()).timestamp; + return getConfig(getServletContext()).timestamp; } @@ -291,7 +291,7 @@ } // use the default namespace. 
- return getConfig(req.getServletContext()).namespace; + return getConfig(getServletContext()).namespace; } Modified: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java =================================================================== --- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java 2014-03-16 23:09:34 UTC (rev 7988) +++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java 2014-03-16 23:25:29 UTC (rev 7989) @@ -228,10 +228,11 @@ * * @throws IOException */ - static boolean isWritable(final HttpServletRequest req, - final HttpServletResponse resp) throws IOException { + static boolean isWritable(final ServletContext servletContext, + final HttpServletRequest req, final HttpServletResponse resp) + throws IOException { - if(getConfig(req.getServletContext()).readOnly) { + if (getConfig(servletContext).readOnly) { buildResponse(resp, HTTP_METHOD_NOT_ALLOWED, MIME_TEXT_PLAIN, "Not writable."); @@ -240,8 +241,7 @@ return false; } - final HAStatusEnum haStatus = getHAStatus(getIndexManager(req - .getServletContext())); + final HAStatusEnum haStatus = getHAStatus(getIndexManager(servletContext)); if (haStatus == null) { // No quorum. return true; @@ -270,11 +270,11 @@ * * @throws IOException */ - static boolean isReadable(final HttpServletRequest req, - final HttpServletResponse resp) throws IOException { + static boolean isReadable(final ServletContext ctx, + final HttpServletRequest req, final HttpServletResponse resp) + throws IOException { - final HAStatusEnum haStatus = getHAStatus(getIndexManager(req - .getServletContext())); + final HAStatusEnum haStatus = getHAStatus(getIndexManager(ctx)); if (haStatus == null) { // No quorum. return true; @@ -364,7 +364,8 @@ * * @return The known serviceURIs for this service. 
*/ - static public String[] getServiceURIs(final HttpServletRequest req) { + static public String[] getServiceURIs(final ServletContext servletContext, + final HttpServletRequest req) { // One or more. final List<String> serviceURIs = new LinkedList<String>(); @@ -407,8 +408,8 @@ * where LBS is the prefix of the load balancer servlet. */ { - final String prefix = (String) req.getServletContext() - .getAttribute(ATTRIBUTE_LBS_PREFIX); + final String prefix = (String) servletContext.getAttribute( + ATTRIBUTE_LBS_PREFIX); if (prefix != null) { @@ -421,8 +422,7 @@ // The ContextPath for the webapp. This should be the next thing // in the [uri]. - final String contextPath = req.getServletContext() - .getContextPath(); + final String contextPath = servletContext.getContextPath(); // The index of the end of the ContextPath. final int endContextPath = nextSlash Modified: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/DeleteServlet.java =================================================================== --- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/DeleteServlet.java 2014-03-16 23:09:34 UTC (rev 7988) +++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/DeleteServlet.java 2014-03-16 23:25:29 UTC (rev 7989) @@ -74,7 +74,7 @@ protected void doDelete(final HttpServletRequest req, final HttpServletResponse resp) throws IOException { - if (!isWritable(req, resp)) { + if (!isWritable(getServletContext(), req, resp)) { // Service must be writable. return; } @@ -234,7 +234,7 @@ protected void doPost(final HttpServletRequest req, final HttpServletResponse resp) throws IOException { - if (!isWritable(req, resp)) { + if (!isWritable(getServletContext(), req, resp)) { // Service must be writable. 
return; } Modified: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HALoadBalancerServlet.java =================================================================== --- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HALoadBalancerServlet.java 2014-03-16 23:09:34 UTC (rev 7988) +++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HALoadBalancerServlet.java 2014-03-16 23:25:29 UTC (rev 7989) @@ -563,10 +563,10 @@ } private String getLeaderURL(final HttpServletRequest request) { - - final ServletContext servletContext = request.getServletContext(); - final HAJournal journal = (HAJournal) BigdataServlet + final ServletContext servletContext = getServletContext(); + + final HAJournal journal = (HAJournal) BigdataServlet .getIndexManager(servletContext); final Quorum<HAGlue, QuorumService<HAGlue>> quorum = journal.getQuorum(); Modified: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/InsertServlet.java =================================================================== --- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/InsertServlet.java 2014-03-16 23:09:34 UTC (rev 7988) +++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/InsertServlet.java 2014-03-16 23:25:29 UTC (rev 7989) @@ -106,7 +106,7 @@ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException { - if (!isWritable(req, resp)) { + if (!isWritable(getServletContext(), req, resp)) { // Service must be writable. 
return; } Modified: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/MultiTenancyServlet.java =================================================================== --- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/MultiTenancyServlet.java 2014-03-16 23:09:34 UTC (rev 7988) +++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/MultiTenancyServlet.java 2014-03-16 23:25:29 UTC (rev 7989) @@ -118,7 +118,7 @@ protected void doPost(final HttpServletRequest req, final HttpServletResponse resp) throws IOException { - if (!isWritable(req, resp)) { + if (!isWritable(getServletContext(), req, resp)) { // Service must be writable. return; } @@ -146,7 +146,7 @@ protected void doDelete(final HttpServletRequest req, final HttpServletResponse resp) throws IOException { - if (!isWritable(req, resp)) { + if (!isWritable(getServletContext(), req, resp)) { // Service must be writable. return; } @@ -172,7 +172,7 @@ protected void doPut(final HttpServletRequest req, final HttpServletResponse resp) throws IOException { - if (!isWritable(req, resp)) { + if (!isWritable(getServletContext(), req, resp)) { // Service must be writable. return; } @@ -582,8 +582,8 @@ final BNode aDataSet = g.getValueFactory().createBNode(); // Figure out the service end point(s). - final String[] serviceURI = getServiceURIs(req); - + final String[] serviceURI = getServiceURIs(getServletContext(), req); + final VoID v = new VoID(g, tripleStore, serviceURI, aDataSet); v.describeDataSet(false/* describeStatistics */, Modified: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/QueryServlet.java =================================================================== --- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/QueryServlet.java 2014-03-16 23:09:34 UTC (rev 7988) +++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/QueryServlet.java 2014-03-16 23:25:29 UTC (rev 7989) @@ -258,8 +258,9 @@ } // The serviceURIs for this graph. 
- final String[] serviceURI = BigdataServlet.getServiceURIs(req); - + final String[] serviceURI = BigdataServlet.getServiceURIs( + getServletContext(), req); + /* * TODO Resolve the SD class name and ctor via a configuration property * for extensible descriptions. @@ -297,7 +298,7 @@ private void doUpdate(final HttpServletRequest req, final HttpServletResponse resp) throws IOException { - if (!isWritable(req, resp)) { + if (!isWritable(getServletContext(), req, resp)) { // Service must be writable. return; } @@ -397,7 +398,7 @@ void doQuery(final HttpServletRequest req, final HttpServletResponse resp) throws IOException { - if (!isReadable(req, resp)) { + if (!isReadable(getServletContext(), req, resp)) { // HA Quorum in use, but quorum is not met. return; } @@ -1011,7 +1012,7 @@ private void doEstCard(final HttpServletRequest req, final HttpServletResponse resp) throws IOException { - if (!isReadable(req, resp)) { + if (!isReadable(getServletContext(), req, resp)) { // HA Quorum in use, but quorum is not met. return; } @@ -1108,7 +1109,7 @@ private void doContexts(final HttpServletRequest req, final HttpServletResponse resp) throws IOException { - if (!isReadable(req, resp)) { + if (!isReadable(getServletContext(), req, resp)) { // HA Quorum in use, but quorum is not met. return; } @@ -1174,7 +1175,7 @@ private void doShardReport(final HttpServletRequest req, final HttpServletResponse resp) throws IOException { - if (!isReadable(req, resp)) { + if (!isReadable(getServletContext(), req, resp)) { // HA Quorum in use, but quorum is not met. 
return; } Modified: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/UpdateServlet.java =================================================================== --- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/UpdateServlet.java 2014-03-16 23:09:34 UTC (rev 7988) +++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/UpdateServlet.java 2014-03-16 23:25:29 UTC (rev 7989) @@ -72,10 +72,10 @@ } @Override - protected void doPut(HttpServletRequest req, HttpServletResponse resp) - throws IOException { + protected void doPut(final HttpServletRequest req, + final HttpServletResponse resp) throws IOException { - if (!isWritable(req, resp)) { + if (!isWritable(getServletContext(), req, resp)) { // Service must be writable. return; } @@ -348,7 +348,7 @@ protected void doPost(final HttpServletRequest req, final HttpServletResponse resp) throws IOException { - if (!isWritable(req, resp)) { + if (!isWritable(getServletContext(), req, resp)) { // Service must be writable. return; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <mrp...@us...> - 2014-03-19 18:21:19
|
Revision: 8001 http://sourceforge.net/p/bigdata/code/8001 Author: mrpersonick Date: 2014-03-19 18:21:16 +0000 (Wed, 19 Mar 2014) Log Message: ----------- added a helper servlet for the workbench Modified Paths: -------------- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/RESTServlet.java Added Paths: ----------- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/WorkbenchServlet.java Modified: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/RESTServlet.java =================================================================== --- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/RESTServlet.java 2014-03-19 16:44:49 UTC (rev 8000) +++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/RESTServlet.java 2014-03-19 18:21:16 UTC (rev 8001) @@ -57,6 +57,8 @@ private InsertServlet m_insertServlet; private DeleteServlet m_deleteServlet; private UpdateServlet m_updateServlet; + private WorkbenchServlet m_workbenchServlet; + /** * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/584"> * DESCRIBE CACHE </a> @@ -80,12 +82,14 @@ m_updateServlet = new UpdateServlet(); m_deleteServlet = new DeleteServlet(); m_describeServlet = new DescribeCacheServlet(); + m_workbenchServlet = new WorkbenchServlet(); m_queryServlet.init(getServletConfig()); m_insertServlet.init(getServletConfig()); m_updateServlet.init(getServletConfig()); m_deleteServlet.init(getServletConfig()); m_describeServlet.init(getServletConfig()); + m_workbenchServlet.init(getServletConfig()); } @@ -120,6 +124,11 @@ m_describeServlet = null; } + if (m_workbenchServlet != null) { + m_workbenchServlet.destroy(); + m_workbenchServlet = null; + } + super.destroy(); } @@ -222,7 +231,11 @@ buildResponse(resp, HTTP_OK, MIME_TEXT_PLAIN); - } else if(req.getParameter("uri") != null) { + } else if (req.getParameter(WorkbenchServlet.ATTR_WORKBENCH) != null) { + + m_workbenchServlet.doPost(req, resp); + + } else if (req.getParameter("uri") != 
null) { // INSERT via w/ URIs m_insertServlet.doPost(req, resp); Added: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/WorkbenchServlet.java =================================================================== --- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/WorkbenchServlet.java (rev 0) +++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/WorkbenchServlet.java 2014-03-19 18:21:16 UTC (rev 8001) @@ -0,0 +1,183 @@ +/** +Copyright (C) SYSTAP, LLC 2006-2014. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +*/ +package com.bigdata.rdf.sail.webapp; + +import java.io.IOException; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.log4j.Logger; +import org.openrdf.model.Graph; +import org.openrdf.model.impl.GraphImpl; +import org.openrdf.rio.RDFFormat; +import org.openrdf.rio.RDFParser; +import org.openrdf.rio.RDFParserFactory; +import org.openrdf.rio.RDFParserRegistry; +import org.openrdf.rio.helpers.StatementCollector; + +import com.bigdata.rdf.sail.webapp.client.MiniMime; +import com.bigdata.rdf.store.AbstractTripleStore; + +/** + * Helper servlet for workbench requests. 
+ */ +public class WorkbenchServlet extends BigdataRDFServlet { + + /** + * + */ + private static final long serialVersionUID = 1L; + + static private final transient Logger log = Logger.getLogger(WorkbenchServlet.class); + + /** + * Flag to signify a workbench operation. + */ + static final transient String ATTR_WORKBENCH = "workbench"; + + /** + * Flag to signify a convert operation. POST an RDF document with a + * content type and an accept header for what it should be converted to. + */ + static final transient String ATTR_CONVERT = "convert"; + + + public WorkbenchServlet() { + + } + + @Override + protected void doPost(final HttpServletRequest req, + final HttpServletResponse resp) throws IOException { + + + if (req.getParameter(ATTR_CONVERT) != null) { + + // Convert from one format to another + doConvert(req, resp); + + } + + } + + /** + * Convert RDF data from one format to another. + */ + private void doConvert(final HttpServletRequest req, + final HttpServletResponse resp) throws IOException { + + final String baseURI = req.getRequestURL().toString(); + + final String namespace = getNamespace(req); + + final long timestamp = getTimestamp(req); + + final AbstractTripleStore tripleStore = getBigdataRDFContext() + .getTripleStore(namespace, timestamp); + + if (tripleStore == null) { + /* + * There is no such triple/quad store instance. + */ + buildResponse(resp, HTTP_NOTFOUND, MIME_TEXT_PLAIN); + return; + } + + final String contentType = req.getContentType(); + + if (log.isInfoEnabled()) + log.info("Request body: " + contentType); + + /** + * <a href="https://sourceforge.net/apps/trac/bigdata/ticket/620"> + * UpdateServlet fails to parse MIMEType when doing conneg. 
</a> + */ + + final RDFFormat requestBodyFormat = RDFFormat.forMIMEType(new MiniMime( + contentType).getMimeType()); + + if (requestBodyFormat == null) { + + buildResponse(resp, HTTP_BADREQUEST, MIME_TEXT_PLAIN, + "Content-Type not recognized as RDF: " + contentType); + + return; + + } + + final RDFParserFactory rdfParserFactory = RDFParserRegistry + .getInstance().get(requestBodyFormat); + + if (rdfParserFactory == null) { + + buildResponse(resp, HTTP_INTERNALERROR, MIME_TEXT_PLAIN, + "Parser factory not found: Content-Type=" + + contentType + ", format=" + requestBodyFormat); + + return; + + } + +// final String s= IOUtil.readString(req.getInputStream()); +// System.err.println(s); + + final Graph g = new GraphImpl(); + + try { + + /* + * There is a request body, so let's try and parse it. + */ + + final RDFParser rdfParser = rdfParserFactory + .getParser(); + + rdfParser.setValueFactory(tripleStore.getValueFactory()); + + rdfParser.setVerifyData(true); + + rdfParser.setStopAtFirstError(true); + + rdfParser + .setDatatypeHandling(RDFParser.DatatypeHandling.IGNORE); + + rdfParser.setRDFHandler(new StatementCollector(g)); + + /* + * Run the parser, which will cause statements to be + * inserted. + */ + rdfParser.parse(req.getInputStream(), baseURI); + + sendGraph(req, resp, g); + + } catch (Throwable t) { + + throw BigdataRDFServlet.launderThrowable(t, resp, null); + + } + + } + +} Property changes on: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/WorkbenchServlet.java ___________________________________________________________________ Added: svn:mime-type ## -0,0 +1 ## +text/plain \ No newline at end of property This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2014-04-21 14:33:59
|
Revision: 8130 http://sourceforge.net/p/bigdata/code/8130 Author: thompsonbry Date: 2014-04-21 14:33:56 +0000 (Mon, 21 Apr 2014) Log Message: ----------- Added logging to RESTServlet. Javadoc inside of QueryServlet. Modified Paths: -------------- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/QueryServlet.java branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/RESTServlet.java Modified: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/QueryServlet.java =================================================================== --- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/QueryServlet.java 2014-04-21 14:33:06 UTC (rev 8129) +++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/QueryServlet.java 2014-04-21 14:33:56 UTC (rev 8130) @@ -148,6 +148,11 @@ protected void doPost(final HttpServletRequest req, final HttpServletResponse resp) throws IOException { + /* + * Note: HALoadBalancerServlet MUST be maintained if idempotent methods + * are added to doPost() in order to ensure that they are load balanced + * rather than always directed to the quorum leader. 
+ */ if (req.getParameter(ATTR_UPDATE) != null) { Modified: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/RESTServlet.java =================================================================== --- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/RESTServlet.java 2014-04-21 14:33:06 UTC (rev 8129) +++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/RESTServlet.java 2014-04-21 14:33:56 UTC (rev 8130) @@ -30,6 +30,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.apache.log4j.Logger; import org.openrdf.model.URI; import org.openrdf.model.impl.URIImpl; @@ -42,8 +43,8 @@ */ public class RESTServlet extends BigdataRDFServlet { -// private static final transient Logger log = Logger -// .getLogger(RESTServlet.class); + private static final transient Logger log = Logger + .getLogger(RESTServlet.class); /** * @@ -137,6 +138,9 @@ protected void doGet(final HttpServletRequest req, final HttpServletResponse resp) throws IOException { + if (log.isInfoEnabled()) + log.info(req.toString()); + /* * Look for linked data GET requests. 
* @@ -201,7 +205,10 @@ protected void doPost(final HttpServletRequest req, final HttpServletResponse resp) throws IOException { - if (req.getParameter(QueryServlet.ATTR_QUERY) != null + if (log.isInfoEnabled()) + log.info(req.toString()); + + if (req.getParameter(QueryServlet.ATTR_QUERY) != null || req.getParameter(QueryServlet.ATTR_UPDATE) != null || req.getParameter(QueryServlet.ATTR_UUID) != null || req.getParameter(QueryServlet.ATTR_ESTCARD) != null @@ -249,9 +256,14 @@ } - static boolean hasMimeType(final HttpServletRequest req, String mimeType) { - String contentType = req.getContentType(); - return contentType != null && mimeType.equals(new MiniMime(contentType).getMimeType()); + static boolean hasMimeType(final HttpServletRequest req, + final String mimeType) { + + final String contentType = req.getContentType(); + + return contentType != null + && mimeType.equals(new MiniMime(contentType).getMimeType()); + } /** @@ -264,6 +276,9 @@ protected void doPut(final HttpServletRequest req, final HttpServletResponse resp) throws IOException { + if (log.isInfoEnabled()) + log.info(req.toString()); + m_updateServlet.doPut(req, resp); } @@ -275,6 +290,9 @@ protected void doDelete(final HttpServletRequest req, final HttpServletResponse resp) throws IOException { + if (log.isInfoEnabled()) + log.info(req.toString()); + m_deleteServlet.doDelete(req, resp); } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
From: <tho...@us...> - 2014-04-21 19:15:20
|
Revision: 8132 http://sourceforge.net/p/bigdata/code/8132 Author: thompsonbry Date: 2014-04-21 19:15:16 +0000 (Mon, 21 Apr 2014) Log Message: ----------- Updated version of the HALoadBalancer. See #624 (HA Load Balancer). Modified Paths: -------------- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HALoadBalancerServlet.java branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/IHALoadBalancerPolicy.java Modified: branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HALoadBalancerServlet.java =================================================================== --- branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HALoadBalancerServlet.java 2014-04-21 16:35:35 UTC (rev 8131) +++ branches/RDR/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HALoadBalancerServlet.java 2014-04-21 19:15:16 UTC (rev 8132) @@ -23,10 +23,17 @@ package com.bigdata.rdf.sail.webapp; import java.io.IOException; +import java.lang.ref.WeakReference; import java.net.URI; import java.util.Arrays; import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.servlet.ServletConfig; @@ -38,8 +45,10 @@ import org.apache.log4j.Logger; import org.eclipse.jetty.proxy.ProxyServlet; +import com.bigdata.counters.AbstractStatisticsCollector; import com.bigdata.ganglia.GangliaService; import com.bigdata.ganglia.HostReportComparator; +import com.bigdata.ganglia.IGangliaMetricMessage; import com.bigdata.ganglia.IHostReport; import com.bigdata.ha.HAGlue; import com.bigdata.ha.QuorumService; @@ -47,46 +56,61 @@ import com.bigdata.journal.IIndexManager; import com.bigdata.journal.PlatformStatsPlugIn; import com.bigdata.journal.jini.ha.HAJournal; +import com.bigdata.journal.jini.ha.HAJournalServer; 
+import com.bigdata.quorum.AbstractQuorum; import com.bigdata.quorum.Quorum; +import com.bigdata.quorum.QuorumEvent; +import com.bigdata.quorum.QuorumListener; +import com.bigdata.util.InnerCause; +import com.sun.corba.se.impl.orbutil.closure.Future; /** - * - The HA Load Balancer servlet provides a transparent proxy for requests + * The HA Load Balancer servlet provides a transparent proxy for requests * arriving its configured URL pattern (the "external" interface for the load * balancer) to the root of the web application. - * <P> - * The use of the load balancer is entirely optional. If the security rules - * permit, then clients MAY make requests directly against a specific service. - * Thus, no specific provision exists to disable the load balancer servlet, but - * you may choose not to deploy it. * <p> * When successfully deployed, requests having prefix corresponding to the URL * pattern for the load balancer (typically, "/bigdata/LBS/*") are automatically * redirected to a joined service in the met quorum based on the configured load * balancer policy. * <p> + * The use of the load balancer is entirely optional. If the load balancer is + * not properly configured, then it will simply rewrite itself out of any + * request and the request will be handled by the host to which it was directed + * (no proxying). + * <p> + * Note: If the security rules permit, then clients MAY make requests directly + * against a specific service. + * <p> * The load balancer policies are "HA aware." They will always redirect update - * requests to the quorum leader. The default polices will load balance read - * requests over the leader and followers in a manner that reflects the CPU, IO - * Wait, and GC Time associated with each service. The PlatformStatsPlugIn and - * GangliaPlugIn MUST be enabled for the default load balancer policy to - * operate. It depends on those plugins to maintain a model of the load on the - * HA replication cluster. 
The GangliaPlugIn should be run only as a listener if - * you are are running the real gmond process on the host. If you are not - * running gmond, then the GangliaPlugIn should be configured as both a listener - * and a sender. + * requests to the quorum leader. Read requests will be directed to one of the + * services that is joined with the met quorum. + * <p> * + * <h3>Default Load Balancer Policy Configuration</h3> + * <p> + * The default policy will load balance read requests over the leader and + * followers in a manner that reflects the CPU, IO Wait, and GC Time associated + * with each service. + * <p> + * The {@link PlatformStatsPlugIn} and {@link GangliaPlugIn} MUST be enabled + * for the default load balancer policy to operate. It depends on those plugins + * to maintain a model of the load on the HA replication cluster. The + * GangliaPlugIn should be run only as a listener if you are running the + * real gmond process on the host. If you are not running gmond, then the + * {@link GangliaPlugIn} should be configured as both a listener and a sender. + * <p> + * <ul> + * <li>The {@link PlatformStatsPlugIn} must be enabled.</li> + * <li>The {@link GangliaPlugIn} must be enabled. The service does not need to + * be enabled for {@link GangliaPlugIn.Options#GANGLIA_REPORT}, but it must be + * enabled for {@link GangliaPlugIn.Options#GANGLIA_LISTEN}. + * </ul> + * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * * @see <a href="http://trac.bigdata.com/ticket/624"> HA Load Balancer </a> * - * TODO Define some interesting load balancer policies. We can start with - * HA aware round robin and an HA aware policy that is load balanced based - * on the ganglia reported metrics model. - * - * All policies must be HA aware - we only want to send requests to - * services that are joined with the met quorum. - * * TODO If the target service winds up not joined with the met quorum by * the time we get there, what should it do?
Report an error since we are * already on its internal interface? Will this servlet see that error? If @@ -127,96 +151,132 @@ */ String POLICY = "policy"; - String DEFAULT_POLICY = DefaultLBSPolicy.class.getName(); - /** - * A {@link Comparator} that places {@link IHostReport}s into a total - * ordering from the host with the least load to the host with the - * greatest load (optional). + * FIXME The default must be something that we can override from the + * test suite in order to test these different policies. I've added some + * code to do this based on a System property, but the test suite does + * not allow us to set arbitrary system properties on the child + * processes so that code is not yet having the desired effect. */ - String COMPARATOR = "comparator"; - - String DEFAULT_COMPARATOR = DefaultHostReportComparator.class.getName(); - +// String DEFAULT_POLICY = RoundRobinPolicy.class.getName(); + String DEFAULT_POLICY = NOPLBSPolicy.class.getName(); +// String DEFAULT_POLICY = GangliaLBSPolicy.class.getName(); + } public HALoadBalancerServlet() { super(); } +// /** +// * This servlet request attribute is used to mark a request as either an +// * update or a read-only operation. +// */ +// protected static final String ATTR_LBS_UPDATE_REQUEST = "lbs-update-request"; + + /** + * If the LBS is not enabled, then it will strip its prefix from the URL + * requestURI and do a servlet forward to the resulting requestURI. This + * allows the webapp to start even if the LBS is not correctly configured. + */ private boolean enabled = false; private String prefix = null; private IHALoadBalancerPolicy policy; - private Comparator<IHostReport> comparator; - private GangliaService gangliaService; - private String[] reportOn; - @SuppressWarnings("unchecked") @Override public void init() throws ServletException { super.init(); + // Disabled by default. 
+ enabled = false; + final ServletConfig servletConfig = getServletConfig(); final ServletContext servletContext = servletConfig.getServletContext(); - prefix = servletConfig.getInitParameter(InitParams.PREFIX); - - policy = newInstance(servletConfig, IHALoadBalancerPolicy.class, - InitParams.POLICY, InitParams.DEFAULT_POLICY); - - comparator = newInstance(servletConfig, Comparator.class, - InitParams.COMPARATOR, InitParams.DEFAULT_COMPARATOR); - final IIndexManager indexManager = BigdataServlet .getIndexManager(servletContext); if (!(indexManager instanceof HAJournal)) { - throw new ServletException("Not HA"); + // This is not an error, but the LBS is only for HA. + log.warn("Not HA"); + return; } - final HAJournal journal = (HAJournal) indexManager; + prefix = servletConfig.getInitParameter(InitParams.PREFIX); - if (journal.getPlatformStatisticsCollector() == null) { - throw new ServletException("LBS requires " - + PlatformStatsPlugIn.class.getName()); - } + policy = newInstance(servletConfig, IHALoadBalancerPolicy.class, + InitParams.POLICY, InitParams.DEFAULT_POLICY); - gangliaService = (GangliaService) journal.getGangliaService(); + try { - if (gangliaService == null) { - throw new ServletException("LBS requires " - + GangliaPlugIn.class.getName()); + // Attempt to provision the specified LBS policy. + policy.init(servletConfig, indexManager); + + } catch (Throwable t) { + + /* + * The specified LBS policy could not be provisioned. + */ + + if (InnerCause.isInnerCause(t, InterruptedException.class)) { + // Interrupted. + return; + } + + log.error("Could not setup policy: " + policy, t); + + try { + policy.destroy(); + } catch (Throwable t2) { + if (InnerCause.isInnerCause(t, InterruptedException.class)) { + // Interrupted. + return; + } + log.warn("Problem destroying policy: " + policy, t2); + } finally { + policy = null; + } + + /* + * Fall back onto a NOP policy. Each service will handle a + * read-request itself. 
Write requests are proxied to the quorum + * leader. + */ + + policy = new NOPLBSPolicy(); + + log.warn("Falling back: policy=" + policy); + + // Initialize the fallback policy. + policy.init(servletConfig, indexManager); + } - reportOn = gangliaService.getDefaultHostReportOn(); - enabled = true; servletContext.setAttribute(BigdataServlet.ATTRIBUTE_LBS_PREFIX, prefix); if (log.isInfoEnabled()) - log.info(servletConfig.getServletName() + " @ " + prefix); + log.info(servletConfig.getServletName() + " @ " + prefix + + " :: policy=" + policy); } @Override public void destroy() { - + enabled = false; prefix = null; - policy = null; + if (policy != null) { + policy.destroy(); + policy = null; + } - comparator = null; - - reportOn = null; - - gangliaService = null; - getServletContext().setAttribute(BigdataServlet.ATTRIBUTE_LBS_PREFIX, null); @@ -225,7 +285,55 @@ } /** + * Return the configured value of the named parameter. This method checks + * the environment variables first for a fully qualified value for the + * parameter using <code>HALoadBalancerServer</code><i>name</i>. If no value + * is found for that variable, it checks the {@link ServletContext} for + * <i>name</i>. If no value is found again, it returns the default value + * specified by the caller. This makes it possible to configure the behavior + * of the {@link HALoadBalancerServlet} using environment variables. + * + * @param servletConfig + * The {@link ServletConfig}. + * + * @param iface + * The interface that the type must implement. + * @param name + * The name of the servlet init parameter. + * @param def + * The default value for the servlet init parameter. + * @return + */ + private static String getConfigParam(final ServletConfig servletConfig, + final String name, final String def) { + + // Look at environment variables for an override. + String s = System.getProperty(HALoadBalancerServlet.class.getName() + + "." 
+ name); + + if (s == null || s.trim().length() == 0) { + + // Look at ServletConfig for the configured value. + s = servletConfig.getInitParameter(name); + + } + + if (s == null || s.trim().length() == 0) { + + // Use the default value. + s = def; + + } + + return s; + + } + + /** * Create an instance of some type based on the servlet init parameters. + * <p> + * Note: The configuration parameter MAY also be specified as <code> + * com.bigdata.rdf.sail.webapp.HALoadBalancerServlet.<i>name</i></code>. * * @param servletConfig * The {@link ServletConfig}. @@ -246,16 +354,9 @@ final Class<? extends T> iface, final String name, final String def) throws ServletException { + final String s = getConfigParam(servletConfig, name, def); + final T t; - - String s = servletConfig.getInitParameter(name); - - if (s == null || s.trim().length() == 0) { - - s = def; - - } - final Class<? extends T> cls; try { cls = (Class<? extends T>) Class.forName(s); @@ -285,230 +386,130 @@ IOException { if (!enabled) { - // The LBS is not available. - response.sendError(HttpServletResponse.SC_NOT_FOUND); + /* + * LBS is disabled. Strip LBS prefix from the requestURI and forward + * the request to servlet on this host (NOP LBS). + */ + forwardToThisService(request, response); + return; } - - final HostScore[] hosts = hostTable.get(); - if (hosts == null || hosts.length == 0) { + /* + * Decide whether this is a read-only request or an update request. + */ + final boolean isUpdate = isUpdateRequest(request); - // Ensure that the host table exists. - updateHostsTable(); +// // Set the request attribute. +// request.setAttribute(ATTR_LBS_UPDATE_REQUEST, isUpdate); + + /* + * Delegate to policy. This provides a single point during which the + * policy can ensure that it is monitoring any necessary information and + * also provides an opportunity to override the behavior completely. 
For + * example, as an optimization, the policy can forward the request to a + * servlet in this servlet container rather than proxying it to either + * itself or another service. + */ + if(policy.service(isUpdate, request, response)) { - } - - final HAGlueScore[] services = serviceTable.get(); - - if (services == null || services.length == 0) { - - /* - * Ensure that the service table exists (more correctly, attempt to - * populate it, but we can only do that if the HAQuorumService is - * running.) - */ - - updateServicesTable(); + // Return immediately if the response was committed. + return; } - + /* * TODO if rewriteURL() returns null, then the base class (ProxyServlet) * returns SC_FORBIDDEN. It should return something less ominous, like a - * 404. With an explanation. Or a RETRY. + * 404. With an explanation. Or a RETRY. Or just forward to the local + * service and let it report an appropriate error message (e.g., + * NotReady). */ super.service(request, response); } /** - * Update the per-host scoring table. + * Strip off the <code>/LBS</code> prefix from the requestURI and forward + * the request to the servlet at the resulting requestURI. This forwarding + * effectively disables the LBS but still allows requests which target the + * LBS to succeed against the webapp on the same host. * - * @see #hostTable + * @param request + * The request. + * @param response + * The response. * - * FIXME This MUST be updated on a periodic basis. We can probably - * query the gangliaService to figure out how often it gets updates, or - * we can do this every 5 seconds or so (the ganglia updates are not - * synchronized across a cluster - they just pour in). - * - * TODO For scalability on clusters with a lot of ganglia chatter, we - * should only keep the data from those hosts that are of interest for - * a given HA replication cluster. The load on other hosts has no - * impact on our decision when load balancing within an HA replication - * cluster. 
+ * @throws IOException + * @throws ServletException */ - private void updateHostsTable() { + static protected void forwardToThisService( + final HttpServletRequest request, // + final HttpServletResponse response// + ) throws IOException, ServletException { - /* - * Note: If there is more than one service on the same host, then we - * will have one record per host, not per service. - * - * Note: The actual metrics that are available depend on the OS and on - * whether you are running gmond or having the GangliaPlugIn do its own - * reporting. The policy that ranks the host reports should be robust to - * these variations. - */ - final IHostReport[] hostReport = gangliaService.getHostReport(// - reportOn,// metrics to be reported. - comparator// imposes order on the host reports. - ); + final String path = request.getRequestURI(); - log.warn("hostReport=" + Arrays.toString(hostReport)); + // The prefix for the LBS servlet. + final String prefix = (String) request.getServletContext() + .getAttribute(BigdataServlet.ATTRIBUTE_LBS_PREFIX); - final HostScore[] scores = new HostScore[hostReport.length]; + if (prefix == null) { + // LBS is not running / destroyed. + response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + return; + } - for (int i = 0; i < hostReport.length; i++) { - - final IHostReport r = hostReport[i]; + if (!path.startsWith(prefix)) { + // Request should not have reached the LBS. + response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); + return; + } + // what remains after we strip off the LBS prefix. + final String rest = path.substring(prefix.length()); + + // build up path w/o LBS prefix. + final StringBuilder uri = new StringBuilder(); + + if (!rest.startsWith("/")) { /* - * TODO This is ignoring the metrics for the host and weighting all - * hosts equally. + * The new path must start with '/' and is relative to this + * ServletContext. 
*/ - scores[i++] = new HostScore(r.getHostName(), 1.0, - (double) hostReport.length); + uri.append("/"); + } - } + // append the remainder of the original requestURI + uri.append(rest); - // sort into ascending order (increasing activity). - Arrays.sort(scores); +// // append the query parameters (if any). +// final String query = request.getQueryString(); +// if (query != null) +// uri.append("?").append(query); - for (int i = 0; i < scores.length; i++) { - - scores[i].rank = i; - - scores[i].drank = ((double) i) / scores.length; - - } - - if (log.isDebugEnabled()) { - - log.debug("The most active index was: " + scores[scores.length - 1]); - - log.debug("The least active index was: " + scores[0]); - - } - - this.hostTable.set(scores); - - } - - /** - * Update the per-service table. - * - * @see #serviceTable - * - * FIXME This MUST be maintained by appropriate watchers such that we - * just consult the as maintained information and act immediately on - * it. We can not afford any latency for RMI or even figuring out which - * the host has the least load. That should all be maintained by a - * scheduled thread and listeners. - */ - private void updateServicesTable() { - - final ServletContext servletContext = getServletContext(); - - final HAJournal journal = (HAJournal) BigdataServlet - .getIndexManager(servletContext); - - final Quorum<HAGlue, QuorumService<HAGlue>> quorum = journal.getQuorum(); - + // The new path. + final String newPath = uri.toString(); + /* - * Note: This is the *local* HAGlueService. - * - * This page must be robust to some new failure modes. The ZooKeeper - * client can now be associated with an expired session, River discovery - * can now be disabled, and the HAQuorumService might not be available - * from quorum.getClient(). All of those things can happen if there is a - * zookeeper session expiration that forces us to terminate the - * HAQuorumService. 
This condition will be cured automatically (unless - * the service is being shutdown), but only limited status information - * can be provided while the HAQuorumService is not running. + * Forward the request to this servlet container now that we have + * stripped off the prefix for the LBS. */ - final QuorumService<HAGlue> quorumService; - { - QuorumService<HAGlue> t; - try { - t = (QuorumService) quorum.getClient(); - } catch (IllegalStateException ex) { - // Note: Not available (quorum.start() not called). - return; - } - quorumService = t; - } - final UUID[] joined = quorum.getJoined(); - final HAGlueScore[] serviceScores = new HAGlueScore[joined.length]; + if (log.isInfoEnabled()) + log.info("forward: " + path + " => " + newPath); - for (int i = 0; i < joined.length; i++) { - final UUID serviceId = joined[i]; - try { + request.getRequestDispatcher(newPath).forward(request, response); - /* - * TODO Scan the existing table before doing an RMI to the - * service. We only need to do the RMI for a new service, not - * one in the table. - * - * TODO A services HashMap<UUID,HAGlueScore> would be much more - * efficient than a table. If we use a CHM, then we can do this - * purely asynchronously as the HAGlue services entire the set - * of joined services. - */ - serviceScores[i] = new HAGlueScore(servletContext, serviceId); - - } catch (RuntimeException ex) { - - /* - * Ignore. Might not be an HAGlue instance. - */ - - if (log.isInfoEnabled()) - log.info(ex, ex); - - continue; - - } - - } - - this.serviceTable.set(serviceScores); - } - - /* - * FIXME Choose among pre-computed and maintained proxy targets based on the - * LBS policy. - */ - private static final String _proxyTo = "http://localhost:8091/bigdata"; - - /** - * The table of pre-scored hosts. - * - * TODO There is an entry for all known hosts, but not all hosts are running - * service that we care about. So we have to run over the table, filtering - * for hosts that have services that we care about. 
- */ - private final AtomicReference<HostScore[]> hostTable = new AtomicReference<HostScore[]>( - null); /** - * This is the table of known services. We can scan the table for a service - * {@link UUID} and then forward a request to the pre-computed requestURL - * associated with that {@link UUID}. If the requestURL is <code>null</code> - * then we do not know how to reach that service and can not proxy the - * request. - */ - private final AtomicReference<HAGlueScore[]> serviceTable = new AtomicReference<HAGlueScore[]>( - null); - - /** * For update requests, rewrite the requestURL to the service that is the * quorum leader. For read requests, rewrite the requestURL to the service * having the least load. */ @Override - protected URI rewriteURI(final HttpServletRequest request) - { + protected URI rewriteURI(final HttpServletRequest request) { + final String path = request.getRequestURI(); if (!path.startsWith(prefix)) return null; @@ -517,10 +518,10 @@ final String proxyTo; if(isUpdate) { // Proxy to leader. - proxyTo = getLeaderURL(request); + proxyTo = policy.getLeaderURL(request); } else { // Proxy to any joined service. - proxyTo = getReaderURL(request); + proxyTo = policy.getReaderURL(request); } if (proxyTo == null) { // Could not rewrite. @@ -548,193 +549,1208 @@ } /** - * Return <code>true</code> iff this is an UPDATE request that must be - * proxied to the quorum leader. - * - * FIXME How do we identify "UPDATE" requests? DELETE and PUT are update - * requests, but POST is not always an UPDATE. It can also be used for - * QUERY. GET is never an UPDATE request, and that is what this is based on - * right now. + * TODO This offers an opportunity to handle a rewrite failure. It could be + * used to provide a default status code (e.g., 404 versus forbidden) or to + * forward the request to this server rather than proxying to another + * server. 
*/ - private boolean isUpdateRequest(HttpServletRequest request) { + @Override + protected void onRewriteFailed(final HttpServletRequest request, + final HttpServletResponse response) throws IOException { - return !request.getMethod().equalsIgnoreCase("GET"); - + response.sendError(HttpServletResponse.SC_FORBIDDEN); + } - private String getLeaderURL(final HttpServletRequest request) { + /** + * Return <code>true</code> iff this is an UPDATE request that must be + * proxied to the quorum leader. A SPARQL QUERY + */ + private boolean isUpdateRequest(final HttpServletRequest request) { - final ServletContext servletContext = getServletContext(); + final boolean isGet = request.getMethod().equalsIgnoreCase("GET"); - final HAJournal journal = (HAJournal) BigdataServlet - .getIndexManager(servletContext); + if (isGet) { - final Quorum<HAGlue, QuorumService<HAGlue>> quorum = journal.getQuorum(); - - final UUID leaderId = quorum.getLeaderId(); + // GET is never an UPDATE request. + return false; - if (leaderId == null) { - // No quorum, so no leader. Can not proxy the request. - return null; } + + final String requestURI = request.getRequestURI(); - /* - * Scan the services table to locate the leader and then proxy the - * request to the pre-computed requestURL for the leader. If that - * requestURL is null then we do not know about a leader and can not - * proxy the request at this time. - */ + if (requestURI.endsWith("/sparql")) { - final HAGlueScore[] services = serviceTable.get(); - - if (services == null) { + /* + * SPARQL end point. + * + * @see QueryServlet#doPost() + */ - // No services. Can't proxy. - return null; + if ( request.getParameter(QueryServlet.ATTR_QUERY) != null || + RESTServlet.hasMimeType(request, + BigdataRDFServlet.MIME_SPARQL_QUERY) + ) { - } + /* + * QUERY against SPARQL end point using POST for visibility, not + * mutability. + */ - for (HAGlueScore s : services) { + return false; // idempotent query using POST. 
- if (s.serviceUUID.equals(leaderId)) { + } - // Found it. Proxy if the serviceURL is defined. - return s.requestURL; + if (request.getParameter(QueryServlet.ATTR_UUID) != null) { + return false; // UUID request with caching disabled. + + } else if (request.getParameter(QueryServlet.ATTR_ESTCARD) != null) { + + return false; // ESTCARD with caching defeated. + + } else if (request.getParameter(QueryServlet.ATTR_CONTEXTS) != null) { + + // Request for all contexts in the database. + return false; + } - + } - // Not found. Won't proxy. - return null; - + // Anything else must be proxied to the leader. + return true; + } + /** Place into descending order by load_one. */ + public static class DefaultHostReportComparator extends + HostReportComparator implements Comparator<IHostReport> { + + public DefaultHostReportComparator() { + super("load_one", true/* asc */); + } + + } + /** - * Return the requestURL to which we will proxy a read request. + * Abstract base class establishes a listener for quorum events, tracks the + * services that are members of the quorum, and caches metadata about those + * services (especially the requestURL at which they will respond). * - * @param request - * The request. + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> * - * @return The proxyTo URL -or- <code>null</code> if we could not find a - * service to which we could proxy this request. + * FIXME The {@link QuorumListener} is unregistered by + * {@link AbstractQuorum#terminate()}. This happens any time the + * {@link HAJournalServer} goes into the error state. When this + * occurs, we stop getting {@link QuorumEvent}s and the policy stops + * being responsive. We probably need to either NOT clear the quorum + * listener and/or add an event type that is sent when + * {@link Quorum#terminate()} is called. 
*/ - private String getReaderURL(final HttpServletRequest request) { + abstract protected static class AbstractLBSPolicy implements + IHALoadBalancerPolicy, QuorumListener { - final HostScore[] hostScores = this.hostTable.get(); - - if (hostScores == null) { - // Can't proxy to anything. - return null; + public interface InitParams { + } - // Choose a host : TODO This is just a round robin over the hosts. - HostScore hostScore = null; - for (int i = 0; i < hostScores.length; i++) { + /** + * The {@link ServletContext#getContextPath()} is cached in + * {@link #init(ServletConfig, IIndexManager)}. + */ + private final AtomicReference<String> contextPath = new AtomicReference<String>(); + + /** + * A {@link WeakReference} to the {@link HAJournal} avoids pinning the + * {@link HAJournal}. + */ + protected final AtomicReference<WeakReference<HAJournal>> journalRef = new AtomicReference<WeakReference<HAJournal>>(); - final int hostIndex = (i + nextHost) % hostScores.length; + /** + * This is the table of known services. We can scan the table for a service + * {@link UUID} and then forward a request to the pre-computed requestURL + * associated with that {@link UUID}. If the requestURL is <code>null</code> + * then we do not know how to reach that service and can not proxy the + * request. + */ + protected final AtomicReference<HAGlueScore[]> serviceTable = new AtomicReference<HAGlueScore[]>( + null); - hostScore = hostScores[hostIndex]; + /** + * Return the cached reference to the {@link HAJournal}. + * + * @return The reference or <code>null</code> iff the reference has been + * cleared or has not yet been set. + */ + protected HAJournal getJournal() { - if (hostScore == null) - continue; + final WeakReference<HAJournal> ref = journalRef.get(); - nextHost = hostIndex + 1; + if (ref == null) + return null; + + return ref.get(); } + + @Override + public void destroy() { - if (hostScore == null) { + contextPath.set(null); - // No hosts. Can't proxy. 
- return null; + journalRef.set(null); + + serviceTable.set(null); } + + @Override + public void init(final ServletConfig servletConfig, + final IIndexManager indexManager) throws ServletException { - final HAGlueScore[] services = this.serviceTable.get(); + final ServletContext servletContext = servletConfig + .getServletContext(); - if (services == null) { + contextPath.set(servletContext.getContextPath()); - // No services. Can't proxy. - return null; + final HAJournal journal = (HAJournal) BigdataServlet + .getIndexManager(servletContext); + this.journalRef.set(new WeakReference<HAJournal>(journal)); + + final Quorum<HAGlue, QuorumService<HAGlue>> quorum = journal + .getQuorum(); + + quorum.addListener(this); + } - - /* - * Find a service on that host. - * - * TODO If none found, the try other hosts until we have tried each host - * once and then give up by returning null. This will require pushing - * down the service finder into a method that we call from the hosts - * loop. + + @Override + public boolean service(final boolean isUpdate, + final HttpServletRequest request, + final HttpServletResponse response) throws ServletException, + IOException { + + /* + * Figure out whether the quorum is met and if this is the quorum + * leader. + */ + final HAJournal journal = getJournal(); + Quorum<HAGlue, QuorumService<HAGlue>> quorum = null; + QuorumService<HAGlue> quorumService = null; + long token = Quorum.NO_QUORUM; // assume no quorum. + boolean isLeader = false; // assume false. + boolean isQuorumMet = false; // assume false. + if (journal != null) { + quorum = journal.getQuorum(); + if (quorum != null) { + try { + // Note: This is the *local* HAGlueService. + quorumService = (QuorumService) quorum.getClient(); + token = quorum.token(); + isLeader = quorumService.isLeader(token); + isQuorumMet = token != Quorum.NO_QUORUM; + } catch (IllegalStateException ex) { + // Note: Not available (quorum.start() not + // called). 
+ } + } + } + + if ((isLeader && isUpdate) || !isQuorumMet) { + + /* + * (1) If this service is the leader and the request is an + * UPDATE, then we forward the request to the local service. It + * will handle the UPDATE request. + * + * (2) If the quorum is not met, then we forward the request to + * the local service. It will produce the appropriate error + * message. + * + * FIXME (3) For read-only requests, have a configurable + * preference to forward the request to this service unless + * either (a) there is a clear load imbalance. This will help to + * reduce the latency of the request. If HAProxy is being used + * to load balance over the readers, then we should have a high + * threshold before we send the request somewhere else. + * + * @see #forwardToThisService() + */ + forwardToThisService(request, response); + + // request was handled. + return true; + + } + + /* + * Hook the request to update the service/host tables if they are + * not yet defined. + */ + conditionallyUpdateServiceTable(); + + // request was not handled. + return false; + + } + + /** + * {@inheritDoc} + * <p> + * This implementation rewrites the requestURL such that the request + * will be proxied to the quorum leader. */ - for(HAGlueScore x : services) { + @Override + final public String getLeaderURL(final HttpServletRequest request) { + + final ServletContext servletContext = request.getServletContext(); + + final HAJournal journal = (HAJournal) BigdataServlet + .getIndexManager(servletContext); + + final Quorum<HAGlue, QuorumService<HAGlue>> quorum = journal + .getQuorum(); + + final UUID leaderId = quorum.getLeaderId(); + + if (leaderId == null) { + // No quorum, so no leader. Can not proxy the request. + return null; + } + + /* + * Scan the services table to locate the leader and then proxy the + * request to the pre-computed requestURL for the leader. If that + * requestURL is null then we do not know about a leader and can not + * proxy the request at this time. 
+ */ + + final HAGlueScore[] services = serviceTable.get(); - if (x.hostname == null) { - // Can't use if no hostname. - continue; + if (services == null) { + + // No services. Can't proxy. + return null; + } - if (x.requestURL == null) { - // Can't use if no requestURL. - continue; + for (HAGlueScore s : services) { + + if (s.serviceUUID.equals(leaderId)) { + + // Found it. Proxy if the serviceURL is defined. + return s.requestURL; + + } + } + + // Not found. Won't proxy. + return null; - if (!x.hostname.equals(hostScore.hostname)) { - // This service is not on the host we are looking for. - continue; + } + + /** + * {@inheritDoc} + * <p> + * The services table is updated if a services joins or leaves the + * quorum. + */ + @Override + public void notify(final QuorumEvent e) { + switch(e.getEventType()) { + case SERVICE_JOIN: + case SERVICE_LEAVE: + updateServiceTable(); + break; } + } + + /** + * Conditionally update the {@link #serviceTable} iff it does not exist + * or is empty. + */ + protected void conditionallyUpdateServiceTable() { - return x.requestURL; - + final HAGlueScore[] services = serviceTable.get(); + + if (services == null || services.length == 0) { + + /* + * Ensure that the service table exists (more correctly, attempt + * to populate it, but we can only do that if the + * HAQuorumService is running.) + * + * FIXME This should be robust even when the HAQuorumService is + * not running. We do not want to be unable to proxy to another + * service just because this one is going through an error + * state. Would it make more sense to have a 2nd Quorum object + * for this purpose - one that is not started and stopped by the + * HAJournalServer? + * + * Note: Synchronization here is used to ensure only one thread + * runs this logic if the table does not exist and we get a + * barrage of requests. + */ + synchronized (serviceTable) { + + updateServiceTable(); + + } + + } + } + + /** + * Update the per-service table. 
+ * + * @see #serviceTable + */ + protected void updateServiceTable() { - // No service found on that host. - return null; - + final HAJournal journal = getJournal(); + + final Quorum<HAGlue, QuorumService<HAGlue>> quorum = journal + .getQuorum(); + + final UUID[] joined = quorum.getJoined(); + + final HAGlueScore[] serviceScores = new HAGlueScore[joined.length]; + + for (int i = 0; i < joined.length; i++) { + + final UUID serviceId = joined[i]; + + try { + + /* + * TODO Scan the existing table before doing an RMI to the + * service. We only need to do the RMI for a new service, + * not one in the table. + * + * TODO A services HashMap<UUID,HAGlueScore> would be much + * more efficient than a table. If we use a CHM, then we can + * do this purely asynchronously as the HAGlue services + * entire the set of joined services. + */ + serviceScores[i] = new HAGlueScore(journal, + contextPath.get(), serviceId); + + } catch (RuntimeException ex) { + + /* + * Ignore. Might not be an HAGlue instance. + */ + + if (log.isInfoEnabled()) + log.info(ex, ex); + + continue; + + } + + } + + if (log.isInfoEnabled()) + log.info("Updated servicesTable: #services=" + + serviceScores.length); + + this.serviceTable.set(serviceScores); + + } + } - int nextHost = 0; + + /** + * This policy proxies all requests for update operations to the leader but + * forwards read requests to the local service. Thus, it does not provide a + * load balancing strategy, but it does allow update requests to be directed + * to any service in an non-HA aware manner. This policy can be combined + * with an external round-robin strategy to load balance the read-requests + * over the cluster. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + * + * TODO A service that is not joined with the met quorum can not + * answer a read-request. 
In order to be generally useful (and not + * just as a debugging policy), we need to proxy a read-request when + * this service is not joined with the met quorum. If there is no + * met quorum, then we can just forward the request to the local + * service and it will report the NoQuorum error. + */ + public static class NOPLBSPolicy extends AbstractLBSPolicy { - /** Place into descending order by load_one. */ - public static class DefaultHostReportComparator extends - HostReportComparator implements Comparator<IHostReport> { + @Override + public boolean service(final boolean isUpdate, + final HttpServletRequest request, + final HttpServletResponse response) throws IOException, + ServletException { - public DefaultHostReportComparator() { - super("load_one", true/* asc */); + if (!isUpdate) { + + // Always handle read requests locally. + forwardToThisService(request, response); + + // Request was handled. + return true; + + } + + // Proxy update requests to the quorum leader. + return super.service(isUpdate, request, response); + } + /** + * Note: This method is not invoked. + */ + @Override + public String getReaderURL(final HttpServletRequest req) { + + throw new UnsupportedOperationException(); + + } + } /** - * Stochastically proxy the request to the services based on their load. + * Policy implements a round-robin over the services that are joined with + * the met quorum. * * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> */ - public static class DefaultLBSPolicy implements IHALoadBalancerPolicy { + public static class RoundRobinPolicy extends AbstractLBSPolicy { + /** + * {@inheritDoc} + * <p> + * This imposes a round-robin policy over the discovered services. If + * the service is discovered and appears to be joined with the met + * quorum, then the request can be proxied to that service. 
+ */ @Override - public String proxyTo(HttpServletRequest req) { - // TODO Auto-generated method stub - return null; + public String getReaderURL(final HttpServletRequest request) { + + final HAGlueScore[] serviceScores = this.serviceTable.get(); + + if (serviceScores == null) { + + // Nothing discovered. Can't proxy. + return null; + + } + + /* + * Choose a service. + * + * Note: This is a round robin over the services. Any service that + * is joined with the met quorum can be selected as a target for the + * read request. + * + * Note: The round-robin is atomic with respect to each request. The + * request obtains a starting point in the serviceScores[] and then + * finds the next service in that array using a round-robin. The + * [nextService] starting point is not updated until the round-robin + * is complete - this is necessary in order to avoid skipping over + * services that are being checked by a concurrent request. + * + * The [nextService] is updated only after the round-robin decision + * has been made. As soon as it has been updated, a new round-robin + * decision will be made with respect to the new value for + * [nextService] but any in-flight decisions will be made against + * the value of [nextService] that they observed on entry. + */ + + // The starting offset for the round-robin. + final long startIndex = nextService.longValue(); + + // The selected service. + HAGlueScore serviceScore = null; + + for (int i = 0; i < serviceScores.length; i++) { + + /* + * Find the next host index. + * + * Note: We need to ensure that the hostIndex stays in the legal + * range, even with concurrent requests and when wrapping around + * MAX_VALUE. + */ + final int hostIndex = (int) Math + .abs(((i + startIndex) % serviceScores.length)); + + serviceScore = serviceScores[hostIndex]; + + if (serviceScore == null) + continue; + + if (serviceScore.hostname == null) { + // Can't use if no hostname. 
+ continue; + } + + if (serviceScore.requestURL == null) { + // Can't use if no requestURL. + continue; + } + + } + + // Bump the nextService counter. + nextService.incrementAndGet(); + + if (serviceScore == null) { + + // No service. Can't proxy. + return null; + + } + + return serviceScore.requestURL; + } - + + /** + * Note: This could be a hot spot. We can have concurrent requests and + * we need to increment this counter for each such request. + */ + private final AtomicLong nextService = new AtomicLong(0L); + } /** - * Always proxy the request to the local service even if it is not HA ready - * (this policy defeats the load balancer). + * Stochastically proxy the request to the services based on their load. + * <p> + * Note: This {@link IHALoadBalancerPolicy} has a dependency on the + * {@link GangliaPlugIn}. The {@link GangliaPlugIn} must be setup to listen + * to the Ganglia protocol and build up an in-memory model of the load on + * each host. Ganglia must be reporting metrics for each host running an + * {@link HAJournalServer} instance. This can be achieved either using the + * <code>gmond</code> utility from the ganglia distribution or using the + * {@link GangliaPlugIn}. * * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> */ - public static class NOPLBSPolicy implements IHALoadBalancerPolicy { + public static class GangliaLBSPolicy extends AbstractLBSPolicy { + public interface InitParams extends AbstractLBSPolicy.InitParams { + +// /** +// * A {@link Comparator} that places {@link IHostReport}s into a +// * total ordering from the host with the least load to the host with +// * the greatest load (optional). +// */ +// String COMPARATOR = "comparator"; +// +// String DEFAULT_COMPARATOR = DefaultHostReportComparator.class +// .getName(); + + /** + * The {@link IHostScoringRule} that will be used to score the + * {@link IHostReport}s. The {@link IHostReport}s are obtained + * periodically from the {@link GangliaPlugIn}. 
The reports reflect + * the best local knowledge of the metrics on each of the hosts. The + * hosts will each self-report their metrics periodically using the + * ganglia protocol. + * <p> + * The purpose of the scoring rule is to compute a single workload + * number based on those host metrics. The resulting scores are then + * normalized. Load balancing decisions are made based on those + * normalized scores. + */ + String HOST_SCORING_RULE = "hostScoringRule"; + + String DEFAULT_HOST_SCORING_RULE = DefaultHostScoringRule.class + .getName(); + + } + + /** + * Interface for scoring the load on a host. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + public interface IHostScoringRule { + + /** + * Return a score for the given {@link IHostReport}. + * + * @param hostReport + * The {@link IHostReport}. + * + * @return The score. + */ + public double getScore(final IHostReport hostReport); + + } + + /** + * Returns ONE for each host (all hosts appear to have an equal + * workload). + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + */ + public static class NOPHostScoringRule implements IHostScoringRule { + + @Override + public double getScore(final IHostReport hostReport) { + + return 1d; + + } + + } + + /** + * Best effort computation of a workload score based on CPU Utilization, + * IO Wait, and GC time. + * <p> + * Note: Not all platforms report all metrics. For example, OSX does not + * report IO Wait, which is a key metric for the workload of a database. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + * + * FIXME GC time is a JVM metric. It will only get reported by + * the {@link GangliaPlugIn} if it is setup to self-report that + * data. And it may not report it correctly if there is more + * than one {@link HAJournalService} per host. It is also + * available from /counters and could be exposed as a JMX MBean. 
+ */ + public static class DefaultHostScoringRule implements IHostScoringRule { + + @Override + public double getScore(final IHostReport hostReport) { + + final Map<String, IGangliaMetricMessage> metrics = hostReport + .getMetrics(); + + /* + * TODO Use "load_one" if we can't get both "cpu_system" and + * "cpu_user". + */ +// final double cpu_system; +// { +// +// final IGangliaMetricMessage m = metrics.get("cpu_system"); +// +// if (m != null) +// cpu_system = m.getNumericValue().doubleValue(); +// else +// cpu_system = .25d; +// +// } +// +// final double cpu_user; +// { +// +// final IGangliaMetricMessage m = metrics.get("cpu_user"); +// +// if (m != null) +// cpu_user = m.getNumericValue().doubleValue(); +// else +// cpu_user = .25d; +// +// } + + final double cpu_idle; + { + + final IGangliaMetricMessage m = metrics.get("cpu_idle"); + + if (m != null) + cpu_idle = m.getNumericValue().doubleValue(); + else + cpu_idle = .5d; + + } + + final double cpu_wio; + { + + final IGangliaMetricMessage m = metrics.get("cpu_wio"); + + if (m != null) + cpu_wio = m.getNumericValue().doubleValue(); + else + cpu_wio = .05d; + + } + + final double hostScore = (1d + cpu_wio * 100d) + / (1d + cpu_idle); + + return hostScore; + + } + + } + + /** + * Place into descending order by load_one. + * <p> + * Note: We do not rely on the ordering imposed by this comparator. + * Instead, we filter the hosts for those that correspond to the joined + * services in the met quorum, compute a score for each such host, and + * then normalize those scores. + */ + private final Comparator<IHostReport> comparator = new HostReportComparator( + "load_one", false/* asc */); + + /** + * The ganglia service - it must be configured at least as a listener. + */ + private GangliaService gangliaService; + + /** + * The set of metrics that we are requesting in the ganglia host + * reports. 
+ */ + private String[] reportOn; + + /** + * The {@link Future} of a task that periodically queries the ganglia + * peer for its up to date host counters for each discovered host. + */ + private ScheduledFuture<?> scheduledFuture; + + /** + * The table of pre-scored hosts. + * <P> + * Note: There is an entry for all known hosts, but not all hosts are + * running services that we care about. This means that we have to run + * over the table, filtering for hosts that have services that we care + * about. + */ + private final AtomicReference<HostScore[]> hostTable = new AtomicReference<HostScore[]>( + null); + + /** + * The most recent score for this host. + */ + private final AtomicReference<HostScore> thisHostScore = new AtomicReference<HostScore>(); + + /** + * The rule used to score the host reports. + */ + private IHostScoringRule scoringRule; + +// @SuppressWarnings("unchecked") @Override - public String proxyTo(HttpServletRequest req) { - // TODO Auto-generated method stub - return null; + public void init(final ServletConfig servletConfig, + final IIndexManager indexManager) throws ServletException { + + super.init(servletConfig, indexManager); + +// comparator = newInstance(servletConfig, Comparator.class, +// ... [truncated message content] |