From: <tho...@us...> - 2013-10-24 14:58:25

Revision: 7476
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7476&view=rev
Author:   thompsonbry
Date:     2013-10-24 14:58:13 +0000 (Thu, 24 Oct 2013)

Log Message:
-----------
Test suite fix. restart leader and restart follower do not imply a
guaranteed order in which the services will join. After a quorum break,
everyone leaves and then comes back in. The first service to notice (or
restart) will wind up as the new leader.

Modified Paths:
--------------
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA2JournalServer.java

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java	2013-10-24 14:36:40 UTC (rev 7475)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java	2013-10-24 14:58:13 UTC (rev 7476)
@@ -531,11 +531,18 @@
     }
 
     protected void assertReady(final HAGlue[] members) throws IOException {
-        for (HAGlue member : members) {
-            final HAStatusEnum status = member.getHAStatus();
-            if(log.isInfoEnabled())log.info(member.getServiceName() + ": " + status);
-            assertFalse(HAStatusEnum.NotReady == status);
-        }
+
+        for (HAGlue member : members) {
+
+            final HAStatusEnum status = member.getHAStatus();
+
+            if (log.isInfoEnabled())
+                log.info(member.getServiceName() + ": " + status);
+
+            assertFalse(HAStatusEnum.NotReady == status);
+
+        }
+
     }
 
     /**

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA2JournalServer.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA2JournalServer.java	2013-10-24 14:36:40 UTC (rev 7475)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA2JournalServer.java	2013-10-24 14:58:13 UTC (rev 7476)
@@ -226,17 +226,23 @@
          */
         assertEquals(token1 + 1, token2);
 
-        // The leader should not have changed.
-        final HAGlue leader2 = quorum.getClient().getLeader(token2);
-
-        final UUID leaderId2 = leader2.getServiceId();
-
-        if (!leaderId1.equals(leaderId2)) {
-
-            fail("Expected leaderId=" + leaderId1 + ", but was "
-                    + leaderId2);
-
-        }
+        /*
+         * Note: The services will recast their votes. The pipeline order
+         * depends on whether the leader restarts more quickly than the
+         * latency required for the follower to notice that the leader is
+         * dead (this depends on the negotiated zookeeper session timeout).
+         */
+//        // The leader should not have changed.
+//        final HAGlue leader2 = quorum.getClient().getLeader(token2);
+//
+//        final UUID leaderId2 = leader2.getServiceId();
+//
+//        if (!leaderId1.equals(leaderId2)) {
+//
+//            fail("Expected leaderId=" + leaderId1 + ", but was "
+//                    + leaderId2);
+//
+//        }
 
         /*
          * Verify that the votes were recast for the then current
@@ -252,6 +258,8 @@
          */
         for (HAGlue service : new HAGlue[] { serverA, serverB }) {
 
+            awaitNSSAndHAReady(service);
+
             final RemoteRepository repo = getRemoteRepository(service);
 
             // Should be empty.
@@ -323,22 +331,28 @@
          */
         assertEquals(token1 + 1, token2);
 
-        // The leader should have changed.
-        final HAGlue leader2 = quorum.getClient().getLeader(token2);
+        /*
+         * Note: The services will recast their votes. The pipeline order
+         * depends on whether the leader restarts more quickly than the
+         * latency required for the follower to notice that the leader is
+         * dead (this depends on the negotiated zookeeper session timeout).
+         */
+//        // The leader should have changed.
+//        final HAGlue leader2 = quorum.getClient().getLeader(token2);
+//
+//        final UUID leaderId2 = leader2.getServiceId();
+//
+//        if (leaderId1.equals(leaderId2)) {
+//            /*
+//             * FIXME This fail message is not useful.
+//             * leaderId1.equals(leaderId2). it should report what the leader
+//             * *should* have been, but reports two identical values instead.
+//             */
+//            fail("Expected leaderId=" + leaderId1 + ", but was "
+//                    + leaderId2);
+//
+//        }
 
-        final UUID leaderId2 = leader2.getServiceId();
-
-        if (leaderId1.equals(leaderId2)) {
-            /*
-             * FIXME This fail message is not useful.
-             * leaderId1.equals(leaderId2). it should report what the leader
-             * *should* have been, but reports two identical values instead.
-             */
-            fail("Expected leaderId=" + leaderId1 + ", but was "
-                    + leaderId2);
-
-        }
-
         /*
         * Verify that the votes were recast for the then current
         * lastCommitTime.
@@ -352,6 +366,8 @@
          * both the leader and the follower.
         */
         for (HAGlue service : new HAGlue[] { serverA, serverB }) {
+
+            awaitNSSAndHAReady(service);
 
             final RemoteRepository repo = getRemoteRepository(service);
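The point in this log message generalizes: after a quorum break, test code should await the new quorum token and per-service readiness instead of asserting which service becomes leader. A minimal sketch, assuming the helpers visible in the diff above (awaitNSSAndHAReady, getRemoteRepository) plus a blocking quorum.awaitQuorum() that returns the next met token; this is an illustration, not the exact r7476 test code:

    // Sketch: await the new quorum rather than a particular leader.
    final long token2 = quorum.awaitQuorum(); // blocks until the quorum meets again

    // The token must advance across the break...
    assertEquals(token1 + 1, token2);

    // ...but the leader is simply whichever service re-entered first, so
    // verify readiness of each service instead of comparing leader UUIDs.
    for (HAGlue service : new HAGlue[] { serverA, serverB }) {
        awaitNSSAndHAReady(service);
        final RemoteRepository repo = getRemoteRepository(service);
        // issue read requests against repo here
    }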
From: <tho...@us...> - 2013-10-24 14:36:57

Revision: 7475
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7475&view=rev
Author:   thompsonbry
Date:     2013-10-24 14:36:40 +0000 (Thu, 24 Oct 2013)

Log Message:
-----------
Rolling back to zk 3.3.3. zk 3.4.5 has a reverse DNS lookup that causes a
5 second lag in the test case runs and throws off the test suite.

Added a private LookupDiscoveryManager to AbstractServer. This is used to
export the service proxy rather than relying on the LookupDiscoveryManager
in the HAClient. This allows us to leave the service proxy exported until
the tear down.

AbstractJournal.setQuorumToken2() modified to be robust when the
HAQuorumService is terminated.

BigdataServlet modified to log if HTTP request was denied for HA NotReady
or not Leader.

Modified Paths:
--------------
    branches/ZK_DISCONNECT_HANDLING/.classpath
    branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/zookeeper/ZooKeeperAccessor.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA2JournalServer.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/quorum/zk/AbstractZkQuorumTestCase.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/quorum/zk/TestZkQuorum.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/AbstractZooTestCase.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestZLockImpl.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestZookeeperSessionSemantics.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java

Added Paths:
-----------
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/lib/apache/zookeeper-3.3.3.jar

Modified: branches/ZK_DISCONNECT_HANDLING/.classpath
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/.classpath	2013-10-23 18:47:03 UTC (rev 7474)
+++ branches/ZK_DISCONNECT_HANDLING/.classpath	2013-10-24 14:36:40 UTC (rev 7475)
@@ -33,7 +33,7 @@
     <classpathentry kind="src" path="bigdata-gas/src/test"/>
     <classpathentry exported="true" kind="lib" path="bigdata/lib/dsi-utils-1.0.6-020610.jar"/>
     <classpathentry exported="true" kind="lib" path="bigdata/lib/lgpl-utils-1.0.6-020610.jar"/>
-    <classpathentry kind="lib" path="bigdata-jini/lib/apache/zookeeper-3.4.5.jar"/>
+    <classpathentry kind="lib" path="bigdata-jini/lib/apache/zookeeper-3.3.3.jar"/>
     <classpathentry exported="true" kind="lib" path="bigdata/lib/jetty/jetty-continuation-7.2.2.v20101205.jar"/>
     <classpathentry exported="true" kind="lib" path="bigdata/lib/jetty/jetty-http-7.2.2.v20101205.jar"/>
     <classpathentry exported="true" kind="lib" path="bigdata/lib/jetty/jetty-io-7.2.2.v20101205.jar"/>

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/journal/AbstractJournal.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/journal/AbstractJournal.java	2013-10-23 18:47:03 UTC (rev 7474)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/journal/AbstractJournal.java	2013-10-24 14:36:40 UTC (rev 7475)
@@ -5422,8 +5422,17 @@
         if (quorum == null)
             return;
 
-        // This quorum member.
-        final QuorumService<HAGlue> localService = quorum.getClient();
+        // The HAQuorumService (if running).
+        final QuorumService<HAGlue> localService;
+        {
+            QuorumService<HAGlue> t;
+            try {
+                t = quorum.getClient();
+            } catch (IllegalStateException ex) {
+                t = null;
+            }
+            localService = t;
+        }
 
         // Figure out the state transitions involved.
         final QuorumTokenTransitions transitionState = new QuorumTokenTransitions(

Added: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/lib/apache/zookeeper-3.3.3.jar
===================================================================
(Binary files differ)

Property changes on: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/lib/apache/zookeeper-3.3.3.jar
___________________________________________________________________
Added: svn:mime-type
   + application/octet-stream

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java	2013-10-23 18:47:03 UTC (rev 7474)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java	2013-10-24 14:36:40 UTC (rev 7475)
@@ -53,11 +53,13 @@
 import net.jini.config.Configuration;
 import net.jini.config.ConfigurationException;
 import net.jini.config.ConfigurationProvider;
+import net.jini.core.discovery.LookupLocator;
 import net.jini.core.entry.Entry;
 import net.jini.core.lookup.ServiceID;
 import net.jini.core.lookup.ServiceRegistrar;
 import net.jini.discovery.DiscoveryEvent;
 import net.jini.discovery.DiscoveryListener;
+import net.jini.discovery.LookupDiscoveryManager;
 import net.jini.export.Exporter;
 import net.jini.jeri.BasicILFactory;
 import net.jini.jeri.BasicJeriExporter;
@@ -259,8 +261,10 @@
      * The {@link Configuration} read based on the args[] provided when the
      * server is started.
      */
-    protected Configuration config;
+    protected final Configuration config;
 
+    private final JiniClientConfig jiniClientConfig;
+
     /**
      * The as-configured entry attributes.
      */
@@ -619,7 +623,6 @@
 
         List<Entry> entries = null;
 
         final String COMPONENT = getClass().getName();
-        final JiniClientConfig jiniClientConfig;
         try {
 
             // Create client.
@@ -1123,11 +1126,60 @@
     }
 
     /**
+     * Attempt to start lookup discovery.
+     * <p>
+     * Note: The returned service will perform multicast discovery if ALL_GROUPS
+     * is specified and otherwise requires you to specify one or more unicast
+     * locators (URIs of hosts running discovery services).
+     *
+     * @return The {@link LookupDiscoveryManager}. The caller MUST invoke
+     *         {@link LookupDiscoveryManager#terminate()} when they are done
+     *         with this service.
+     *
+     * @see JiniClientConfig
+     * @see JiniClientConfig#Options
+     *
+     * @throws ConfigurationException
+     * @throws IOException
+     */
+    synchronized LookupDiscoveryManager startLookupDiscoveryManager(
+            final Configuration config) throws ConfigurationException,
+            IOException {
+
+        if (lookupDiscoveryManager == null) {
+
+            log.info("Starting lookup discovery.");
+
+            final String[] groups = jiniClientConfig.groups;
+
+            final LookupLocator[] lookupLocators = jiniClientConfig.locators;
+
+            this.lookupDiscoveryManager = new LookupDiscoveryManager(groups,
+                    lookupLocators, null /* DiscoveryListener */, config);
+
+        }
+
+        return lookupDiscoveryManager;
+
+    }
+
+    private LookupDiscoveryManager lookupDiscoveryManager;
+
+    /**
      * Export a proxy object for this service instance.
+     *
+     * @throws IOException
+     * @throws ConfigurationException
      */
-    synchronized protected void exportProxy(final Remote impl) {
+    synchronized private void exportProxy(final Remote impl)
+            throws ConfigurationException, IOException {
 
         /*
+         * Note: We run our own LookupDiscoveryManager in order to avoid forcing
+         * an unexport of the service proxy when the HAClient is terminated.
+         */
+        final LookupDiscoveryManager lookupDiscoveryManager = startLookupDiscoveryManager(config);
+
+        /*
          * Export a proxy object for this service instance.
         *
         * Note: This must be done before we start the join manager since the
@@ -1157,9 +1209,6 @@
         // The as-configured Entry[] attributes.
         final Entry[] attributes = entries.toArray(new Entry[0]);
 
-        // Note: Throws IllegalStateException if not connected.
-        final HAClient.HAConnection ctx = getHAClient().getConnection();
-
         if (this.serviceID != null) {
 
             /*
@@ -1170,7 +1219,7 @@
             joinManager = new JoinManager(proxy, // service proxy
                     attributes, // attr sets
                     serviceID, // ServiceID
-                    ctx.getDiscoveryManagement(), // DiscoveryManager
+                    lookupDiscoveryManager, // DiscoveryManager
                     new LeaseRenewalManager(), //
                     config);
 
@@ -1183,7 +1232,7 @@
             joinManager = new JoinManager(proxy, // service proxy
                     attributes, // attr sets
                     this, // ServiceIDListener
-                    ctx.getDiscoveryManagement(), // DiscoveryManager
+                    lookupDiscoveryManager, // DiscoveryManager
                     new LeaseRenewalManager(), //
                     config);
 
@@ -1220,8 +1269,16 @@
     }
 
     /**
-     * Unexports the {@link #proxy} - this is a NOP if the proxy is
-     * <code>null</code>.
+     * Unexports the {@link #proxy}, halt the {@link JoinManager}, and halt the
+     * {@link LookupDiscoveryManager}. This is safe to invoke whether or not the
+     * proxy is exported.
+     * <p>
+     * Note: You can not re-export the proxy. The {@link Exporter} does not
+     * support this for a given instance. Also, once un-exported, if you create
+     * a new {@link Exporter} and then re-export the proxy, the new proxy will
+     * be a distinct instance. Most clients expect the life cycle of the proxy
+     * to be the life cycle of the service that is being exposed by the proxy.
+     * Unexporting and re-exporting will confuse such clients.
      *
      * @param force
     *            When true, the object is unexported even if there are pending
@@ -1231,16 +1288,15 @@
     *
     * @see Exporter#unexport(boolean)
     */
-    synchronized protected boolean unexport(final boolean force) {
+    synchronized private boolean unexport(final boolean force) {
 
-        try {
+        boolean unexported = false;
 
-            boolean unexported = false;
-
-            if (proxy != null) {
+        if (proxy != null) {
 
-                log.warn("UNEXPORT PROXY: force=" + force + ", proxy=" + proxy);
+            log.warn("UNEXPORT PROXY: force=" + force + ", proxy=" + proxy);
 
+            try {
                 if (exporter.unexport(force)) {
 
                     unexported = true;
@@ -1250,35 +1306,53 @@
                     log.warn("Proxy was not unexported? : " + this);
 
                 }
+            } finally {
+                proxy = null;
+            }
 
-            if (joinManager != null) {
-
-                try {
+        }
 
-                    joinManager.terminate();
+        if (joinManager != null) {
 
-                } catch (Throwable ex) {
+            try {
 
-                    log.error("Could not terminate the join manager: " + this, ex);
+                joinManager.terminate();
 
-                } finally {
-
-                    joinManager = null;
+            } catch (Throwable ex) {
 
-                }
+                log.error("Could not terminate the join manager: " + this, ex);
 
+            } finally {
+
+                joinManager = null;
+
             }
 
-            return unexported;
+        }
 
-        } finally {
+        if (lookupDiscoveryManager != null) {
 
-            proxy = null;
+            try {
 
+                lookupDiscoveryManager.terminate();
+
+            } catch (Throwable ex) {
+
+                log.error("Could not terminate the lookup discovery manager: "
+                        + this, ex);
+
+            } finally {
+
+                lookupDiscoveryManager = null;
+
+            }
+
         }
 
+        return unexported;
+
     }
 
     /**
@@ -1527,7 +1601,10 @@
      * the service is also destroyed.
     */
     final protected void shutdownNow(final boolean destroy) {
-        
+
+//        if (log.isTraceEnabled())
+//            HAJournalTest.dumpThreads();
+
         // Atomically change RunState.
         if (!runState.compareAndSet(RunState.Start, RunState.ShuttingDown)
                 && !runState.compareAndSet(RunState.Running,
@@ -1545,28 +1622,25 @@
 
         try {
 
-//            /*
-//             * Unexport the proxy, making the service no longer available.
-//             *
-//             * Note: If you do not do this then the client can still make
-//             * requests even after you have terminated the join manager and the
-//             * service is no longer visible in the service browser.
-//             */
-//            try {
-//
-//                if (log.isInfoEnabled())
-//                    log.info("Unexporting the service proxy.");
-//
-//                unexport(true/* force */);
-//
-//            } catch (Throwable ex) {
-//
-//                log.error("Problem unexporting service: " + this, ex);
-//
-//                /* Ignore */
-//
-//            }
+            /*
+             * Unexport the proxy, making the service no longer available.
+             *
+             * Note: If you do not do this then the client can still make
+             * requests even after you have terminated the join manager and the
+             * service is no longer visible in the service browser.
+             */
+            try {
+
+                unexport(true/* force */);
+
+            } catch (Throwable ex) {
+
+                log.error("Problem unexporting service: " + this, ex);
+
+                /* Ignore */
+
+            }
+
             if (destroy && impl != null && impl instanceof IService) {
 
                 final IService tmp = (IService) impl;
@@ -1864,39 +1938,40 @@
             // ignore.
             log.warn("Could not set thread name: " + ex);
-            
+
         }
 
-        boolean started = false;
         try {
-            /*
-             * Create the service object.
-             */
-            if (log.isInfoEnabled())
-                log.info("Creating service impl...");
-
-            impl = newService(config);
-            
-            if (log.isInfoEnabled())
-                log.info("Service impl is " + impl);
-
+
+            // Create the service object.
+            impl = newService(config);
+
+            // Export the service.
+            exportProxy(impl);
+
+            // Start the service.
             startUpHook();
-            started = true;
-        } catch (Exception e) {
-            log.error(e, e);
-        } finally {
-            if (!started)
-                shutdownNow(false/* destroy */);
+
+        } catch (Throwable t) {
+
+            fatal("Startup failure", t);
+
+            throw new AssertionError();
+
         }
 
         if (runState.compareAndSet(RunState.Start, RunState.Running)) {
 
             {
+
                 final String msg = "Service is running: class="
                         + getClass().getName() + ", name=" + getServiceName();
+
+                System.out.println(msg);
+
                 if (log.isInfoEnabled())
                     log.info(msg);
+
             }
 
             /*
@@ -1922,7 +1997,7 @@
             }
 
         }
-        
+
         System.out.println("Service is down: class=" + getClass().getName()
                 + ", name=" + getServiceName());

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java	2013-10-23 18:47:03 UTC (rev 7474)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java	2013-10-24 14:36:40 UTC (rev 7475)
@@ -59,7 +59,10 @@
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
@@ -81,7 +84,7 @@
 import com.bigdata.service.IServiceShutdown;
 import com.bigdata.service.jini.JiniClient;
 import com.bigdata.service.jini.JiniClientConfig;
-import com.bigdata.zookeeper.ZooKeeperAccessor;
+import com.bigdata.util.StackInfoReport;
 import com.sun.jini.start.ServiceDescriptor;
@@ -651,7 +654,6 @@
          */
         private final AtomicReference<HAClient> clientRef = new AtomicReference<HAClient>();
 
-//        private ZooKeeperAccessor zka;
         private ZooKeeper zk;
 
         private LookupDiscoveryManager lookupDiscoveryManager;
@@ -777,7 +779,7 @@
                 throw new IllegalStateException();
 
             if (log.isInfoEnabled())
-                log.info(jiniConfig.toString());
+                log.info(jiniConfig.toString(), new StackInfoReport());
 
             final String[] groups = jiniConfig.groups;
@@ -786,14 +788,6 @@
             try {
 
                 /*
-                 * Connect to a zookeeper service in the declare ensemble of
-                 * zookeeper servers.
-                 */
-
-                zk = new ZooKeeperAccessor(zooConfig.servers,
-                        zooConfig.sessionTimeout).getZookeeper();
-
-                /*
                  * Note: This class will perform multicast discovery if
                  * ALL_GROUPS is specified and otherwise requires you to specify
                  * one or more unicast locators (URIs of hosts running discovery
@@ -839,9 +833,56 @@
                         serviceDiscoveryManager, this/* serviceDiscoveryListener */,
                         cacheMissTimeout);
 
+                /*
+                 * Connect to a zookeeper service in the declare ensemble of
+                 * zookeeper servers.
+                 */
+                log.info("Creating ZooKeeper connection.");
+
+                zk = new ZooKeeper(zooConfig.servers, zooConfig.sessionTimeout,
+                        new Watcher() {
+                            @Override
+                            public void process(final WatchedEvent event) {
+                                if (log.isInfoEnabled())
+                                    log.info(event);
+                            }
+                        });
+
+                /**
+                 * Wait until zookeeper is connected. Figure out how long that
+                 * took. If reverse DNS is not setup, then the following two
+                 * tickets will prevent the service from starting up in a timely
+                 * manner. We detect this with a delay of 4+ seconds before
+                 * zookeeper becomes connected. This issue does not appear in
+                 * zookeeper 3.3.4.
+                 *
+                 * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1652
+                 * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1666
+                 */
+                {
+                    final long begin = System.nanoTime();
+                    while (zk.getState().isAlive()) {
+                        if (zk.getState() == States.CONNECTED) {
+                            // connected.
+                            break;
+                        }
+                        // wait and then retry.
+                        Thread.sleep(100/* ms */);
+                    }
+                    final long elapsed = System.nanoTime() - begin;
+                    if (TimeUnit.NANOSECONDS.toSeconds(elapsed) > 4) {
+                        log.error("Reverse DNS is not configured. The ZooKeeper client is taking too long to resolve server(s): "
+                                + zooConfig.servers
+                                + ", took="
+                                + TimeUnit.NANOSECONDS.toMillis(elapsed) + "ms");
+                    }
+                }
+
                 // And set the reference. The client is now "connected".
                 this.clientRef.set(client);
 
+                log.info("Done.");
+
             } catch (Exception ex) {
 
                 log.fatal(

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java	2013-10-23 18:47:03 UTC (rev 7474)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java	2013-10-24 14:36:40 UTC (rev 7475)
@@ -623,9 +623,8 @@
      */
     private void setupZNodes() throws KeeperException, InterruptedException {
 
-        if (log.isInfoEnabled()) {
+        if (log.isInfoEnabled())
             log.info("Ensuring key znodes exist.");
-        }
 
         final ZookeeperClientConfig zkClientConfig = getHAClient()
                 .getZookeeperClientConfig();
@@ -661,6 +660,9 @@
     @Override
     protected HAGlue newService(final Configuration config) throws Exception {
 
+        if (log.isInfoEnabled())
+            log.info("Creating service impl...");
+
 //        /*
 //         * Verify discovery of at least one ServiceRegistrar.
 //         */
@@ -1365,6 +1367,7 @@
              * Verify discovery of at least one ServiceRegistrar.
             */
             try {
+                log.info("Awaiting service registrar discovery.");
                 server.getHAClient()
                         .getConnection()
                         .awaitServiceRegistrars(10/* timeout */,
@@ -1377,8 +1380,6 @@
 
             // Ensure key znodes exist.
             try {
-                //
-                //
                 server.setupZNodes();
             } catch (KeeperException e) {
                 throw new RuntimeException(e);
@@ -1402,9 +1403,6 @@
                         log.trace(e);
                 }
             });
-
-            // Export the service proxy.
-            server.exportProxy(server.haGlueService);
 
             // Enter a run state for the HAJournalServer.
             runStateRef.set(null); // clear reference.
@@ -1445,26 +1443,7 @@
                 }
 
             }
-
-            /*
-             * Unexport the proxy, making the service no longer available.
-             *
-             * Note: If you do not do this then the client can still make
-             * requests even after you have terminated the join manager and the
-             * service is no longer visible in the service browser.
-             */
-            try {
-
-                server.unexport(true/* force */);
-
-            } catch (Throwable ex) {
-
-                log.error("Problem unexporting service: " + this, ex);
-
-                /* Ignore */
-
-            }
-
+
             // Disconnect. Terminate River and Zookeeper processing.
             server.getHAClient().disconnect(true/* immediateShutdown */);
@@ -2139,13 +2118,16 @@
                         // Reduce to negotiated timeout GT ZERO.
                         sessionTimeout = sessionTimeout2;
                     }
-                    if (zk.getState().isConnected()) {
+                    switch(zk.getState()) {
+                    case CONNECTED:
                         break;
                     }
                     final long elapsed = System.nanoTime() - begin;
                     if (elapsed > TimeUnit.MILLISECONDS
                             .toNanos(sessionTimeout)) {
-                        log.error("Tearing down service: ZK Session remains disconnected");
+                        log.error("Tearing down service: ZK Session remains disconnected for "
+                                + TimeUnit.NANOSECONDS.toMillis(elapsed)
+                                + "ms, effectiveTimeout=" + sessionTimeout);
                         restartHAQuorumService();
                         break;
                     }
@@ -2253,6 +2235,11 @@
                      */
                     log.warn("HAQuorumService: TERMINATE");
                     journal.getQuorum().terminate();
+                    /*
+                     * Force the clear of the token since disconnected
+                     * from zookeeper.
+                     */
+                    journal.clearQuorumToken(Quorum.NO_QUORUM);
                 } catch (Throwable t) {
                     log.error(t, t);
                     // Re-enter the ErrorTask.

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java	2013-10-23 18:47:03 UTC (rev 7474)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java	2013-10-24 14:36:40 UTC (rev 7475)
@@ -67,7 +67,6 @@
 import com.bigdata.quorum.QuorumWatcher;
 import com.bigdata.util.InnerCause;
 import com.bigdata.util.concurrent.DaemonThreadFactory;
-import com.bigdata.zookeeper.ZooKeeperAccessor;
 
 /**
  * Implementation of the {@link Quorum} using zookeeper to maintain the
@@ -1440,28 +1439,28 @@
     private void handleExpired() {
         log.error("ZOOKEEPER SESSION EXPIRED: token=" + token());
         doNotifyClientDisconnected();
-        /*
-         * FIXME We can not cure an expire ZK session. Instead, we tear down
-         * the QuorumClient, obtain a new HClient connectionm, and then
-         * restart the QuorumClient. Therefore this code should go since it
-         * not make progress.
-         */
-        while (true) {
-            try {
-                // wait for a valid ZooKeeper connection.
-                final ZooKeeper zk = getZookeeper();
-                // set the watchers.
-                setupWatchers(zk);
-                // done.
-                return;
-            } catch (KeeperException e2) {
-                log.error(e2, e2);
-            } catch (InterruptedException e2) {
-                // propagate the interrupt.
-                Thread.currentThread().interrupt();
-                return;
-            }
-        }
+//        /*
+//         * FIXME We can not cure an expire ZK session. Instead, we tear down
+//         * the QuorumClient, obtain a new HClient connectionm, and then
+//         * restart the QuorumClient. Therefore this code should go since it
+//         * not make progress.
+//         */
+//        while (true) {
+//            try {
+//                // wait for a valid ZooKeeper connection.
+//                final ZooKeeper zk = getZookeeper();
+//                // set the watchers.
+//                setupWatchers(zk);
+//                // done.
+//                return;
+//            } catch (KeeperException e2) {
+//                log.error(e2, e2);
+//            } catch (InterruptedException e2) {
+//                // propagate the interrupt.
+//                Thread.currentThread().interrupt();
+//                return;
+//            }
+//        }
     }
 
     /**
@@ -2342,8 +2341,8 @@
      * @param replicationFactor
     *            The replication factor for the quorum (must be a non-negative,
     *            odd integer).
-     * @param zka
-     *            The {@link ZooKeeperAccessor}.
+     * @param zk
+     *            The {@link ZooKeeper} client connection
      * @param acl
     *            The ACLs.
     *

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/zookeeper/ZooKeeperAccessor.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/zookeeper/ZooKeeperAccessor.java	2013-10-23 18:47:03 UTC (rev 7474)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/zookeeper/ZooKeeperAccessor.java	2013-10-24 14:36:40 UTC (rev 7475)
@@ -266,7 +266,7 @@
             try {
 
                 log.warn("Creating new client");
-                // FIXME must not create new zk while session not expired.
+
                 zookeeper = new ZooKeeper(hosts, sessionTimeout,
                         new ZooAliveWatcher());

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java	2013-10-23 18:47:03 UTC (rev 7474)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java	2013-10-24 14:36:40 UTC (rev 7475)
@@ -64,7 +64,10 @@
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
 import org.apache.zookeeper.data.ACL;
 
 import com.bigdata.ha.HAGlue;
@@ -98,7 +101,6 @@
 import com.bigdata.util.config.NicUtil;
 import com.bigdata.zookeeper.DumpZookeeper;
 import com.bigdata.zookeeper.ZooHelper;
-import com.bigdata.zookeeper.ZooKeeperAccessor;
 
 /**
  * Class layers in support to start and stop the {@link HAJournalServer}
@@ -328,7 +330,7 @@
 
         // Unique for each test.
         logicalServiceId = "CI-HAJournal-" + getName() + "-" + UUID.randomUUID();
-        
+
         /*
         * Read the jini/river configuration file. We need this to setup the
         * clients that we will use to lookup the services that we start.
@@ -1270,10 +1272,11 @@
      * @throws ConfigurationException
     * @throws InterruptedException
     * @throws KeeperException
+     * @throws IOException
     */
     protected Quorum<HAGlue, QuorumClient<HAGlue>> newQuorum()
             throws ConfigurationException, InterruptedException,
-            KeeperException {
+            KeeperException, IOException {
 
         final Configuration config = ConfigurationProvider
                 .getInstance(new String[] { SRC_PATH + "zkClient.config" });
@@ -1285,9 +1288,41 @@
         final int sessionTimeout = zkClientConfig.sessionTimeout;
 
         // Note: Save reference.
-        this.zookeeper = new ZooKeeperAccessor(zoohosts, sessionTimeout)
-                .getZookeeper();
-//        this.acl = acl;
+        this.zookeeper = new ZooKeeper(zoohosts, sessionTimeout, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                if (log.isInfoEnabled())
+                    log.info(event);
+            }
+        });
+        /**
+         * Wait until zookeeper is connected. Figure out how long that took. If
+         * reverse DNS is not setup, then the following two tickets will prevent
+         * the service from starting up in a timely manner. We detect this with
+         * a delay of 4+ seconds before zookeeper becomes connected. This issue
+         * does not appear in zookeeper 3.3.4.
+         *
+         * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1652
+         * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1666
+         */
+        {
+            final long begin = System.nanoTime();
+            while (zookeeper.getState().isAlive()) {
+                if (zookeeper.getState() == States.CONNECTED) {
+                    // connected.
+                    break;
+                }
+                // wait and then retry.
+                Thread.sleep(100/* ms */);
+            }
+            final long elapsed = System.nanoTime() - begin;
+            if (TimeUnit.NANOSECONDS.toSeconds(elapsed) > 4) {
+                fail("Reverse DNS is not configured. The ZooKeeper client is taking too long to resolve server(s): "
+                        + zoohosts
+                        + ", took="
+                        + TimeUnit.NANOSECONDS.toMillis(elapsed) + "ms");
+            }
+        }
 
         // znode name for the logical service.
 //        final String logicalServiceId = (String) config.getEntry(
@@ -1702,6 +1737,8 @@
         * Wait until the server is running.
         */
         assertCondition(new Runnable() {
+
+            @Override
             public void run() {
                 try {

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java	2013-10-23 18:47:03 UTC (rev 7474)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHAJournalServerTestCase.java	2013-10-24 14:36:40 UTC (rev 7475)
@@ -314,7 +314,19 @@
     protected void awaitHAStatus(final HAGlue haGlue, final HAStatusEnum expected)
             throws IOException {
 
+        awaitHAStatus(5, TimeUnit.SECONDS, haGlue, expected);
+
+    }
+
+    /**
+     * Check the HAStatusEnum for the service.
+     */
+    protected void awaitHAStatus(final long timeout, final TimeUnit unit,
+            final HAGlue haGlue, final HAStatusEnum expected)
+            throws IOException {
+
         assertCondition(new Runnable() {
+            @Override
             public void run() {
                 try {
                     assertEquals(expected, haGlue.getHAStatus());
@@ -322,7 +334,7 @@
                     throw new RuntimeException(e);
                 }
             }
-        }, 5, TimeUnit.SECONDS);
+        }, timeout, unit);
 
     }

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA2JournalServer.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA2JournalServer.java	2013-10-23 18:47:03 UTC (rev 7474)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA2JournalServer.java	2013-10-24 14:36:40 UTC (rev 7475)
@@ -329,7 +329,11 @@
         final UUID leaderId2 = leader2.getServiceId();
 
         if (leaderId1.equals(leaderId2)) {
-
+            /*
+             * FIXME This fail message is not useful.
+             * leaderId1.equals(leaderId2). it should report what the leader
+             * *should* have been, but reports two identical values instead.
+             */
             fail("Expected leaderId=" + leaderId1 + ", but was "
                     + leaderId2);

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java	2013-10-23 18:47:03 UTC (rev 7474)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java	2013-10-24 14:36:40 UTC (rev 7475)
@@ -31,8 +31,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import junit.framework.AssertionFailedError;
-
 import net.jini.config.Configuration;
 
 import com.bigdata.ha.HACommitGlue;
@@ -42,7 +40,6 @@
 import com.bigdata.ha.msg.IHA2PhasePrepareMessage;
 import com.bigdata.ha.msg.IHANotifyReleaseTimeRequest;
 import com.bigdata.journal.AbstractTask;
-import com.bigdata.journal.jini.ha.HAJournalServer.RunStateEnum;
 import com.bigdata.journal.jini.ha.HAJournalTest.HAGlueTest;
 import com.bigdata.journal.jini.ha.HAJournalTest.SpuriousTestException;
 import com.bigdata.quorum.AsynchronousQuorumCloseException;
@@ -693,6 +690,8 @@
         */
         for (HAGlue service : new HAGlue[] { serverA, serverB }) {
 
+            awaitNSSAndHAReady(service);
+
             final RemoteRepository repo = getRemoteRepository(service);
 
             // Should be empty.
@@ -774,7 +773,7 @@
 
         // Verify leader self-reports in new role.
         awaitHAStatus(leader2, HAStatusEnum.Leader);
-        
+
         /*
         * Verify we can read on the KB on both nodes.
        *
@@ -783,6 +782,8 @@
         */
         for (HAGlue service : new HAGlue[] { serverA, serverB }) {
 
+            awaitNSSAndHAReady(service);
+
             final RemoteRepository repo = getRemoteRepository(service);
 
             // Should be empty.

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/quorum/zk/AbstractZkQuorumTestCase.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/quorum/zk/AbstractZkQuorumTestCase.java	2013-10-23 18:47:03 UTC (rev 7474)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/quorum/zk/AbstractZkQuorumTestCase.java	2013-10-24 14:36:40 UTC (rev 7475)
@@ -39,7 +39,6 @@
 import com.bigdata.quorum.MockQuorumFixture;
 import com.bigdata.quorum.QuorumActor;
 import com.bigdata.zookeeper.AbstractZooTestCase;
-import com.bigdata.zookeeper.ZooKeeperAccessor;
 
 /**
  * Abstract base class for testing using a {@link MockQuorumFixture}.
@@ -69,7 +68,7 @@
     // The per-client quorum objects.
     ZKQuorumImpl[] quorums;
     MockQuorumMember[] clients;
-    ZooKeeperAccessor[] accessors;
+    ZooKeeper[] accessors;
     QuorumActor[] actors;
     final MockServiceRegistrar<Remote> registrar = new MockServiceRegistrar();
@@ -92,7 +91,7 @@
 
         clients = new MockQuorumMember[k];
 
-        accessors = new ZooKeeperAccessor[k];
+        accessors = new ZooKeeper[k];
 
         actors = new QuorumActor[k];
 
@@ -101,7 +100,7 @@
         */
         for (int i = 0; i < k; i++) {
             accessors[i] = getZooKeeperAccessorWithDistinctSession();
-            final ZooKeeper zk = accessors[i].getZookeeper();
+            final ZooKeeper zk = accessors[i];
             quorums[i] = new ZKQuorumImpl(k);//, accessors[i], acl);
             clients[i] = new MockQuorumMember(logicalServiceId, registrar) {
                 public Remote newService() {

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/quorum/zk/TestZkQuorum.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/quorum/zk/TestZkQuorum.java	2013-10-23 18:47:03 UTC (rev 7474)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/quorum/zk/TestZkQuorum.java	2013-10-24 14:36:40 UTC (rev 7475)
@@ -39,12 +39,10 @@
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
 
 import com.bigdata.quorum.AsynchronousQuorumCloseException;
 import com.bigdata.quorum.QuorumActor;
 import com.bigdata.zookeeper.AbstractZooTestCase;
-import com.bigdata.zookeeper.ZooKeeperAccessor;
 
 /**
  * Test suite for {@link ZKQuorumImpl}.
@@ -69,7 +67,7 @@
 
     /**
     * Unit test for
-     * {@link ZKQuorumImpl#setupQuorum(String, ZooKeeperAccessor, List)}
+     * {@link ZKQuorumImpl#setupQuorum(String, int, ZooKeeper, List)}
     *
     * @throws KeeperException
     * @throws InterruptedException
@@ -138,7 +136,7 @@
         // The per-client quorum objects.
         final ZKQuorumImpl[] quorums = new ZKQuorumImpl[k];
         final MockQuorumMember[] clients = new MockQuorumMember[k];
-        final ZooKeeperAccessor[] accessors = new ZooKeeperAccessor[k];
+        final ZooKeeper[] accessors = new ZooKeeper[k];
         final QuorumActor[] actors = new QuorumActor[k];
         final MockServiceRegistrar<Remote> registrar = new MockServiceRegistrar();
         try {
@@ -148,7 +146,7 @@
             */
             for (int i = 0; i < k; i++) {
                 accessors[i] = getZooKeeperAccessorWithDistinctSession();
-                final ZooKeeper zk = accessors[i].getZookeeper();
+                final ZooKeeper zk = accessors[i];
                 quorums[i] = new ZKQuorumImpl(k);//, accessors[i], acl);
                 clients[i] = new MockQuorumMember(logicalServiceId, registrar){
                     public Remote newService() {

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/AbstractZooTestCase.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/AbstractZooTestCase.java	2013-10-23 18:47:03 UTC (rev 7474)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/AbstractZooTestCase.java	2013-10-24 14:36:40 UTC (rev 7475)
@@ -298,16 +298,22 @@
     }
 
     /**
-     * Return a new {@link ZooKeeperAccessor} instance connects to the same
+     * Return a new {@link ZooKeeper} instance connects to the same
     * zookeeper ensemble but which uses distinct {@link ZooKeeper} session.
     *
     * @throws InterruptedException
+     * @throws IOException
     */
-    protected ZooKeeperAccessor getZooKeeperAccessorWithDistinctSession()
-            throws InterruptedException {
+    protected ZooKeeper getZooKeeperAccessorWithDistinctSession()
+            throws InterruptedException, IOException {
 
-        return new ZooKeeperAccessor(zookeeperAccessor.hosts,
-                zookeeperAccessor.sessionTimeout);
+        return new ZooKeeper(zookeeperAccessor.hosts,
+                zookeeperAccessor.sessionTimeout, new Watcher(){
+            @Override
+            public void process(WatchedEvent event) {
+                if(log.isInfoEnabled())
+                    log.info(event);
+            }});
 
 //        final ZooKeeper zookeeper2 = new ZooKeeper(zookeeperAccessor.hosts,
 //                zookeeperAccessor.sessionTimeout, new Watcher() {

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestZLockImpl.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestZLockImpl.java	2013-10-23 18:47:03 UTC (rev 7474)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestZLockImpl.java	2013-10-24 14:36:40 UTC (rev 7475)
@@ -534,8 +534,7 @@
             */
             public Void call() throws Exception {
 
-                final ZooKeeper zookeeper2 = getZooKeeperAccessorWithDistinctSession()
-                        .getZookeeper();
+                final ZooKeeper zookeeper2 = getZooKeeperAccessorWithDistinctSession();
 
                 // obtain a lock object.
                 final ZLockImpl zlock = ZLockImpl.getLock(zookeeper2,

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestZookeeperSessionSemantics.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestZookeeperSessionSemantics.java	2013-10-23 18:47:03 UTC (rev 7474)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestZookeeperSessionSemantics.java	2013-10-24 14:36:40 UTC (rev 7475)
@@ -213,8 +213,6 @@
                 switch (event.getState()) {
                 case AuthFailed:
                     break;
-                case ConnectedReadOnly:
-                    break;
                 case Disconnected:
                     lock.lock();
                     try {
@@ -233,10 +231,12 @@
                         lock.unlock();
                     }
                     break;
-                case NoSyncConnected:
-                    break;
-                case SaslAuthenticated:
-                    break;
+//                case ConnectedReadOnly: // not in 3.3.3
+//                    break;
+//                case NoSyncConnected: // not in 3.3.3
+//                    break;
+//                case SaslAuthenticated: // not in 3.3.3
+//                    break;
                 case SyncConnected:
                     lock.lock();
                     try {

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java	2013-10-23 18:47:03 UTC (rev 7474)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/BigdataServlet.java	2013-10-24 14:36:40 UTC (rev 7475)
@@ -186,6 +186,7 @@
         case Leader:
             return true;
         default:
+            log.warn(haStatus.name());
             buildResponse(resp, HTTP_METHOD_NOT_ALLOWED, MIME_TEXT_PLAIN,
                     haStatus.name());
             // Not writable. Response has been committed.
@@ -195,13 +196,13 @@
     }
 
     /**
-     * If the node is not writable, then commit a response and return
+     * If the node is not readable, then commit a response and return
     * <code>false</code>. Otherwise return <code>true</code>.
    *
    * @param req
    * @param resp
    *
-     * @return <code>true</code> iff the node is writable.
+     * @return <code>true</code> iff the node is readable.
    *
    * @throws IOException
    */
@@ -227,6 +228,7 @@
             return true;
         default:
             // Not ready.
+            log.warn(haStatus.name());
             buildResponse(resp, HTTP_METHOD_NOT_ALLOWED, MIME_TEXT_PLAIN,
                     haStatus.name());
             // Response has been committed.
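The connect-and-measure pattern added to HAClient and the test harness above can stand alone. The following is a self-contained sketch against the stock ZooKeeper client API; the class name is invented here, and the 4-second threshold simply mirrors the heuristic in the commit:

    import java.util.concurrent.TimeUnit;

    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.ZooKeeper.States;

    public class ZkConnectCheck {

        /** Block until CONNECTED; warn when the handshake is suspiciously slow. */
        public static ZooKeeper connect(final String servers, final int sessionTimeout)
                throws Exception {

            final ZooKeeper zk = new ZooKeeper(servers, sessionTimeout, new Watcher() {
                @Override
                public void process(final WatchedEvent event) {
                    System.out.println(event); // connection state changes arrive here
                }
            });

            final long begin = System.nanoTime();
            while (zk.getState().isAlive() && zk.getState() != States.CONNECTED) {
                Thread.sleep(100); // poll until the session is established
            }

            final long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
            if (elapsedMs > 4000) {
                // Per the commit above: a 4+ second handshake usually means reverse
                // DNS is not configured (see ZOOKEEPER-1652 / ZOOKEEPER-1666).
                System.err.println("Slow ZooKeeper connect (" + elapsedMs
                        + "ms): check reverse DNS for " + servers);
            }
            return zk;
        }
    }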
From: <tho...@us...> - 2013-10-23 18:47:10

Revision: 7474
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7474&view=rev
Author:   thompsonbry
Date:     2013-10-23 18:47:03 +0000 (Wed, 23 Oct 2013)

Log Message:
-----------
javadoc

Modified Paths:
--------------
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/zookeeper/ZooKeeperAccessor.java

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/zookeeper/ZooKeeperAccessor.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/zookeeper/ZooKeeperAccessor.java	2013-10-23 18:44:45 UTC (rev 7473)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/zookeeper/ZooKeeperAccessor.java	2013-10-23 18:47:03 UTC (rev 7474)
@@ -100,6 +100,9 @@
  * @see http://wiki.apache.org/hadoop/ZooKeeper/FAQ, which has a state
  *      transition diagram for the {@link ZooKeeper} client.
  *
+ * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/718" >
+ *      HAJournalServer needs to handle ZK client connection loss </a>
+ *
  * FIXME Check all use of {@link SessionExpiredException}, of
  * {@link ZooKeeper.States#CLOSED} or {@link ZooKeeper.States#isAlive()},
  * and of {@link KeeperState#Expired}
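The FIXME in this javadoc concerns the ZooKeeper client's terminal conditions. A brief illustration of the checks it names, written against the stock ZooKeeper API only (the helper class is hypothetical, not project code):

    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.KeeperException.SessionExpiredException;
    import org.apache.zookeeper.ZooKeeper;

    final class SessionChecks {

        /** CLOSED, or any non-alive state, is terminal for this client instance. */
        static boolean isUsable(final ZooKeeper zk) {
            return zk.getState().isAlive();
        }

        /** A SessionExpiredException is likewise unrecoverable on this client. */
        static boolean pathExists(final ZooKeeper zk, final String path)
                throws KeeperException, InterruptedException {
            try {
                return zk.exists(path, false /* no watch */) != null;
            } catch (SessionExpiredException ex) {
                // Ephemeral nodes are already gone server-side; a brand-new
                // ZooKeeper instance must be constructed to continue.
                return false;
            }
        }
    }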
From: <tho...@us...> - 2013-10-23 18:44:51

Revision: 7473
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7473&view=rev
Author:   thompsonbry
Date:     2013-10-23 18:44:45 +0000 (Wed, 23 Oct 2013)

Log Message:
-----------
fix for bad import that broke CI test suite build.

Modified Paths:
--------------
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/quorum/zk/TestSplitZPath.java

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/quorum/zk/TestSplitZPath.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/quorum/zk/TestSplitZPath.java	2013-10-23 17:30:19 UTC (rev 7472)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/quorum/zk/TestSplitZPath.java	2013-10-23 18:44:45 UTC (rev 7473)
@@ -23,7 +23,8 @@
  */
 package com.bigdata.quorum.zk;
 
-import cern.colt.Arrays;
+import java.util.Arrays;
+
 import junit.framework.TestCase2;
 
 public class TestSplitZPath extends TestCase2 {
From: <tho...@us...> - 2013-10-23 17:30:41

Revision: 7472
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7472&view=rev
Author:   thompsonbry
Date:     2013-10-23 17:30:19 +0000 (Wed, 23 Oct 2013)

Log Message:
-----------
See #718. This is a major refactoring and relayering of the
HAJournalServer, HAQuorumService, AbstractServer, ZooKeeper,
ZooKeeperAccessor, HAClient, and ZKQuorumImpl. This targets a problem
where the zookeeper session becomes expired.

- The OVERRIDES TEST SUITE IS CURRENTLY DISABLED IN CI. The rest of the
  HA CI tests are running green.

- AbstractServer<init>() no longer starts any asynchronous threads. This
  is handled entirely by AbstractServer.run().

- HAJournalServer.startupHook()

- AbstractQuorum.terminate() no longer attempts to remove the member from
  the quorum. This can not be done reliably if there is a zookeeper client
  connection issue. Instead, we terminate all asynchronous processing. Any
  queued tasks that had not been executed are cancelled (if they implement
  Future).

- ManageLogicalServiceTask: minor edit to reflect the changed signature of
  the static ZKQuorumImpl.setupQuorum() method (it no longer accepts the
  ZooKeeperAccessor, just the ZooKeeper object).

- HAClient: No longer maintains a ZooKeeperAccessor. The HAClient must be
  disconnected if the zookeeper session is expired. This also shuts down
  River service discovery.

- The CreateKB task is run by the NSS life cycle listener
  (BigdataRDFServletContextListener). This meant that it was not robust if
  quorum.start(HAQuorumService) either had not been called or
  quorum.terminate() had since been called. I modified RunMet to also
  launch the CreateKB task on the quorum leader. (Also because the
  BigdataRDFServletContextListener is responsible for this happening in
  non-HA environments).

- HAJournalServer: The ErrorTask now detects and handles an expired
  zookeeper session by terminating the HAQuorumService and then restarting
  it. (A sketch of this policy follows the list below.)

- HAQuorumService.start() obtains the HAClient connection, exports the
  service proxy, sets up the key znodes for the logical service, and
  enters the RestoreTask.

- HAQuorumService.terminate() tears all of that down. When it is run,
  there is NO RunStateCallable running (i.e., we are not in SeekConsensus,
  Error, RunMet, or any other run state).

- The HAJournal remains open and the NSS remains running if we do
  HAQuorumService.terminate(). The HAJournal and NSS are only closed /
  stopped if we shutdown the HAJournalServer process.

- ZKQuorumClient: This interface adds the getZooKeeper() and
  getZooKeeperACL() methods.

- ZKQuorumImpl currently uses ZKQuorumClient for a generic type. This is
  causing some bounds mismatch problems. I think that we should back the
  ZKQuorumClient out of the generic type and use a runtime assertion in
  ZKQuorumImpl.start(QuorumClient) to make sure that we are using the
  right kind of QuorumClient.

- ZKQuorumImpl no longer uses the ZooKeeperAccessor. It gets the ZooKeeper
  client connection from the (ZK)QuorumClient. If the connection is
  expired, then ZKQuorumImpl.terminate() must be called and then
  ZKQuorumImpl.start(QuorumClient) can be called with a client that has a
  valid connection. This allows us to control when we obtain a new ZK
  connection. This decision is managed by
  HAJournalServer.HAQuorumService.terminate(). When that method calls
  HAClient.disconnect(), the old ZK connection is thrown away and a new
  one may be obtained.

- Removed the code path in ServiceCache.serviceRemoved() that did an RMI
  back to the service to see if it was still valid. That RMI was blocking
  when we un-exported the proxy. This change could also affect the
  BigdataFederation.

- DumpLogs tests: the tests were not awaiting the initial KB create, so
  they were failing stochastically depending on whether the
  simpleTransaction() was issued before the initial commit point that
  defined the initial KB existed.

- The ZooKeeper session timeout has been raised to 20 seconds in the HA CI
  test suite. Some of the tests were modified to wait for at least this
  long, e.g., when waiting for a quorum state change that is triggered by
  a zookeeper session expiration.

- HAQuorumService.terminate() unexports the service proxy. This means that
  the test suite can no longer assume that the service proxy object will
  remain the same for the life cycle of the service. THIS ISSUE MUST BE
  ADDRESSED IN THE TEST HARNESS. This mainly causes problems in the
  "overrides" test suite.

- The test harness can no longer interpret a failure to reach a service by
  RMI as indicating that the service is dead. Instead, safeDestroy() now
  does a kill if the service is unreachable since the service proxy might
  have been unexported and possibly even re-exported.

- A delay was introduced in the testStartABC_halogRestart() tests. Without
  this 1000ms delay, the halog assertions were failing. Those assertions
  might still fail stochastically in CI. We need some positive condition
  that we can await before making those assertions.

- The /status page was made robust to some new failure modes. Primarily,
  the ZooKeeper client can now be associated with an expired session,
  River discovery can now be disabled, and the HAQuorumService might not
  be available from quorum.getClient(). All of those things can happen if
  there is a zookeeper session expiration that forces us to terminate the
  HAQuorumService.

- There were some errors introduced into the deployment version of
  HAJournal.config by the mis-positioning of a comment block.
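The recovery policy referenced in the ErrorTask bullet above reduces to a watcher that treats KeeperState.Expired as terminal: an expired session cannot be cured on the existing client, so quorum processing is torn down and rebuilt on a fresh connection. A sketch with hypothetical stand-in types (RestartableQuorumService is not a project class):

    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.KeeperState;

    /** Hypothetical stand-in for the service being torn down and rebuilt. */
    interface RestartableQuorumService {
        void terminate(); // stop quorum processing bound to the dead session
        void restart();   // obtain a new ZooKeeper client and resume
    }

    class ExpiredSessionWatcher implements Watcher {

        private final RestartableQuorumService service;

        ExpiredSessionWatcher(final RestartableQuorumService service) {
            this.service = service;
        }

        @Override
        public void process(final WatchedEvent event) {
            if (event.getState() == KeeperState.Expired) {
                // Expired is terminal: do not retry on this client.
                service.terminate();
                service.restart();
            }
        }
    }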
Modified Paths:
--------------
    branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java
    branches/ZK_DISCONNECT_HANDLING/bigdata/src/resources/logging/log4j-dev.properties
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/jini/start/ManageLogicalServiceTask.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DumpLogDigests.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-A.config
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-B.config
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-C.config
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/service/jini/lookup/ServiceCache.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/AbstractHA3JournalServerTestCase.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-A.config
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-B.config
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournal-C.config
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/HAJournalTest.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestAll.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3DumpLogs.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHAJournalServerOverride.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/dumpFile.config
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/journal/jini/ha/zkClient.config
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/quorum/zk/AbstractZkQuorumTestCase.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/quorum/zk/MockQuorumMember.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/quorum/zk/TestAll.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/quorum/zk/TestZkQuorum.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/quorum/zk/TestZkSingletonQuorumSemantics.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-sails/src/java/com/bigdata/rdf/sail/CreateKBTask.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-sails/src/java/com/bigdata/rdf/sail/webapp/HAStatusServletUtil.java
    branches/ZK_DISCONNECT_HANDLING/src/resources/HAJournal/HAJournal.config

Added Paths:
-----------
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumClient.java
    branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/quorum/zk/TestSplitZPath.java

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java	2013-10-23 16:58:05 UTC (rev 7471)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java	2013-10-23 17:30:19 UTC (rev 7472)
@@ -31,6 +31,7 @@
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -48,7 +49,6 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -639,18 +639,6 @@
 //            }
 //        }
 
-        /*
-         * Let the service know that it is no longer running w/ the quorum.
-         */
-        try {
-            client.terminate();
-        } catch (Throwable t) {
-            if (InnerCause.isInnerCause(t, InterruptedException.class)) {
-                interrupted = true;
-            } else {
-                launderThrowable(t);
-            }
-        }
         if (watcher != null) {
             try {
                 watcher.terminate();
@@ -681,8 +669,13 @@
                 /*
                  * Cancel any tasks which did not terminate in a timely manner.
                 */
-                watcherActionService.shutdownNow();
+                final List<Runnable> notrun = watcherActionService.shutdownNow();
                 watcherActionService = null;
+                for (Runnable r : notrun) {
+                    if (r instanceof Future) {
+                        ((Future<?>) r).cancel(true/* mayInterruptIfRunning */);
+                    }
+                }
             }
         }
         if (actorActionService != null) {
@@ -699,10 +692,15 @@
                 interrupted = true;
             } finally {
                 /*
-                 * Cancel any tasks which did terminate in a timely manner.
+                 * Cancel any tasks which did not terminate in a timely manner.
                 */
-                actorActionService.shutdownNow();
+                final List<Runnable> notrun = actorActionService.shutdownNow();
                 actorActionService = null;
+                for (Runnable r : notrun) {
+                    if (r instanceof Future) {
+                        ((Future<?>) r).cancel(true/* mayInterruptIfRunning */);
+                    }
+                }
             }
         }
         if (!sendSynchronous) {
@@ -715,10 +713,31 @@
                 // Will be propagated below.
                 interrupted = true;
             } finally {
+                /*
+                 * Cancel any tasks which did terminate in a timely manner.
+                 */
+                final List<Runnable> notrun = eventService.shutdownNow();
                 eventService = null;
+                for (Runnable r : notrun) {
+                    if (r instanceof Future) {
+                        ((Future<?>) r).cancel(true/* mayInterruptIfRunning */);
+                    }
+                }
             }
         }
         /*
+         * Let the service know that it is no longer running w/ the quorum.
+         */
+        try {
+            client.terminate();
+        } catch (Throwable t) {
+            if (InnerCause.isInnerCause(t, InterruptedException.class)) {
+                interrupted = true;
+            } else {
+                launderThrowable(t);
+            }
+        }
+        /*
         * Clear all internal state variables that mirror the distributed
         * quorum state and then signal all conditions so anyone blocked
         * will wake up.
@@ -842,6 +861,19 @@
         }
     }
 
+//    @Override
+    protected C getClientNoLock() {
+//        lock.lock();
+//        try {
+        final C client = this.client;
+        if (client == null)
+            throw new IllegalStateException();
+        return client;
+//        } finally {
+//            lock.unlock();
+//        }
+    }
+
     @Override
     public QuorumMember<S> getMember() {
         lock.lock();

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata/src/resources/logging/log4j-dev.properties
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata/src/resources/logging/log4j-dev.properties	2013-10-23 16:58:05 UTC (rev 7471)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata/src/resources/logging/log4j-dev.properties	2013-10-23 17:30:19 UTC (rev 7472)
@@ -277,8 +277,8 @@
 #log4j.logger.com.bigdata.journal.AbstractBufferStrategy=ALL
 log4j.logger.com.bigdata.journal.jini.ha=ALL
 #log4j.logger.com.bigdata.service.jini.lookup=ALL
-#log4j.logger.com.bigdata.quorum=ALL
-#log4j.logger.com.bigdata.quorum.zk=ALL
+log4j.logger.com.bigdata.quorum=ALL
+log4j.logger.com.bigdata.quorum.zk=ALL
 #log4j.logger.com.bigdata.quorum.quorumState=ALL,destPlain
 #log4j.logger.com.bigdata.io.writecache=ALL

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/jini/start/ManageLogicalServiceTask.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/jini/start/ManageLogicalServiceTask.java	2013-10-23 16:58:05 UTC (rev 7471)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/jini/start/ManageLogicalServiceTask.java	2013-10-23 17:30:19 UTC (rev 7472)
@@ -221,7 +221,7 @@
          * Setup the quorum state.
         */
         ZKQuorumImpl.setupQuorum(logicalServiceZPath, 1/* replicationFactor */,
-                fed.getZookeeperAccessor(), acl);
+                fed.getZookeeperAccessor().getZookeeper(), acl);
 
         try {

Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java
===================================================================
--- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java	2013-10-23 16:58:05 UTC (rev 7471)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java	2013-10-23 17:30:19 UTC (rev 7472)
@@ -80,7 +80,6 @@
 import com.bigdata.jini.lookup.entry.ServiceUUID;
 import com.bigdata.jini.start.config.ZookeeperClientConfig;
 import com.bigdata.jini.util.JiniUtil;
-import com.bigdata.journal.jini.ha.HAClient.HAConnection;
 import com.bigdata.service.AbstractService;
 import com.bigdata.service.IService;
 import com.bigdata.service.IServiceShutdown;
@@ -263,6 +262,11 @@
     protected Configuration config;
 
     /**
+     * The as-configured entry attributes.
+     */
+    private final List<Entry> entries;
+
+    /**
     * A configured name for the service -or- a default value if no {@link Name}
     * was found in the {@link Configuration}.
     */
@@ -579,7 +583,13 @@
 
         runState = new AtomicReference<RunState>(RunState.Start);
 
-        // Show the copyright banner during startup.
+        /*
+         * Display the banner.
+         *
+         * Note: This also installs the UncaughtExceptionHandler.
+         *
+         * @see https://sourceforge.net/apps/trac/bigdata/ticket/601
+         */
         Banner.banner();
 
         if (lifeCycle == null)
@@ -589,24 +599,19 @@
 
         setSecurityManager();
 
+        // Note the process id (best guess).
+        this.pid = PIDUtil.getPID();
+
         /*
-         * Display the banner.
+         * The runtime shutdown hook appears to be a robust way to handle ^C by
+         * providing a clean service termination.
         *
-         * Note: This also installs the UncaughtExceptionHandler.
-         *
-         * @see https://sourceforge.net/apps/trac/bigdata/ticket/601
+         * Note: This is setup before we start any async threads, including
+         * service discovery.
         */
-        Banner.banner();
-//        Thread.setDefaultUncaughtExceptionHandler(
-//                new Thread.UncaughtExceptionHandler() {
-//                    public void uncaughtException(Thread t, Throwable e) {
-//                        log.warn("Uncaught exception in thread", e);
-//                    }
-//                });
+        Runtime.getRuntime().addShutdownHook(
+                new ShutdownThread(false/* destroy */, this));
 
-        // Note the process id (best guess).
-        this.pid = PIDUtil.getPID();
-        
         /*
         * Read jini configuration & service properties
        */
@@ -617,6 +622,9 @@
         final JiniClientConfig jiniClientConfig;
         try {
 
+            // Create client.
+            haClient = new HAClient(args);
+
             config = ConfigurationProvider.getInstance(args);
 
 //            cacheMissTimeout = (Long) config.getEntry(COMPONENT,
@@ -855,6 +863,9 @@
 
         }
 
+        // Save a reference to the as-configured Entry[] attributes.
+        this.entries = entries;
+
         /*
         * Extract how the service will provision itself from the
        * Configuration.
@@ -867,7 +878,7 @@
                 Exporter.class, // type (of the return object)
                 /*
                  * The default exporter is a BasicJeriExporter using a
-                 * TcpServerEnpoint.
+                 * TcpServerEndpoint.
                 */
                 new BasicJeriExporter(TcpServerEndpoint.getInstance(0),
                         new BasicILFactory())
@@ -879,203 +890,114 @@
             throw new AssertionError();// keeps compiler happy.
         }
 
-        /*
-         * The runtime shutdown hook appears to be a robust way to handle ^C by
-         * providing a clean service termination.
-         *
-         * Note: This is setup before we start any async threads, including
-         * service discovery.
-         */
-        Runtime.getRuntime().addShutdownHook(
-                new ShutdownThread(false/* destroy */, this));
-
-        final HAConnection ctx;
-        try {
-
-            // Create client.
-            haClient = new HAClient(args);
-
-            // Connect.
-            ctx = haClient.connect();
-
-//            /*
-//             * Note: This class will perform multicast discovery if ALL_GROUPS
-//             * is specified and otherwise requires you to specify one or more
-//             * unicast locators (URIs of hosts running discovery services). As
-//             * an alternative, you can use LookupDiscovery, which always does
-//             * multicast discovery.
-//             */
-//            lookupDiscoveryManager = new LookupDiscoveryManager(
-//                    jiniClientConfig.groups, jiniClientConfig.locators,
-//                    this /* DiscoveryListener */, config);
+//        Note: Moved HAClient.connect() into quorumService.start().
+//        final HAConnection ctx;
+//        try {
 //
-//            /*
-//             * Setup a helper class that will be notified as services join or
-//             * leave the various registrars to which the data server is
-//             * listening.
-//             */
-//            try {
+//            // Create client.
+//            haClient = new HAClient(args);
 //
-//                serviceDiscoveryManager = new ServiceDiscoveryManager(
-//                        lookupDiscoveryManager, new LeaseRenewalManager(),
-//                        config);
+//            // Connect.
+//            ctx = haClient.connect();
+//
+////            /*
+////             * Note: This class will perform multicast discovery if ALL_GROUPS
+////             * is specified and otherwise requires you to specify one or more
+////             * unicast locators (URIs of hosts running discovery services). As
+////             * an alternative, you can use LookupDiscovery, which always does
+////             * multicast discovery.
+////             */
+////            lookupDiscoveryManager = new LookupDiscoveryManager(
+////                    jiniClientConfig.groups, jiniClientConfig.locators,
+////                    this /* DiscoveryListener */, config);
+////
+////            /*
+////             * Setup a helper class that will be notified as services join or
+////             * leave the various registrars to which the data server is
+////             * listening.
+//// */ +//// try { +//// +//// serviceDiscoveryManager = new ServiceDiscoveryManager( +//// lookupDiscoveryManager, new LeaseRenewalManager(), +//// config); +//// +//// } catch (IOException ex) { +//// +//// throw new RuntimeException( +//// "Could not initiate service discovery manager", ex); +//// +//// } +//// +//// } catch (IOException ex) { +//// +//// fatal("Could not setup discovery", ex); +//// throw new AssertionError();// keep the compiler happy. +//// +// } catch (ConfigurationException ex) { // -// } catch (IOException ex) { +// fatal("Configuration error: " + ex, ex); // -// throw new RuntimeException( -// "Could not initiate service discovery manager", ex); +// throw new AssertionError();// keep the compiler happy. // -// } +// } catch(Throwable ex) { +// +// fatal("Could not connect: " + ex, ex); // -// } catch (IOException ex) { -// -// fatal("Could not setup discovery", ex); // throw new AssertionError();// keep the compiler happy. // - } catch (ConfigurationException ex) { +// } - fatal("Configuration error: " + ex, ex); + // Note: Moved newService() call into AbstractServer.run(). +// /* +// * Create the service object. +// */ +// try { +// +// /* +// * Note: By creating the service object here rather than outside of +// * the constructor we potentially create problems for subclasses of +// * AbstractServer since their own constructor will not have been +// * executed yet. +// * +// * Some of those problems are worked around using a JiniClient to +// * handle all aspects of service discovery (how this service locates +// * the other services in the federation). +// * +// * Note: If you explicitly assign values to those clients when the +// * fields are declared, e.g., [timestampServiceClient=null] then the +// * ctor will overwrite the values set by [newService] since it is +// * running before those initializations are performed. This is +// * really crufty, may be JVM dependent, and needs to be refactored +// * to avoid this subclass ctor init problem. +// */ +// +// if (log.isInfoEnabled()) +// log.info("Creating service impl..."); +// +// // init. +// impl = newService(config); +// +// if (log.isInfoEnabled()) +// log.info("Service impl is " + impl); +// +// } catch(Exception ex) { +// +// fatal("Could not start service: "+this, ex); +// throw new AssertionError();// keeps compiler happy. +// } - throw new AssertionError();// keep the compiler happy. - - } catch(Throwable ex) { - - fatal("Could not connect: " + ex, ex); - - throw new AssertionError();// keep the compiler happy. - - } - - /* - * Create the service object. - */ - try { - - /* - * Note: By creating the service object here rather than outside of - * the constructor we potentially create problems for subclasses of - * AbstractServer since their own constructor will not have been - * executed yet. - * - * Some of those problems are worked around using a JiniClient to - * handle all aspects of service discovery (how this service locates - * the other services in the federation). - * - * Note: If you explicitly assign values to those clients when the - * fields are declared, e.g., [timestampServiceClient=null] then the - * ctor will overwrite the values set by [newService] since it is - * running before those initializations are performed. This is - * really crufty, may be JVM dependent, and needs to be refactored - * to avoid this subclass ctor init problem. - */ - - if (log.isInfoEnabled()) - log.info("Creating service impl..."); - - // init. 
- impl = newService(config); // FIXME Pass in the HAClient.Connection. - - if (log.isInfoEnabled()) - log.info("Service impl is " + impl); - - } catch(Exception ex) { +// // Export the service proxy. +// exportProxy(haClient, impl); - fatal("Could not start service: "+this, ex); - throw new AssertionError();// keeps compiler happy. - } - - /* - * Export a proxy object for this service instance. - * - * Note: This must be done before we start the join manager since the - * join manager will register the proxy. - */ - try { - - proxy = exporter.export(impl); - - if (log.isInfoEnabled()) - log.info("Proxy is " + proxy + "(" + proxy.getClass() + ")"); - - } catch (ExportException ex) { - - fatal("Export error: "+this, ex); - throw new AssertionError();// keeps compiler happy. - } - - /* - * Start the join manager. - */ - try { - - assert proxy != null : "No proxy?"; - - final Entry[] attributes = entries.toArray(new Entry[0]); - - if (this.serviceID != null) { - - /* - * We read the serviceID from local storage (either the - * serviceIDFile and/or the Configuration). - */ - - joinManager = new JoinManager(proxy, // service proxy - attributes, // attr sets - serviceID, // ServiceID - ctx.getDiscoveryManagement(), // DiscoveryManager - new LeaseRenewalManager(), // - config); - - } else { - - /* - * We are requesting a serviceID from the registrar. - */ - - joinManager = new JoinManager(proxy, // service proxy - attributes, // attr sets - this, // ServiceIDListener - ctx.getDiscoveryManagement(), // DiscoveryManager - new LeaseRenewalManager(), // - config); - - } - - } catch (Exception ex) { - - fatal("JoinManager: " + this, ex); - throw new AssertionError();// keeps compiler happy. - } - - /* - * Note: This is synchronized in case set via listener by the - * JoinManager, which would be rather fast action on its part. - */ - synchronized (this) { - - if (this.serviceID != null) { - - /* - * Notify the service that it's service UUID has been set. - * - * @todo Several things currently depend on this notification. - * In effect, it is being used as a proxy for the service - * registration event. - */ - - notifyServiceUUID(serviceID); - - } - - } - } /** * Simple representation of state (non-blocking, safe). Some fields reported * in the representation may be <code>null</code> depending on the server * state. - */@Override + */ + @Override public String toString() { // note: MAY be null. @@ -1201,6 +1123,103 @@ } /** + * Export a proxy object for this service instance. + */ + synchronized protected void exportProxy(final Remote impl) { + + /* + * Export a proxy object for this service instance. + * + * Note: This must be done before we start the join manager since the + * join manager will register the proxy. + */ + try { + + proxy = exporter.export(impl); + + if (log.isInfoEnabled()) + log.info("EXPORTED PROXY: Proxy is " + proxy + "(" + + proxy.getClass() + ")"); + + } catch (ExportException ex) { + + fatal("Export error: " + this, ex); + throw new AssertionError();// keeps compiler happy. + } + + /* + * Start the join manager. + */ + try { + + assert proxy != null : "No proxy?"; + + // The as-configured Entry[] attributes. + final Entry[] attributes = entries.toArray(new Entry[0]); + + // Note: Throws IllegalStateException if not connected. + final HAClient.HAConnection ctx = getHAClient().getConnection(); + + if (this.serviceID != null) { + + /* + * We read the serviceID from local storage (either the + * serviceIDFile and/or the Configuration). 
+ */ + + joinManager = new JoinManager(proxy, // service proxy + attributes, // attr sets + serviceID, // ServiceID + ctx.getDiscoveryManagement(), // DiscoveryManager + new LeaseRenewalManager(), // + config); + + } else { + + /* + * We are requesting a serviceID from the registrar. + */ + + joinManager = new JoinManager(proxy, // service proxy + attributes, // attr sets + this, // ServiceIDListener + ctx.getDiscoveryManagement(), // DiscoveryManager + new LeaseRenewalManager(), // + config); + + } + + } catch (Exception ex) { + + fatal("JoinManager: " + this, ex); + throw new AssertionError();// keeps compiler happy. + } + + /* + * Note: This is synchronized in case set via listener by the + * JoinManager, which would be rather fast action on its part. + */ + synchronized (this) { + + if (this.serviceID != null) { + + /* + * Notify the service that it's service UUID has been set. + * + * @todo Several things currently depend on this notification. + * In effect, it is being used as a proxy for the service + * registration event. + */ + + notifyServiceUUID(serviceID); + + } + + } + + } + + /** * Unexports the {@link #proxy} - this is a NOP if the proxy is * <code>null</code>. * @@ -1212,29 +1231,48 @@ * * @see Exporter#unexport(boolean) */ - synchronized protected boolean unexport(boolean force) { + synchronized protected boolean unexport(final boolean force) { - if (log.isInfoEnabled()) - log.info("force=" + force + ", proxy=" + proxy); + try { - try { + boolean unexported = false; if (proxy != null) { + log.warn("UNEXPORT PROXY: force=" + force + ", proxy=" + proxy); + if (exporter.unexport(force)) { - return true; + unexported = true; } else { - log.warn("Proxy was not unexported? : "+this); + log.warn("Proxy was not unexported? : " + this); } } - return false; + if (joinManager != null) { + + try { + joinManager.terminate(); + + } catch (Throwable ex) { + + log.error("Could not terminate the join manager: " + this, ex); + + } finally { + + joinManager = null; + + } + + } + + return unexported; + } finally { proxy = null; @@ -1507,28 +1545,28 @@ try { - /* - * Unexport the proxy, making the service no longer available. - * - * Note: If you do not do this then the client can still make - * requests even after you have terminated the join manager and the - * service is no longer visible in the service browser. - */ - try { +// /* +// * Unexport the proxy, making the service no longer available. +// * +// * Note: If you do not do this then the client can still make +// * requests even after you have terminated the join manager and the +// * service is no longer visible in the service browser. +// */ +// try { +// +// if (log.isInfoEnabled()) +// log.info("Unexporting the service proxy."); +// +// unexport(true/* force */); +// +// } catch (Throwable ex) { +// +// log.error("Problem unexporting service: " + this, ex); +// +// /* Ignore */ +// +// } - if (log.isInfoEnabled()) - log.info("Unexporting the service proxy."); - - unexport(true/* force */); - - } catch (Throwable ex) { - - log.error("Problem unexporting service: " + this, ex); - - /* Ignore */ - - } - if (destroy && impl != null && impl instanceof IService) { final IService tmp = (IService) impl; @@ -1749,24 +1787,6 @@ } /** - * Return <code>true</code> iff the {@link RunState} is - * {@link RunState#Start} -or- {@link RunState#Running}. - * - * @return <code>true</code> if the service is starting or running and - * otherwise <code>false</code>. 
- */ - public boolean isRunning() { - - switch (runState.get()) { - case Running: - case Start: - return true; - } - return false; - - } - - /** * Terminates service management threads. * <p> * Subclasses which start additional service management threads SHOULD @@ -1779,26 +1799,9 @@ if (log.isInfoEnabled()) log.info("Terminating service management threads."); - if (joinManager != null) { - - try { + if (haClient.isConnected()) + haClient.disconnect(false/* immediateShutdown */); - joinManager.terminate(); - - } catch (Throwable ex) { - - log.error("Could not terminate the join manager: " + this, ex); - - } finally { - - joinManager = null; - - } - - } - - haClient.disconnect(false/*immediateShutdown*/); - // if (serviceDiscoveryManager != null) { // // serviceDiscoveryManager.terminate(); @@ -1840,6 +1843,7 @@ /** * Run the server (this should be invoked from <code>main</code>. */ + @Override public void run() { if (log.isInfoEnabled()) @@ -1865,11 +1869,24 @@ boolean started = false; try { + /* + * Create the service object. + */ + if (log.isInfoEnabled()) + log.info("Creating service impl..."); + + impl = newService(config); + + if (log.isInfoEnabled()) + log.info("Service impl is " + impl); + startUpHook(); started = true; + } catch (Exception e) { + log.error(e, e); } finally { if (!started) - shutdownNow(false/*destroy*/); + shutdownNow(false/* destroy */); } if (runState.compareAndSet(RunState.Start, RunState.Running)) { @@ -1947,6 +1964,7 @@ } + @Override public void run() { // format log message. @@ -1957,7 +1975,10 @@ try { - // Wait long enough for the RMI request to end. + /* + * Wait long enough for the RMI request to end (in case the + * service is shutdown in response to an RMI). + */ Thread.sleep(250/* ms */); } catch (InterruptedException e) { Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DumpLogDigests.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DumpLogDigests.java 2013-10-23 16:58:05 UTC (rev 7471) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/DumpLogDigests.java 2013-10-23 17:30:19 UTC (rev 7472) @@ -49,12 +49,11 @@ import com.bigdata.ha.HAGlue; import com.bigdata.ha.IndexManagerCallable; import com.bigdata.ha.halog.IHALogReader; -import com.bigdata.journal.ITransactionService; import com.bigdata.journal.jini.ha.HAClient.HAConnection; import com.bigdata.journal.jini.ha.HALogIndex.IHALogRecord; import com.bigdata.journal.jini.ha.SnapshotIndex.ISnapshotRecord; import com.bigdata.quorum.Quorum; -import com.bigdata.quorum.QuorumClient; +import com.bigdata.quorum.zk.ZKQuorumClient; import cutthecrap.utils.striterators.EmptyIterator; @@ -643,28 +642,30 @@ return tmp; } + private List<HAGlue> services(final String serviceRoot) throws IOException, + ExecutionException, KeeperException, InterruptedException { - private List<HAGlue> services(final String serviceRoot) throws IOException, - ExecutionException, KeeperException, InterruptedException { - - - final List<HAGlue> ret = new ArrayList<HAGlue>(); - - final HAConnection cnxn = client.connect(); - - final Quorum<HAGlue, QuorumClient<HAGlue>> quorum = cnxn.getHAGlueQuorum(serviceRoot); - final UUID[] uuids = quorum.getJoined(); - - final HAGlue[] haglues = cnxn.getHAGlueService(uuids); - - for (HAGlue haglue : haglues) { - ret.add(haglue); - } - - client.disconnect(true/*immediate shutdown*/); - - return ret; + final List<HAGlue> 
ret = new ArrayList<HAGlue>(); - } - + final HAConnection cnxn = client.connect(); + + final Quorum<HAGlue, ZKQuorumClient<HAGlue>> quorum = cnxn + .getHAGlueQuorum(serviceRoot); + + final UUID[] uuids = quorum.getJoined(); + + final HAGlue[] haglues = cnxn.getHAGlueService(uuids); + + for (HAGlue haglue : haglues) { + + ret.add(haglue); + + } + + client.disconnect(true/* immediate shutdown */); + + return ret; + + } + } Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java 2013-10-23 16:58:05 UTC (rev 7471) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java 2013-10-23 17:30:19 UTC (rev 7472) @@ -74,6 +74,7 @@ import com.bigdata.quorum.QuorumListener; import com.bigdata.quorum.zk.QuorumTokenState; import com.bigdata.quorum.zk.ZKQuorum; +import com.bigdata.quorum.zk.ZKQuorumClient; import com.bigdata.quorum.zk.ZKQuorumImpl; import com.bigdata.service.IDataService; import com.bigdata.service.IService; @@ -187,7 +188,7 @@ } /** - * {@inheritDoc} + * Terminate the connection if one exists. * <p> * Note: Immediate shutdown can cause odd exceptions to be logged. Normal * shutdown is recommended unless there is a reason to force immediate @@ -215,17 +216,17 @@ try { - final HAConnection fed = this.fed.get(); + final HAConnection cxn = this.fed.get(); - if (fed != null) { + if (cxn != null) { if (immediateShutdown) { - fed.shutdownNow(); + cxn.shutdownNow(); } else { - fed.shutdown(); + cxn.shutdown(); } @@ -241,25 +242,30 @@ } + /** + * Return a valid connection. If the client is not connected, then it is + * connected. If the client is connected, the existing connection is + * returned. + */ public HAConnection connect() { connectLock.lock(); try { - HAConnection fed = this.fed.get(); + HAConnection cxn = this.fed.get(); - if (fed == null) { + if (cxn == null) { - fed = new HAConnection(jiniConfig, zooConfig); + cxn = new HAConnection(jiniConfig, zooConfig); - this.fed.set(fed); + this.fed.set(cxn); - fed.start(this); + cxn.start(this); } - return fed; + return cxn; } finally { @@ -645,7 +651,8 @@ */ private final AtomicReference<HAClient> clientRef = new AtomicReference<HAClient>(); - private ZooKeeperAccessor zka; +// private ZooKeeperAccessor zka; + private ZooKeeper zk; private LookupDiscoveryManager lookupDiscoveryManager; @@ -666,8 +673,8 @@ * avoid contentions for a lock in * {@link #terminateDiscoveryProcesses()}. */ - private final Map<String, Quorum<HAGlue, QuorumClient<HAGlue>>> quorums = Collections - .synchronizedMap(new LinkedHashMap<String, Quorum<HAGlue, QuorumClient<HAGlue>>>()); + private final Map<String, Quorum<HAGlue, ZKQuorumClient<HAGlue>>> quorums = Collections + .synchronizedMap(new LinkedHashMap<String, Quorum<HAGlue, ZKQuorumClient<HAGlue>>>()); private HAConnection(final JiniClientConfig jiniConfig, final ZookeeperClientConfig zooConfig) { @@ -743,15 +750,22 @@ } /** - * Return an object that may be used to obtain a {@link ZooKeeper} - * client and that may be used to obtain the a new {@link ZooKeeper} - * client if the current session has been expired (an absorbing state - * for the {@link ZooKeeper} client). + * Return the {@link ZooKeeper} client connection. + * + * @throws IllegalStateException + * if the {@link HAClient} is not connected. 
*/ - public ZooKeeperAccessor getZookeeperAccessor() { + public ZooKeeper getZookeeper() { - return zka; + assertOpen(); + final ZooKeeper zk = this.zk; + + if (zk == null) + throw new IllegalStateException(); + + return zk; + } private synchronized void start(final HAClient client) { @@ -776,8 +790,8 @@ * zookeeper servers. */ - zka = new ZooKeeperAccessor(zooConfig.servers, - zooConfig.sessionTimeout); + zk = new ZooKeeperAccessor(zooConfig.servers, + zooConfig.sessionTimeout).getZookeeper(); /* * Note: This class will perform multicast discovery if @@ -948,7 +962,7 @@ } // Terminate any quorums opened by the HAConnection. - for (Quorum<HAGlue, QuorumClient<HAGlue>> quorum : quorums.values()) { + for (Quorum<HAGlue, ZKQuorumClient<HAGlue>> quorum : quorums.values()) { quorum.terminate(); @@ -964,14 +978,15 @@ */ log.warn("FORCING UNCURABLE ZOOKEEPER DISCONNECT"); - if (zka != null) { + if (zk != null) { try { - zka.close(); + zk.close(); } catch (InterruptedException e) { // propagate the interrupt. Thread.currentThread().interrupt(); } + zk = null; } // try { @@ -1098,7 +1113,7 @@ final String logicalServiceZPathPrefix = zkClientConfig.zroot + "/" + HAJournalServer.class.getName(); - final String[] children = zka.getZookeeper() + final String[] children = getZookeeper() .getChildren(logicalServiceZPathPrefix, false/* watch */) .toArray(new String[0]); @@ -1107,11 +1122,17 @@ } /** - * Obtain an object that will reflect the state of the {@link Quorum} - * for the HA replication identified by the logical service identifier. + * This is a convenience method that provides access to a Quorum for + * some logical service corresponding to an HA replication cluster. The + * returned quorum object will monitor the and reflect the state of the + * identified quorum. A simple {@link QuorumClient} is started and will + * run for that quorum until any of: (a) the quorum terminated; (b) the + * {@link HAConnection} is closed; (c) it is noticed that the zookeeper + * session is expired. * <p> * Note: Each quorum that you start has asynchronous threads and MUST be - * terminated. + * terminated. Termination is automatically performed by + * {@link HAClient#disconnect(boolean)}. * * @param logicalServiceId * The logical service identifier. @@ -1123,20 +1144,33 @@ * @throws InterruptedException * @throws KeeperException */ - public Quorum<HAGlue, QuorumClient<HAGlue>> getHAGlueQuorum( + public Quorum<HAGlue, ZKQuorumClient<HAGlue>> getHAGlueQuorum( final String logicalServiceId) throws KeeperException, InterruptedException { /* - * Fast path. Check for an existing instance. FIXME MUst also verify that quorum is running. If terminated, then start(). But must also pass in the AbstractQuorumClient to be run if we are the HAJournalServer. Or let the caller start the quorum for their client rather than embedding that logic into this method. + * Fast path. Check for an existing instance. */ - Quorum<HAGlue, QuorumClient<HAGlue>> quorum; + Quorum<HAGlue, ZKQuorumClient<HAGlue>> quorum; synchronized (quorums) { quorum = quorums.get(logicalServiceId); if (quorum != null) { + /* + * Note: There is no guarantee that the client is running + * for the returned quorum. If there is no such quorum, then + * a client is created for that quorum. If a quorum is + * found, then it is simply returned. The argument is that a + * quorum will continue to run for a client unless the + * HAClient is disconnected, the zookeeper session is + * expired, or quorum.terminate() is explicitly invoked. 
The + * former two cases can only be cured by a disconnect() + * followed by a connect(). The latter has to be done + * explicitly by the application, so they can deal with it + * and start a new client if desired. + */ return quorum; } @@ -1167,19 +1201,19 @@ * Ensure key znodes exist. */ try { - zka.getZookeeper().create(zkClientConfig.zroot, + getZookeeper().create(zkClientConfig.zroot, new byte[] {/* data */}, acl, CreateMode.PERSISTENT); } catch (NodeExistsException ex) { // ignore. } try { - zka.getZookeeper().create(logicalServiceZPathPrefix, + getZookeeper().create(logicalServiceZPathPrefix, new byte[] {/* data */}, acl, CreateMode.PERSISTENT); } catch (NodeExistsException ex) { // ignore. } try { - zka.getZookeeper().create(logicalServiceZPath, + getZookeeper().create(logicalServiceZPath, new byte[] {/* data */}, acl, CreateMode.PERSISTENT); } catch (NodeExistsException ex) { // ignore. @@ -1195,7 +1229,7 @@ final Stat stat = new Stat(); - data = zka.getZookeeper().getData(quorumZPath, + data = getZookeeper().getData(quorumZPath, false/* watch */, stat); final QuorumTokenState tokenState = (QuorumTokenState) SerializerUtil @@ -1251,8 +1285,8 @@ quorums.put( logicalServiceId, - quorum = new ZKQuorumImpl<HAGlue, QuorumClient<HAGlue>>( - replicationFactor, zka, acl)); + quorum = new ZKQuorumImpl<HAGlue, ZKQuorumClient<HAGlue>>( + replicationFactor));//, zka, acl)); quorum.start(new MyQuorumClient(logicalServiceZPath)); @@ -1264,7 +1298,8 @@ } - private class MyQuorumClient extends AbstractQuorumClient<HAGlue> { + private class MyQuorumClient extends AbstractQuorumClient<HAGlue> + implements ZKQuorumClient<HAGlue> { protected MyQuorumClient(final String logicalServiceZPath) { @@ -1279,6 +1314,20 @@ } + @Override + public ZooKeeper getZooKeeper() { + + return HAConnection.this.getZookeeper(); + + } + + @Override + public List<ACL> getACL() { + + return zooConfig.acl; + + } + } /* @@ -1494,7 +1543,7 @@ /** * Simple main just connects and then disconnects after a few seconds. It - * prints out all discovered {@link HAGlue} services before it shutsdown. + * prints out all discovered {@link HAGlue} services before it shuts down. * * @param args * @@ -1539,7 +1588,7 @@ /* * This shows up to lookup a known replication cluster. */ - final Quorum<HAGlue, QuorumClient<HAGlue>> quorum = ctx + final Quorum<HAGlue, ZKQuorumClient<HAGlue>> quorum = ctx .getHAGlueQuorum(logicalServiceId); // Setup listener that logs quorum events @ TRACE. Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-A.config =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-A.config 2013-10-23 16:58:05 UTC (rev 7471) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-A.config 2013-10-23 17:30:19 UTC (rev 7472) @@ -126,7 +126,7 @@ static private leaseTimeout = ConfigMath.s2ms(20); // zookeeper - static private sessionTimeout = (int)ConfigMath.s2ms(5); + static private sessionTimeout = (int)ConfigMath.s2ms(20); /* * Configuration for default KB. 
Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-B.config =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-B.config 2013-10-23 16:58:05 UTC (rev 7471) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-B.config 2013-10-23 17:30:19 UTC (rev 7472) @@ -126,7 +126,7 @@ static private leaseTimeout = ConfigMath.s2ms(20); // zookeeper - static private sessionTimeout = (int)ConfigMath.s2ms(5); + static private sessionTimeout = (int)ConfigMath.s2ms(20); /* * Configuration for default KB. Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-C.config =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-C.config 2013-10-23 16:58:05 UTC (rev 7471) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournal-C.config 2013-10-23 17:30:19 UTC (rev 7472) @@ -126,7 +126,7 @@ static private leaseTimeout = ConfigMath.s2ms(20); // zookeeper - static private sessionTimeout = (int)ConfigMath.s2ms(5); + static private sessionTimeout = (int)ConfigMath.s2ms(20); /* * Configuration for default KB. Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-10-23 16:58:05 UTC (rev 7471) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2013-10-23 17:30:19 UTC (rev 7472) @@ -42,6 +42,7 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -93,12 +94,14 @@ import com.bigdata.journal.IRootBlockView; import com.bigdata.journal.RootBlockUtility; import com.bigdata.journal.WORMStrategy; -import com.bigdata.quorum.AbstractQuorum; +import com.bigdata.journal.jini.ha.HAClient.HAConnection; import com.bigdata.quorum.Quorum; import com.bigdata.quorum.QuorumEvent; import com.bigdata.quorum.QuorumException; import com.bigdata.quorum.QuorumListener; +import com.bigdata.quorum.zk.ZKQuorumClient; import com.bigdata.quorum.zk.ZKQuorumImpl; +import com.bigdata.rdf.sail.CreateKBTask; import com.bigdata.rdf.sail.webapp.ConfigParams; import com.bigdata.rdf.sail.webapp.NanoSparqlServer; import com.bigdata.rwstore.RWStore; @@ -110,7 +113,6 @@ import com.bigdata.util.concurrent.LatchedExecutor; import com.bigdata.util.concurrent.MonitoredFutureTask; import com.bigdata.util.config.NicUtil; -import com.bigdata.zookeeper.ZooKeeperAccessor; import com.sun.jini.start.LifeCycle; /** @@ -428,7 +430,10 @@ /** * The port at which the embedded {@link NanoSparqlServer} will respond * to HTTP requests (default {@value #DEFAULT_PORT}). This MAY be ZERO - * (0) to use a random open port. + * (0) to use a random open port. + * + * TODO We should be able to specify the interface, not just the port. Is + * there any way to do that with jetty? 
*/ String PORT = "port"; @@ -482,6 +487,11 @@ private String logicalServiceId; /** + * The zpath for the znode that dominates the logical service. + */ + private String logicalServiceZPathPrefix; + + /** * The zpath for the logical service. * * @see ConfigurationOptions#LOGICAL_SERVICE_ID @@ -605,14 +615,57 @@ } + /** + * Ensure key znodes exist. + * + * @throws KeeperException + * @throws InterruptedException + */ + private void setupZNodes() throws KeeperException, InterruptedException { + + if (log.isInfoEnabled()) { + log.info("Ensuring key znodes exist."); + } + + final ZookeeperClientConfig zkClientConfig = getHAClient() + .getZookeeperClientConfig(); + + final List<ACL> acl = zkClientConfig.acl; + + final ZooKeeper zk = getHAClient().getConnection().getZookeeper(); + + /* + * Ensure key znodes exist. + */ + try { + zk.create(zkClientConfig.zroot, new byte[] {/* data */}, acl, + CreateMode.PERSISTENT); + } catch (NodeExistsException ex) { + // ignore. + } + try { + zk.create(logicalServiceZPathPrefix, new byte[] {/* data */}, acl, + CreateMode.PERSISTENT); + } catch (NodeExistsException ex) { + // ignore. + } + try { + zk.create(logicalServiceZPath, new byte[] {/* data */}, acl, + CreateMode.PERSISTENT); + } catch (NodeExistsException ex) { + // ignore. + } + + } + @Override protected HAGlue newService(final Configuration config) throws Exception { - /* - * Verify discovery of at least one ServiceRegistrar. - */ - getHAClient().getConnection().awaitServiceRegistrars(10/* timeout */, - TimeUnit.SECONDS); +// /* +// * Verify discovery of at least one ServiceRegistrar. +// */ +// getHAClient().getConnection().awaitServiceRegistrars(10/* timeout */, +// TimeUnit.SECONDS); // { // final long begin = System.currentTimeMillis(); @@ -683,9 +736,9 @@ { - final String logicalServiceZPathPrefix = zkClientConfig.zroot + "/" + logicalServiceZPathPrefix = zkClientConfig.zroot + "/" + HA... [truncated message content] |
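A general note on the executor shutdown idiom that recurs in the AbstractQuorum.terminate() changes above: ExecutorService.shutdownNow() returns the tasks that were queued but never started, and when those tasks are FutureTask instances, a caller blocked in Future.get() will wait forever unless the task is explicitly cancelled. The sketch below is a minimal, self-contained illustration of that pattern; the class and method names are invented for the example and are not part of the bigdata codebase.

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ShutdownIdiom {

    /**
     * Polite shutdown first; if that times out, force shutdown and then
     * cancel the queued FutureTasks so that threads blocked in
     * Future.get() are released rather than hanging.
     */
    static void shutdownAndCancel(final ExecutorService service)
            throws InterruptedException {
        service.shutdown();
        if (!service.awaitTermination(2, TimeUnit.SECONDS)) {
            // Tasks that were queued but never began executing.
            final List<Runnable> notrun = service.shutdownNow();
            for (Runnable r : notrun) {
                if (r instanceof Future) {
                    // Completes the task as cancelled, waking any waiter.
                    ((Future<?>) r).cancel(true/* mayInterruptIfRunning */);
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final ExecutorService svc = Executors.newSingleThreadExecutor();
        // Occupies the single worker thread.
        svc.submit(() -> { Thread.sleep(60000); return null; });
        // Queued behind the first task; never starts.
        final Future<?> queued = svc.submit(() -> null);
        shutdownAndCancel(svc);
        System.out.println("queued task cancelled: " + queued.isCancelled());
    }
}

Cancelling a never-started FutureTask moves it directly to the cancelled state, so any thread parked in get() sees a CancellationException instead of blocking indefinitely.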
From: <tho...@us...> - 2013-10-23 16:58:27
Revision: 7471 http://bigdata.svn.sourceforge.net/bigdata/?rev=7471&view=rev Author: thompsonbry Date: 2013-10-23 16:58:05 +0000 (Wed, 23 Oct 2013) Log Message: ----------- javadoc only Modified Paths: -------------- branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/counters/AbstractProcessCollector.java Modified: branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/counters/AbstractProcessCollector.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/counters/AbstractProcessCollector.java 2013-10-23 14:08:59 UTC (rev 7470) +++ branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/counters/AbstractProcessCollector.java 2013-10-23 16:58:05 UTC (rev 7471) @@ -77,10 +77,13 @@ } /** + * {@inheritDoc} + * <p> + * Creates the {@link ActiveProcess} and the - * {@link ActiveProcess#start(com.bigdata.counters.AbstractStatisticsCollector.AbstractProcessReader)}s - * it passing in the value returned by the {@link #getProcessReader()} + * {@link ActiveProcess#start(AbstractProcessReader)}s it passing in the + * value returned by the {@link #getProcessReader()} */ + @Override public void start() { log.info(""); @@ -91,6 +94,7 @@ } + @Override public void stop() { log.info("");
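For readers less familiar with the javadoc idiom applied in r7471: {@inheritDoc} copies the documentation of the overridden method into the override, and implementation-specific paragraphs can then be appended after a <p> break. A hypothetical example of the same pattern follows; the types are invented for illustration and do not exist in the bigdata source.

public class InheritDocExample {

    interface Collector {
        /** Begin collecting counters from the operating system. */
        void start();
    }

    static class ProcessCollector implements Collector {
        /**
         * {@inheritDoc}
         * <p>
         * Also launches the reader for the child process output.
         */
        @Override
        public void start() {
            // Implementation elided for the sketch.
        }
    }

    public static void main(String[] args) {
        new ProcessCollector().start();
    }
}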
From: <mrp...@us...> - 2013-10-23 14:09:05
Revision: 7470 http://bigdata.svn.sourceforge.net/bigdata/?rev=7470&view=rev Author: mrpersonick Date: 2013-10-23 14:08:59 +0000 (Wed, 23 Oct 2013) Log Message: ----------- fixed the IndexOutOfBounds problem and a problem with the term advancer Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/SliceServiceFactory.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/SliceServiceFactory.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/SliceServiceFactory.java 2013-10-23 13:40:25 UTC (rev 7469) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/SliceServiceFactory.java 2013-10-23 14:08:59 UTC (rev 7470) @@ -468,7 +468,7 @@ final byte[] endKey = keyOrder.getToKey(KeyBuilder.newInstance(), pred); //SuccessorUtil.successor(startKey.clone()); - endIndex = indexOf(ndx, endKey); + endIndex = indexOf(ndx, endKey) - 1; cache.put(pred, new CacheHit(startIndex, endIndex)); @@ -484,7 +484,7 @@ } - final long range = endIndex - startIndex; + final long range = endIndex - startIndex + 1; if (log.isTraceEnabled()) { log.trace("range: " + range); @@ -539,7 +539,7 @@ * Reading to the offset plus the limit (minus 1), or the end * index, whichever is smaller. */ - final long toIndex = Math.min(startIndex + offset + limit, + final long toIndex = Math.min(startIndex + offset + limit - 1, endIndex); if (fromIndex > toIndex) { @@ -550,7 +550,7 @@ final byte[] fromKey = ndx.keyAt(fromIndex); - final byte[] toKey = ndx.keyAt(toIndex);//SuccessorUtil.successor(ndx.keyAt(toIndex)); + final byte[] toKey = SuccessorUtil.successor(ndx.keyAt(toIndex)); final int arity = pred.arity();
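The off-by-one fixes in r7470 are easier to see with concrete numbers. Once endIndex is made inclusive (indexOf(endKey) - 1), the range has to add one back, and because the underlying key-range scan is half-open, the exclusive upper bound becomes successor(keyAt(toIndex)). The following standalone sketch walks the same arithmetic with plain longs; the variable names mirror the diff, but nothing here touches the bigdata index API.

public class SliceArithmetic {

    public static void main(String[] args) {

        final long startIndex = 10;    // index of the first matching tuple.
        final long indexOfEndKey = 25; // index of the first tuple PAST the range.

        // Last matching tuple, now an inclusive position.
        final long endIndex = indexOfEndKey - 1; // 24

        // #of tuples in the closed interval [startIndex, endIndex].
        final long range = endIndex - startIndex + 1; // 15

        final long offset = 5, limit = 4;
        final long fromIndex = startIndex + offset; // 15 (inclusive)
        final long toIndex = Math.min(startIndex + offset + limit - 1,
                endIndex); // 18 (inclusive): exactly [limit] tuples.

        System.out.println("range=" + range + ", scan [" + fromIndex + ".."
                + toIndex + "] inclusive");

        // The rangeIterator itself takes a half-open [fromKey, toKey), which
        // is why the fix passes successor(keyAt(toIndex)) as the toKey.
    }
}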
From: <mrp...@us...> - 2013-10-23 13:40:33
Revision: 7469 http://bigdata.svn.sourceforge.net/bigdata/?rev=7469&view=rev Author: mrpersonick Date: 2013-10-23 13:40:25 +0000 (Wed, 23 Oct 2013) Log Message: ----------- fixed the IndexOutOfBounds problem and a problem with the term advancer Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/SliceServiceFactory.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/SliceServiceFactory.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/SliceServiceFactory.java 2013-10-22 16:42:21 UTC (rev 7468) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/eval/SliceServiceFactory.java 2013-10-23 13:40:25 UTC (rev 7469) @@ -68,6 +68,7 @@ import com.bigdata.rdf.sparql.ast.service.ServiceFactory; import com.bigdata.rdf.sparql.ast.service.ServiceNode; import com.bigdata.rdf.spo.DistinctMultiTermAdvancer; +import com.bigdata.rdf.spo.DistinctTermAdvancer; import com.bigdata.rdf.spo.ISPO; import com.bigdata.rdf.spo.SPO; import com.bigdata.rdf.spo.SPOKeyOrder; @@ -483,7 +484,7 @@ } - final long range = endIndex - startIndex + 1; + final long range = endIndex - startIndex; if (log.isTraceEnabled()) { log.trace("range: " + range); @@ -538,7 +539,7 @@ * Reading to the offset plus the limit (minus 1), or the end * index, whichever is smaller. */ - final long toIndex = Math.min(startIndex + offset + limit - 1, + final long toIndex = Math.min(startIndex + offset + limit, endIndex); if (fromIndex > toIndex) { @@ -549,7 +550,7 @@ final byte[] fromKey = ndx.keyAt(fromIndex); - final byte[] toKey = SuccessorUtil.successor(ndx.keyAt(toIndex)); + final byte[] toKey = ndx.keyAt(toIndex);//SuccessorUtil.successor(ndx.keyAt(toIndex)); final int arity = pred.arity(); @@ -567,13 +568,17 @@ /* * Use a multi-term advancer to skip the bound entries and just - * get to the variables. + * get to the variables. + * + * Not a good idea. Needs to visit each tuple individually. */ - final DistinctMultiTermAdvancer advancer = //null; - new DistinctMultiTermAdvancer( - arity, //arity - 3 or 4 - numBoundEntries // #boundEntries - anything not a var and not bound by incoming bindings - ); + final DistinctMultiTermAdvancer advancer = null; +// new DistinctMultiTermAdvancer( +// arity, //arity - 3 or 4 +// numBoundEntries // #boundEntries - anything not a var and not bound by incoming bindings +// ); +// final DistinctTermAdvancer advancer2 = +// new DistinctTermAdvancer(arity); final ITupleIterator it = ndx.rangeIterator(fromKey, toKey, 0/* capacity */, IRangeQuery.KEYS | IRangeQuery.CURSOR, advancer);
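The "Needs to visit each tuple individually" remark in r7469 is the heart of the problem: a distinct-term advancer skips forward to the next distinct leading term, which silently drops tuples that share that term, while a slice must materialize every tuple in the key range. The toy program below illustrates the difference with plain collections; it is not the bigdata advancer API, just a model of its visiting behavior.

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class AdvancerDemo {

    public static void main(String[] args) {
        final List<String[]> tuples = Arrays.asList(
                new String[] { "s1", "p1", "o1" },
                new String[] { "s1", "p1", "o2" }, // shares the leading term
                new String[] { "s2", "p1", "o1" });

        // Distinct-term style: at most one tuple per distinct leading term,
        // so ("s1","p1","o2") is never visited.
        final Set<String> seen = new LinkedHashSet<String>();
        for (String[] t : tuples) {
            if (seen.add(t[0])) {
                System.out.println("advancer visits:  " + Arrays.toString(t));
            }
        }

        // Plain scan (advancer == null): every tuple in the range is visited.
        for (String[] t : tuples) {
            System.out.println("full scan visits: " + Arrays.toString(t));
        }
    }
}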
From: <tho...@us...> - 2013-10-22 16:42:27
Revision: 7468 http://bigdata.svn.sourceforge.net/bigdata/?rev=7468&view=rev Author: thompsonbry Date: 2013-10-22 16:42:21 +0000 (Tue, 22 Oct 2013) Log Message: ----------- fix to bad import. Modified Paths: -------------- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestZookeeperSessionSemantics.java Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestZookeeperSessionSemantics.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestZookeeperSessionSemantics.java 2013-10-21 14:23:29 UTC (rev 7467) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestZookeeperSessionSemantics.java 2013-10-22 16:42:21 UTC (rev 7468) @@ -29,6 +29,7 @@ import java.io.IOException; import java.net.InetAddress; +import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -44,8 +45,6 @@ import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; -import cern.colt.Arrays; - import com.bigdata.btree.BytesUtil; import com.bigdata.io.TestCase3; import com.bigdata.util.config.NicUtil;
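The bad import fixed here is a classic auto-import hazard: the Colt library ships a cern.colt.Arrays class whose static formatting helpers look much like those on java.util.Arrays, so an IDE can resolve Arrays.toString(...) against the wrong class and the code still compiles, now with an unwanted dependency. A minimal check with the JDK class only (this file is illustrative and not from the commit):

import java.util.Arrays;

public class ArraysImportCheck {

    public static void main(String[] args) {
        // java.util.Arrays covers the common cases with no external jar.
        System.out.println(Arrays.toString(new byte[] { 1, 2, 3 })); // [1, 2, 3]
        System.out.println(Arrays.asList("a", "b"));                 // [a, b]
    }
}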
From: <tho...@us...> - 2013-10-21 14:23:36
Revision: 7467 http://bigdata.svn.sourceforge.net/bigdata/?rev=7467&view=rev Author: thompsonbry Date: 2013-10-21 14:23:29 +0000 (Mon, 21 Oct 2013) Log Message: ----------- Sync to martyn. Refactoring AbstractQuorum.terminate() and AbstractQuorum test suite. See #718 Modified Paths: -------------- branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java branches/ZK_DISCONNECT_HANDLING/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java branches/ZK_DISCONNECT_HANDLING/bigdata/src/test/com/bigdata/quorum/TestAll.java branches/ZK_DISCONNECT_HANDLING/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java Modified: branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-10-21 13:45:30 UTC (rev 7466) +++ branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-10-21 14:23:29 UTC (rev 7467) @@ -301,7 +301,9 @@ * Note: This is volatile to allow visibility without holding the * {@link #lock}. The field is only modified in {@link #start(QuorumClient)} * and {@link #terminate()}, and those methods use the {@link #lock} to - * impose an appropriate ordering over events. + * impose an appropriate ordering over events. The quorum is running iff + * there is a client for which it is delivering events. When <code>null</code>, + * the quorum is not running. * * @see #start(QuorumClient) */ @@ -582,60 +584,60 @@ */ interruptAll(); if (client == null) { - // No client is attached. + // No client? Not running. return; } if (log.isDebugEnabled()) log.debug("client=" + client); - if (client instanceof QuorumMember<?>) { - /* - * Update the distributed quorum state by removing our client - * from the set of member services. This will also cause a - * service leave, pipeline leave, and any vote to be withdrawn. - * - * We have observed Condition spins during terminate() that - * result in HAJournalServer hangs. This runs another Thread - * that will interrupt this Thread if the quorum member is - * unable to complete the memberRemove() within a timeout. - * - * Note: Since we are holding the lock in the current thread, we - * MUST execute memberRemove() in this thread (it requires the - * lock). Therefore, I have used a 2nd thread that will - * interrupt this thread if it does not succeed in a polite - * removal from the quorum within a timeout. - */ - { - final long MEMBER_REMOVE_TIMEOUT = 5000;// ms. - final AtomicBoolean didRemove = new AtomicBoolean(false); - final Thread self = Thread.currentThread(); - final Thread t = new Thread() { - public void run() { - try { - Thread.sleep(MEMBER_REMOVE_TIMEOUT); - } catch (InterruptedException e) { - // Expected. Ignored. - return; - } - if (!didRemove.get()) { - log.error("Timeout awaiting quorum member remove."); - self.interrupt(); - } - } - }; - t.setDaemon(true); - t.start(); - try { - // Attempt memberRemove() (interruptably). - actor.memberRemoveInterruptable(); - didRemove.set(true); // Success. - } catch (InterruptedException e) { - // Propagate the interrupt. - Thread.currentThread().interrupt(); - } finally { - t.interrupt(); // Stop execution of [t]. - } - } - } +// if (client instanceof QuorumMember<?>) { +// /* +// * Update the distributed quorum state by removing our client +// * from the set of member services. 
This will also cause a +// * service leave, pipeline leave, and any vote to be withdrawn. +// * +// * We have observed Condition spins during terminate() that +// * result in HAJournalServer hangs. This runs another Thread +// * that will interrupt this Thread if the quorum member is +// * unable to complete the memberRemove() within a timeout. +// * +// * Note: Since we are holding the lock in the current thread, we +// * MUST execute memberRemove() in this thread (it requires the +// * lock). Therefore, I have used a 2nd thread that will +// * interrupt this thread if it does not succeed in a polite +// * removal from the quorum within a timeout. +// */ +// { +// final long MEMBER_REMOVE_TIMEOUT = 5000;// ms. +// final AtomicBoolean didRemove = new AtomicBoolean(false); +// final Thread self = Thread.currentThread(); +// final Thread t = new Thread() { +// public void run() { +// try { +// Thread.sleep(MEMBER_REMOVE_TIMEOUT); +// } catch (InterruptedException e) { +// // Expected. Ignored. +// return; +// } +// if (!didRemove.get()) { +// log.error("Timeout awaiting quorum member remove."); +// self.interrupt(); +// } +// } +// }; +// t.setDaemon(true); +// t.start(); +// try { +// // Attempt memberRemove() (interruptably). +// actor.memberRemoveInterruptable(); +// didRemove.set(true); // Success. +// } catch (InterruptedException e) { +// // Propagate the interrupt. +// Thread.currentThread().interrupt(); +// } finally { +// t.interrupt(); // Stop execution of [t]. +// } +// } +// } /* * Let the service know that it is no longer running w/ the quorum. @@ -643,10 +645,23 @@ try { client.terminate(); } catch (Throwable t) { - launderThrowable(t); + if (InnerCause.isInnerCause(t, InterruptedException.class)) { + interrupted = true; + } else { + launderThrowable(t); + } } - if (watcher != null) - watcher.terminate(); + if (watcher != null) { + try { + watcher.terminate(); + } catch (Throwable t) { + if (InnerCause.isInnerCause(t, InterruptedException.class)) { + interrupted = true; + } else { + launderThrowable(t); + } + } + } if (watcherActionService != null) { watcherActionService.shutdown(); try { @@ -664,7 +679,7 @@ interrupted = true; } finally { /* - * Cancel any tasks which did terminate in a timely manner. + * Cancel any tasks which did not terminate in a timely manner. */ watcherActionService.shutdownNow(); watcherActionService = null; @@ -704,8 +719,16 @@ } } /* - * Signal all conditions so anyone blocked will wake up. + * Clear all internal state variables that mirror the distributed + * quorum state and then signal all conditions so anyone blocked + * will wake up. */ + listeners.clear(); // discard listeners. + token = lastValidToken = NO_QUORUM; + members.clear(); + votes.clear(); + joined.clear(); + pipeline.clear(); quorumChange.signalAll(); membersChange.signalAll(); pipelineChange.signalAll(); @@ -713,8 +736,6 @@ joinedChange.signalAll(); // discard reference to the client. this.client = null; - // discard listeners. - listeners.clear(); } finally { lock.unlock(); } Modified: branches/ZK_DISCONNECT_HANDLING/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java 2013-10-21 13:45:30 UTC (rev 7466) +++ branches/ZK_DISCONNECT_HANDLING/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java 2013-10-21 14:23:29 UTC (rev 7467) @@ -742,15 +742,22 @@ // Save UUID -> QuorumMember mapping on the fixture. 
fixture.known.put(client.getServiceId(), client); } + @Override public void terminate() { - final MockQuorumWatcher watcher = (MockQuorumWatcher) getWatcher(); + MockQuorumWatcher watcher = null; + try { + watcher = (MockQuorumWatcher) getWatcher(); + } catch(IllegalStateException ex) { + // Already terminated. + } super.terminate(); // Stop the service accepting events for the watcher. watcherService.shutdownNow(); - // remove our watcher as a listener for the fixture's inner quorum. - fixture.removeWatcher(watcher); + // remove our watcher as a listener for the fixture's inner quorum. + if (watcher != null) + fixture.removeWatcher(watcher); } /** Modified: branches/ZK_DISCONNECT_HANDLING/bigdata/src/test/com/bigdata/quorum/TestAll.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata/src/test/com/bigdata/quorum/TestAll.java 2013-10-21 13:45:30 UTC (rev 7466) +++ branches/ZK_DISCONNECT_HANDLING/bigdata/src/test/com/bigdata/quorum/TestAll.java 2013-10-21 14:23:29 UTC (rev 7467) @@ -48,14 +48,6 @@ public class TestAll extends TestCase { private final static Logger log = Logger.getLogger(TestAll.class); - - /** - * FIXME This is used to avoid a CI deadlock that needs to be tracked down. - * - * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/530"> - * Journal HA </a> - */ - final private static boolean s_includeQuorum = true; /** * @@ -74,42 +66,38 @@ * Returns a test that will run each of the implementation specific test * suites in turn. */ - public static Test suite() - { + public static Test suite() { final TestSuite suite = new TestSuite("quorum"); - if (s_includeQuorum) { - /* - * Test the fixture used to test the quorums (the fixture builds on the - * same base class). - */ - suite.addTestSuite(TestMockQuorumFixture.class); - - /* - * Test the quorum semantics for a singleton quorum. This unit test - * allows us to verify that each quorum state change is translated into - * the appropriate methods against the public API of the quorum client - * or quorum member. - */ - suite.addTestSuite(TestSingletonQuorumSemantics.class); - - /* - * Test the quorum semantics for a highly available quorum of 3 - * nodes. The main points to test here are the particulars of events not - * observable with a singleton quorum, including a service join which - * does not trigger a quorum meet, a service leave which does not - * trigger a quorum break, a leader leave, etc. - */ - suite.addTestSuite(TestHA3QuorumSemantics.class); - - /* - * Run the test HA3 suite a bunch of times. - */ - suite.addTest(StressTestHA3.suite()); + /* + * Test the fixture used to test the quorums (the fixture builds on the + * same base class). + */ + suite.addTestSuite(TestMockQuorumFixture.class); - } - + /* + * Test the quorum semantics for a singleton quorum. This unit test + * allows us to verify that each quorum state change is translated into + * the appropriate methods against the public API of the quorum client + * or quorum member. + */ + suite.addTestSuite(TestSingletonQuorumSemantics.class); + + /* + * Test the quorum semantics for a highly available quorum of 3 nodes. + * The main points to test here are the particulars of events not + * observable with a singleton quorum, including a service join which + * does not trigger a quorum meet, a service leave which does not + * trigger a quorum break, a leader leave, etc. + */ + suite.addTestSuite(TestHA3QuorumSemantics.class); + + /* + * Run the test HA3 suite a bunch of times. 
+ */ + suite.addTest(StressTestHA3.suite()); + return suite; } Modified: branches/ZK_DISCONNECT_HANDLING/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java 2013-10-21 13:45:30 UTC (rev 7466) +++ branches/ZK_DISCONNECT_HANDLING/bigdata/src/test/com/bigdata/quorum/TestSingletonQuorumSemantics.java 2013-10-21 14:23:29 UTC (rev 7467) @@ -27,6 +27,7 @@ package com.bigdata.quorum; +import java.util.Collections; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -100,6 +101,66 @@ } + /** + * Unit test for quorum member add followed by the termination of the quorum + * client. This checks for proper termination of the client, including the + * clear down of the quorum's internal state. + * + * @throws InterruptedException + */ + public void test_memberAdd_terminateClient() throws InterruptedException { + + final Quorum<?, ?> quorum = quorums[0]; + final QuorumMember<?> client = clients[0]; + final QuorumActor<?,?> actor = actors[0]; + final UUID serviceId = client.getServiceId(); + + // client is not a member. + assertFalse(client.isMember()); + assertEquals(new UUID[] {}, quorum.getMembers()); + + // instruct actor to add client as a member. + actor.memberAdd(); + fixture.awaitDeque(); + + // client is a member. + assertTrue(client.isMember()); + assertEquals(new UUID[] {serviceId}, quorum.getMembers()); + + /* + * Verify termination of the quorum for that client. + */ + + assertEquals(client, quorum.getClient()); + + quorum.terminate(); + + try { + quorum.getClient(); + } catch (IllegalStateException ex) { + log.info("Ignoring expected exception: " + ex); + } + + // State was cleared. + assertEquals(Quorum.NO_QUORUM, quorum.token()); + assertEquals(Quorum.NO_QUORUM, quorum.lastValidToken()); + assertEquals(new UUID[] {}, quorum.getMembers()); + assertEquals(new UUID[] {}, quorum.getJoined()); + assertEquals(new UUID[] {}, quorum.getPipeline()); + assertEquals(Collections.emptyMap(), quorum.getVotes()); + + try { + // Note: Quorum reference was cleared. Client throws exception. + assertFalse(client.isMember()); + } catch (IllegalStateException ex) { + log.info("Ignoring expected exception: " + ex); + } + + // Double-termination is safe. + quorum.terminate(); + + } + /** * Unit test for write pipeline add/remove. * @throws InterruptedException @@ -345,6 +406,7 @@ * Unit test for the protocol up to a service join, which triggers a leader * election. Since the singleton quorum has only one member our client will * be elected the leader. + * * @throws InterruptedException */ public void test_serviceJoin() throws InterruptedException { @@ -479,6 +541,97 @@ } /** + * Unit test verifying that we clear down the quorum's reflection of the + * distributed quorum state where we first have a quorum meet and then + * terminate the quorum client. + * + * @throws InterruptedException + */ + public void test_serviceJoin_terminateClient() throws InterruptedException { + + final AbstractQuorum<?, ?> quorum = quorums[0]; + final MockQuorumMember<?> client = clients[0]; + final QuorumActor<?,?> actor = actors[0]; + final UUID serviceId = client.getServiceId(); + + final long lastCommitTime = 0L; + final long lastCommitTime2 = 2L; + + // declare the service as a quorum member. 
+ actor.memberAdd();
+ fixture.awaitDeque();
+
+ assertTrue(client.isMember());
+ assertEquals(new UUID[]{serviceId},quorum.getMembers());
+
+ // add to the pipeline.
+ actor.pipelineAdd();
+ fixture.awaitDeque();
+
+ assertTrue(client.isPipelineMember());
+ assertEquals(new UUID[]{serviceId},quorum.getPipeline());
+
+ // cast a vote for a lastCommitTime.
+ actor.castVote(lastCommitTime);
+ fixture.awaitDeque();
+
+ assertEquals(1,quorum.getVotes().size());
+ assertEquals(new UUID[] { serviceId }, quorum.getVotes().get(
+ lastCommitTime));
+
+ // verify the consensus was updated.
+ assertEquals(lastCommitTime, client.lastConsensusValue);
+
+ // wait for quorum meet.
+ final long token1 = quorum.awaitQuorum();
+
+ // verify service was joined.
+ assertTrue(client.isJoinedMember(quorum.token()));
+ assertEquals(new UUID[] { serviceId }, quorum.getJoined());
+
+ // validate the token was assigned.
+ fixture.awaitDeque();
+ assertEquals(Quorum.NO_QUORUM + 1, quorum.lastValidToken());
+ assertEquals(Quorum.NO_QUORUM + 1, quorum.token());
+ assertTrue(quorum.isQuorumMet());
+
+ /*
+ * Terminate the quorum. The state should be cleared down.
+ */
+
+ // Verify termination of the quorum for that client.
+ assertEquals(client, quorum.getClient());
+
+ // terminate the quorum.
+ quorum.terminate();
+
+ try {
+ quorum.getClient();
+ } catch (IllegalStateException ex) {
+ log.info("Ignoring expected exception: " + ex);
+ }
+
+ // State was cleared.
+ assertEquals(Quorum.NO_QUORUM, quorum.token());
+ assertEquals(Quorum.NO_QUORUM, quorum.lastValidToken());
+ assertEquals(new UUID[] {}, quorum.getMembers());
+ assertEquals(new UUID[] {}, quorum.getJoined());
+ assertEquals(new UUID[] {}, quorum.getPipeline());
+ assertEquals(Collections.emptyMap(), quorum.getVotes());
+
+ try {
+ // Note: Quorum reference was cleared. Client throws exception.
+ assertFalse(client.isMember());
+ } catch (IllegalStateException ex) {
+ log.info("Ignoring expected exception: " + ex);
+ }
+
+ // re-terminate() is safe.
+ quorum.terminate();
+
+ }
+
+ /**
+ * Unit test of timeout in {@link Quorum#awaitQuorum(long, TimeUnit)}. and
+ * {@link Quorum#awaitBreak(long, TimeUnit)}.
+ *
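Taken together, the assertions above pin down a small caller-side contract: getClient() throws IllegalStateException once the quorum has been terminated, the quorum's reflection of the distributed state is cleared down, and terminate() is idempotent. A minimal sketch of a defensive shutdown helper written against that contract (the helper class and method are illustrative; only Quorum and the behavior exercised by the tests above come from the code base):

import com.bigdata.quorum.Quorum;

final class QuorumShutdownSketch {

    /**
     * Release a quorum handle that may already have been terminated.
     */
    static void shutdown(final Quorum<?, ?> quorum) {

        try {
            // Throws IllegalStateException once the quorum is terminated.
            quorum.getClient();
        } catch (IllegalStateException ex) {
            // Already terminated; fall through, terminate() is still safe.
        }

        // Idempotent: a second call is harmless.
        quorum.terminate();

        // Post-conditions verified by the tests above: token() and
        // lastValidToken() are NO_QUORUM, and getMembers(), getJoined(),
        // getPipeline() and getVotes() are all empty.
    }
}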
From: <tho...@us...> - 2013-10-21 13:45:37
Revision: 7466 http://bigdata.svn.sourceforge.net/bigdata/?rev=7466&view=rev Author: thompsonbry Date: 2013-10-21 13:45:30 +0000 (Mon, 21 Oct 2013) Log Message: ----------- Bug fix to SSSP. It was terminating early due to paying attention to the VS.isChanged field in SSSP.isChanged(). The VS.isChanged field is initialized to true for the initial frontier and reset in apply() unless there is a change. If we pay attention to this field in SSSP.isChanged() then the algorithm will terminate early. I have "fixed" SSSP to ignore this field such that it does the scatter stage for the first round rather than terminating early. I plan to replace the SSSP implementation completely with a push style Scatter of updates. The semantics of isChanged can be properly fixed within the context of that optimized algorithm. Removed unimplemented 2nd test for SSSP which depends on link weights. Removed unimplemented PR test. See #629 (Graph Processing API) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestAll.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestSSSP.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2013-10-21 12:50:47 UTC (rev 7465) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2013-10-21 13:45:30 UTC (rev 7466) @@ -19,6 +19,7 @@ import org.openrdf.model.Statement; import org.openrdf.model.Value; +import com.bigdata.rdf.graph.EdgesEnum; import com.bigdata.rdf.graph.Factory; import com.bigdata.rdf.graph.FrontierEnum; import com.bigdata.rdf.graph.IGASContext; @@ -86,6 +87,16 @@ */ private Integer dist = Integer.MAX_VALUE; + /** + * Note: This flag is cleared by apply() and then conditionally set + * iff the {@link #dist()} is replaced by the new value from the + * gather. Thus, if the gather does not reduce the value, then the + * propagation of the algorithm is halted. However, this causes the + * algorithm to NOT scatter for round zero, which causes it to halt. + * I plan to fix the algorithm by doing the "push" style update in + * the scatter phase. That will completely remove the gather phase + * of the algorithm. + */ private boolean changed = false; // /** @@ -167,21 +178,21 @@ // return null; // // } -// -// @Override -// public EdgesEnum getGatherEdges() { -// -// return EdgesEnum.InEdges; -// -// } -// -// @Override -// public EdgesEnum getScatterEdges() { -// -// return EdgesEnum.OutEdges; -// -// } + @Override + public EdgesEnum getGatherEdges() { + + return EdgesEnum.InEdges; + + } + + @Override + public EdgesEnum getScatterEdges() { + + return EdgesEnum.OutEdges; + + } + /** * {@inheritDoc} * <p> @@ -292,14 +303,22 @@ } - @Override - public boolean isChanged(final IGASState<SSSP.VS, SSSP.ES, Integer> state, - final Value u) { + /* + * Note: Enabling this check causes SSSP to fail. The problem is that the + * changed flag is cleared when we enter apply(). Therefore, it gets cleared + * on the first round before we do the scatter and testing us.isChanged() + * here causes the scatter step to be skipped. This causes the propagation + * of the updates to stop. 
This issue is also documented on the [isChanged]
+ * flag of the vertex state.
+ */
+// @Override
+// public boolean isChanged(final IGASState<SSSP.VS, SSSP.ES, Integer> state,
+// final Value u) {
+//
+// return state.getState(u).isChanged();
+//
+// }
- return state.getState(u).isChanged();
-
- }
-
/**
* The remote vertex is scheduled if this vertex is changed.
* <p>
@@ -310,7 +329,8 @@
* {@inheritDoc}
*
* FIXME OPTIMIZE: Test both variations on a variety of data sets and see
- * which is better:
+ * which is better (actually, just replace with a push style Scatter of the
+ * updates):
*
* <p>
* Zhisong wrote: In the original GASengine, the scatter operator only need
@@ -324,7 +344,7 @@
* depends on the fan-out of the scatter at time t versus the fan-in of the
* gather at time t+1. The optimization might only benefit if a reasonable
* fraction of the destination vertices wind up NOT being retriggered. I
- * will try on these variations in the Java code as well. *
+ * will try on these variations in the Java code as well.
* </p>
*/
@Override
@@ -361,11 +381,11 @@
}
-// @Override
-// public boolean nextRound(IGASContext ctx) {
-//
-// return true;
-//
-// }
+ @Override
+ public boolean nextRound(final IGASContext<VS, ES, Integer> ctx) {
+ return true;
+
+ }
+
}
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestAll.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestAll.java 2013-10-21 12:50:47 UTC (rev 7465)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestAll.java 2013-10-21 13:45:30 UTC (rev 7466)
@@ -55,7 +55,8 @@
suite.addTestSuite(TestCC.class);
- suite.addTestSuite(TestPR.class);
+ // FIXME Add unit test for Page Rank.
+// suite.addTestSuite(TestPR.class);
return suite;
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestSSSP.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestSSSP.java 2013-10-21 12:50:47 UTC (rev 7465)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestSSSP.java 2013-10-21 13:45:30 UTC (rev 7466)
@@ -107,7 +107,7 @@
* SSSP must also support floating point distances since the link weights
* are often non-integer.
*/
- public void testSSSP2() throws Exception {
+ public void _testSSSP2() throws Exception {
fail("Finish test.");
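The push-style rewrite promised in this log message amounts to doing the relaxation on the scatter side: the edge's target takes min(dist(target), dist(source) + weight) and is scheduled only when that value actually drops, which removes the gather phase and the fragile isChanged bookkeeping entirely. A standalone sketch of that relaxation step, using a CAS loop so concurrent scatters into the same target stay safe; every name here (VS, Scheduler, scatterEdge) is illustrative and not the eventual bigdata implementation (it also ignores int overflow for distances near MAX_VALUE, as does the Integer-based SSSP above):

import java.util.concurrent.atomic.AtomicInteger;

final class PushScatterSketch {

    /** Per-vertex state: best known distance, MAX_VALUE until reached. */
    static final class VS {
        final AtomicInteger dist = new AtomicInteger(Integer.MAX_VALUE);
    }

    /** Stand-in for the GAS engine's scheduler of the next frontier. */
    interface Scheduler {
        void schedule(Object v);
    }

    /**
     * Relax the edge (u -> v) with weight w and schedule v iff its
     * distance improved. No gather, no isChanged flag: the scheduling
     * decision falls out of the relaxation itself.
     */
    static void scatterEdge(final VS us, final Object v, final VS vs,
            final int w, final Scheduler sch) {

        final int du = us.dist.get();

        if (du == Integer.MAX_VALUE)
            return; // source not yet reached; nothing to push.

        final int candidate = du + w;

        while (true) {
            final int dv = vs.dist.get();
            if (candidate >= dv)
                return; // no improvement; v is not (re)scheduled.
            if (vs.dist.compareAndSet(dv, candidate)) {
                sch.schedule(v); // improved v: put it in the next frontier.
                return;
            }
            // Lost a race with another in-edge of v; retry.
        }
    }
}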
From: <tho...@us...> - 2013-10-21 12:50:54
Revision: 7465 http://bigdata.svn.sourceforge.net/bigdata/?rev=7465&view=rev Author: thompsonbry Date: 2013-10-21 12:50:47 +0000 (Mon, 21 Oct 2013) Log Message: ----------- adding a class that Martyn and I have been using to test zk disconnect semantics. Modified Paths: -------------- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestAll.java Added Paths: ----------- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestZookeeperSessionSemantics.java Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestAll.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestAll.java 2013-10-21 12:48:38 UTC (rev 7464) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestAll.java 2013-10-21 12:50:47 UTC (rev 7465) @@ -49,8 +49,8 @@ final TestSuite suite = new TestSuite("zookeeper client library"); - // test suite for zookeeper session expiration semantics. - suite.addTestSuite(TestZookeeperSessionSemantics.class); +// // test suite for zookeeper session expiration semantics. +// suite.addTestSuite(TestZookeeperSessionSemantics.class); // test ability to handle an expired session. suite.addTestSuite(TestZookeeperAccessor.class); Added: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestZookeeperSessionSemantics.java =================================================================== --- branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestZookeeperSessionSemantics.java (rev 0) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestZookeeperSessionSemantics.java 2013-10-21 12:50:47 UTC (rev 7465) @@ -0,0 +1,464 @@ +/* + +Copyright (C) SYSTAP, LLC 2006-2008. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ +/* + * Created on Jan 30, 2009 + */ + +package com.bigdata.zookeeper; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; + +import cern.colt.Arrays; + +import com.bigdata.btree.BytesUtil; +import com.bigdata.io.TestCase3; +import com.bigdata.util.config.NicUtil; + +/** + * Test suite for {@link ZooKeeper} session expire and disconnect semantics. 
+ * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * @version $Id$ + */ +public class TestZookeeperSessionSemantics extends TestCase3 { + + /** + * + */ + public TestZookeeperSessionSemantics() { + } + + /** + * @param name + */ + public TestZookeeperSessionSemantics(String name) { + super(name); + } + + /** + * The requested session timeout. The negotiated session timeout is obtained + * from the {@link ZooKeeper} client connection. + */ + private int requestedSessionTimeout; + + private static final List<ACL> acl = Ids.OPEN_ACL_UNSAFE; + + // the chosen client port. + protected int clientPort = -1; + + @Override + protected void setUp() throws Exception { + + try { + + if (log.isInfoEnabled()) + log.info(getName()); + + final int tickTime = Integer.valueOf(System.getProperty( + "test.zookeeper.tickTime", "2000")); + + clientPort = Integer.valueOf(System.getProperty( + "test.zookeeper.clientPort", "2081")); + + /* + * Note: This MUST be the actual session timeout that the zookeeper + * service will impose on the client. Some unit tests depend on + * this. + */ + this.requestedSessionTimeout = tickTime * 2; + + log.warn("clientPort=" + clientPort + ", tickTime=" + tickTime + + ", requestedSessionTimeout=" + requestedSessionTimeout); + + // Verify zookeeper is running on the local host at the client port. + final InetAddress localIpAddr = NicUtil.getInetAddress(null, 0, + null, true); + { + try { + ZooHelper.ruok(localIpAddr, clientPort); + } catch (Throwable t) { + fail("Zookeeper not running:: " + localIpAddr + ":" + + clientPort, t); + } + } + + } catch (Throwable t) { + + // // don't leave around the dataDir if the setup fails. + // recursiveDelete(dataDir); + log.error(getName() + " : " + t, t); + throw new Exception(t); + + } + + } + +// private void killZKServer() throws InterruptedException { +// +// try { +// ZooHelper.kill(clientPort); +// } catch (Throwable t) { +// fail("Zookeeper not running::" + clientPort, t); +// } +// +// Thread.sleep(1000);// wait. +// +// final InetAddress localIpAddr = NicUtil.getInetAddress(null, 0, +// null, true); +// try { +// ZooHelper.ruok(localIpAddr, clientPort); +// } catch (Throwable t) { +// log.info("Expected exception: Zookeeper not running:: " +// + localIpAddr + ":" + clientPort, t); +// return; +// } +// fail("Zookeeper still running."); +// } + + @Override + protected void tearDown() throws Exception { + + super.tearDown(); + + } + + public void test_handleExpiredSession() throws InterruptedException, + KeeperException, IOException { + + final String hosts = "localhost:" + clientPort; + + final Lock lock = new ReentrantLock(); + final Condition expireCond = lock.newCondition(); + final Condition connectCond = lock.newCondition(); + final Condition disconnectCond = lock.newCondition(); + final AtomicBoolean didExpire = new AtomicBoolean(false); + final AtomicBoolean didDisconnect = new AtomicBoolean(false); + + /* + * Start an instance and run until it gets an assigned sessionId. 
+ */ + { + final ZooKeeper zk1a = new ZooKeeper(hosts, + requestedSessionTimeout, new Watcher(){ + @Override + public void process(WatchedEvent event) { + log.warn(event); + }}); + int i = 0; + while (i < 10) { + boolean done = false; + if (zk1a.getState() == ZooKeeper.States.CONNECTED) { + done = true; + } + log.info("zk.getState()=" + zk1a.getState() + + ", zk.getSessionId()=" + zk1a.getSessionId()); + if (done) + break; + Thread.sleep(500); + i++; + } + if(zk1a.getState() != ZooKeeper.States.CONNECTED) { + fail("Did not connect."); + } + zk1a.close(); + } + + final ZooKeeper zk1 = new ZooKeeper(hosts, requestedSessionTimeout, + new Watcher() { + /** + * Note: The default watcher will not receive any events + * after a session expire. A {@link Zookeeper#close()} + * causes an immediate session expire. Thus, no events + * (include the session expire) will be received after a + * close(). + */ + @Override + public void process(final WatchedEvent event) { + log.warn(event); + switch (event.getState()) { + case AuthFailed: + break; + case ConnectedReadOnly: + break; + case Disconnected: + lock.lock(); + try { + didDisconnect.set(true); + disconnectCond.signalAll(); + } finally { + lock.unlock(); + } + break; + case Expired: + lock.lock(); + try { + didExpire.set(true); + expireCond.signalAll(); + } finally { + lock.unlock(); + } + break; + case NoSyncConnected: + break; + case SaslAuthenticated: + break; + case SyncConnected: + lock.lock(); + try { + connectCond.signalAll(); + } finally { + lock.unlock(); + } + break; + case Unknown: + break; + } + + } + }); + + /* + * Note: You can not obtain the negotiated session timeout until the + * zookeeper client has connected to a zookeeper service (or rather, + * it will return ZERO until it is connected). + */ + final int negotiatedSessionTimeout; + lock.lock(); + try { + log.info("Waiting zk connected."); + connectCond.await(10, TimeUnit.SECONDS); + negotiatedSessionTimeout = zk1.getSessionTimeout(); + if (log.isInfoEnabled()) + log.info("Negotiated sessionTimeout=" + + negotiatedSessionTimeout); + assertNotSame(0, negotiatedSessionTimeout); + assertTrue(negotiatedSessionTimeout > 0); + } finally { + lock.unlock(); + } + + assertTrue(zk1.getState().isAlive()); + + assertFalse(didDisconnect.get()); + + assertFalse(didExpire.get()); + + // clear out test znodes. + destroyZNodes(zk1, "/test"); + + // ensure root /test znode exists. + try { + zk1.create("/test", new byte[] {}, acl, CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException ex) { + log.warn("Ignoring: " + ex); + } + + // look at that znode, establishing a watcher. + zk1.getData("/test", true/* watch */, null/* stat */); + + // update the znode's data. + zk1.setData("/test", new byte[] { 1 }, -1/* version */); + + // create an ephemeral sequential znode that is a child of /test. + final String foozpath = zk1.create("/test/foo", new byte[] {}, acl, + CreateMode.EPHEMERAL_SEQUENTIAL); + + // create a 2nd ephemeral sequential znode that is a child of /test. + final String foozpath2 = zk1.create("/test/foo", new byte[] {}, acl, + CreateMode.EPHEMERAL_SEQUENTIAL); + + /* + * Look at that znode, establishing a watcher. + * + * Note: We appear to see node deleted events for the ephemeral znodes + * if the client connection is closed, but the state is still reported + * as SyncConnected rather than SessionExpired. + * + * Note: If we do not establish a watcher for an ephemeral znode, then + * we DO NOT see an node deleted event when the client is closed! 
+ */ + zk1.getData(foozpath, true/* watch */, null/* stat */); +// zk1.getData(foozpath2, true/* watch */, null/* stat */); + +//// close the existing instance. +// log.info("Closing ZK client"); +// zk1.close(); + +// log.fatal("killing local zookeeper service."); +// killZKServer(); +// Thread.sleep(5000); +// fail("done"); + + if (false) { + log.info("Spin loop awaiting !isAlive() for client."); + final long begin = System.currentTimeMillis(); + while (zk1.getState().isAlive()) { + log.info("zk.getState()=" + zk1.getState() + + ", zk.getSessionId()=" + zk1.getSessionId()); + final long elapsed = System.currentTimeMillis() - begin; + if (elapsed > 60000 * 2) + fail("Client still isAlive()."); + Thread.sleep(1000); + } + log.info("Continuing"); + } + + if (true) { + log.error("KILL ZOOKEEPER."); + Thread.sleep(5000); + log.info("Spin loop on ephemeral znode getData() for client."); + while (true) { + try { + zk1.getData(foozpath, true/* watch */, null/* stat */); + } catch (KeeperException ex) { + log.error(ex, ex); + Thread.sleep(1000); + continue; + } + log.info("zk.getState()=" + zk1.getState() + + ", zk.getSessionId()=" + zk1.getSessionId()); + break; +// final long elapsed = System.currentTimeMillis() - begin; +// if (elapsed > 60000 * 2) +// fail("Client still isAlive()."); +// Thread.sleep(1000); + } + log.info("Continuing"); + final byte[] a = zk1.getData(foozpath, true/* watch */, null/* stat */); + assertTrue("Expected " + Arrays.toString(new byte[] { 1 }) + + ", not " + Arrays.toString(a), + BytesUtil.bytesEqual(new byte[] { 1 }, a)); + } + + // // The disconnect event should be immediate. + // lock.lock(); + // try { + // disconnectCond.await(100, TimeUnit.MILLISECONDS); + // } finally { + // lock.unlock(); + // } + // + // assertTrue(didDisconnect.get()); + + assertFalse(didDisconnect.get()); + assertFalse(didExpire.get()); + + assertFalse(zk1.getState().isAlive()); + + /* + * Wait up to a little more than the negotiated session timeout for the + * session to be expired. + */ + lock.lock(); + try { + // Attempt to get the znode again. + new Thread(new Runnable() { + public void run() { + try { + final byte[] tmp = zk1.getData("/test", + true/* watch */, null/* stat */); + } catch (KeeperException e) { + log.error(e, e); + } catch (InterruptedException e) { + log.error(e, e); + } + } + }).start(); + expireCond.await(negotiatedSessionTimeout + 10000, + TimeUnit.MILLISECONDS); + /* + * Note: No events are ever delivered to the watcher with + * KeeperStates:=SessionExpired. This appears to be a design + * decision. + */ + assertFalse(didExpire.get()); + } finally { + lock.unlock(); + } + + /* + * Now obtain a new session. + */ + { + log.warn("Starting new ZK connection"); + final ZooKeeper zk2 = new ZooKeeper(hosts, requestedSessionTimeout, + new Watcher() { + + @Override + public void process(WatchedEvent event) { + log.warn(event); + } + }); + + assertTrue(zk2.getState().isAlive()); + + } + + } + + /** + * Recursive delete of znodes. 
+ *
+ * @param zpath
+ *
+ * @throws KeeperException
+ * @throws InterruptedException
+ */
+ protected void destroyZNodes(final ZooKeeper zookeeper, final String zpath)
+ throws KeeperException, InterruptedException {
+
+ // System.err.println("enter : " + zpath);
+
+ final List<String> children = zookeeper.getChildren(zpath, false);
+
+ for (String child : children) {
+
+ destroyZNodes(zookeeper, zpath + "/" + child);
+
+ }
+
+ if(log.isInfoEnabled())
+ log.info("delete: " + zpath);
+
+ zookeeper.delete(zpath, -1/* version */);
+
+ }
+
+}
+
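Outside of the test harness, the connect-then-read-timeout dance that this test builds on reduces to a few lines against the stock ZooKeeper client API. A minimal sketch, with the connect string and requested timeout as placeholder values (the key point, stated in the test's own comments, is that getSessionTimeout() returns ZERO until the client reaches SyncConnected):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class AwaitConnectedExample {

    public static void main(final String[] args) throws Exception {

        final CountDownLatch connected = new CountDownLatch(1);

        final ZooKeeper zk = new ZooKeeper("localhost:2081", 4000,
                new Watcher() {
                    @Override
                    public void process(final WatchedEvent event) {
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            connected.countDown();
                        }
                    }
                });

        if (!connected.await(10, TimeUnit.SECONDS)) {
            zk.close();
            throw new IllegalStateException("Did not connect to zookeeper.");
        }

        // Zero until connected; positive once the session is negotiated.
        System.out.println("negotiated sessionTimeout="
                + zk.getSessionTimeout());

        // Note: close() expires the session immediately, and (per the test
        // above) no Expired event is ever delivered to the watcher for it.
        zk.close();
    }
}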
From: <tho...@us...> - 2013-10-21 12:48:44
Revision: 7464 http://bigdata.svn.sourceforge.net/bigdata/?rev=7464&view=rev Author: thompsonbry Date: 2013-10-21 12:48:38 +0000 (Mon, 21 Oct 2013) Log Message: ----------- Creating a branch to address #718 (handling zk connection loss). Modified Paths: -------------- branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java branches/ZK_DISCONNECT_HANDLING/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/zookeeper/ZooKeeperAccessor.java branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestAll.java Added Paths: ----------- branches/ZK_DISCONNECT_HANDLING/ Property changes on: branches/ZK_DISCONNECT_HANDLING ___________________________________________________________________ Added: svn:ignore + ant-build src bin bigdata*.jar ant-release standalone test* countersfinal.xml events.jnl .settings *.jnl TestInsertRate.out SYSTAP-BBT-result.txt U10load+query *.hprof com.bigdata.cache.TestHardReferenceQueueWithBatchingUpdates.exp.csv commit-log.txt eventLog dist bigdata-test com.bigdata.rdf.stress.LoadClosureAndQueryTest.*.csv DIST.bigdata-*.tgz REL.bigdata-*.tgz queryLog* queryRunState* sparql.txt benchmark CI Added: svn:mergeinfo + /branches/BIGDATA_OPENRDF_2_6_9_UPDATE:6769-6785 /branches/BIGDATA_RELEASE_1_2_0:6766-7380 /branches/BTREE_BUFFER_BRANCH:2004-2045 /branches/DEV_BRANCH_27_OCT_2009:2270-2546,2548-2782 /branches/INT64_BRANCH:4486-4522 /branches/JOURNAL_HA_BRANCH:2596-4066 /branches/LARGE_LITERALS_REFACTOR:4175-4387 /branches/LEXICON_REFACTOR_BRANCH:2633-3304 /branches/QUADS_QUERY_BRANCH:4525-4531,4550-4584,4586-4609,4634-4643,4646-4672,4674-4685,4687-4693,4697-4735,4737-4782,4784-4792,4794-4796,4798-4801 /branches/READ_CACHE:7215-7271 /branches/RWSTORE_1_1_0_DEBUG:5896-5935 /branches/TIDS_PLUS_BLOBS_BRANCH:4814-4836 /branches/bugfix-btm:2594-3237 /branches/dev-btm:2574-2730 /branches/fko:3150-3194 /trunk:3392-3437,3656-4061 Modified: branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-10-21 12:45:44 UTC (rev 7463) +++ branches/ZK_DISCONNECT_HANDLING/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-10-21 12:48:38 UTC (rev 7464) @@ -443,7 +443,7 @@ pipeline = new LinkedHashSet<UUID>(k * 2); } - + @Override protected void finalize() throws Throwable { terminate(); Modified: branches/ZK_DISCONNECT_HANDLING/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java 2013-10-21 12:45:44 UTC (rev 7463) +++ branches/ZK_DISCONNECT_HANDLING/bigdata/src/test/com/bigdata/quorum/MockQuorumFixture.java 2013-10-21 12:48:38 UTC (rev 7464) @@ -742,7 +742,7 @@ // Save UUID -> QuorumMember mapping on the fixture. 
fixture.known.put(client.getServiceId(), client); } - + @Override public void terminate() { final MockQuorumWatcher watcher = (MockQuorumWatcher) getWatcher(); super.terminate(); Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-10-21 12:45:44 UTC (rev 7463) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/AbstractServer.java 2013-10-21 12:48:38 UTC (rev 7464) @@ -973,7 +973,7 @@ log.info("Creating service impl..."); // init. - impl = newService(config); + impl = newService(config); // FIXME Pass in the HAClient.Connection. if (log.isInfoEnabled()) log.info("Service impl is " + impl); @@ -1075,7 +1075,7 @@ * Simple representation of state (non-blocking, safe). Some fields reported * in the representation may be <code>null</code> depending on the server * state. - */ + */@Override public String toString() { // note: MAY be null. Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java 2013-10-21 12:45:44 UTC (rev 7463) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAClient.java 2013-10-21 12:48:38 UTC (rev 7464) @@ -1128,7 +1128,7 @@ InterruptedException { /* - * Fast path. Check for an existing instance. + * Fast path. Check for an existing instance. FIXME MUst also verify that quorum is running. If terminated, then start(). But must also pass in the AbstractQuorumClient to be run if we are the HAJournalServer. Or let the caller start the quorum for their client rather than embedding that logic into this method. */ Quorum<HAGlue, QuorumClient<HAGlue>> quorum; synchronized (quorums) { Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/zookeeper/ZooKeeperAccessor.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/zookeeper/ZooKeeperAccessor.java 2013-10-21 12:45:44 UTC (rev 7463) +++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/java/com/bigdata/zookeeper/ZooKeeperAccessor.java 2013-10-21 12:48:38 UTC (rev 7464) @@ -263,7 +263,7 @@ try { log.warn("Creating new client"); - + // FIXME must not create new zk while session not expired. zookeeper = new ZooKeeper(hosts, sessionTimeout, new ZooAliveWatcher()); @@ -331,7 +331,7 @@ if (zookeeper != null) { zookeeper.close(); - + // FIXME close must not clear the zk reference. only do this for session expired. zookeeper = null; } @@ -373,11 +373,12 @@ private class ZooAliveWatcher implements Watcher { private boolean connected = false; - + @Override public void process(final WatchedEvent e) { + System.err.println("event: "+e); + // FIXME Does not verify that event is for the current ZK client. + if(!open) return; // FIXME blocks view of events after a close(). - if(!open) return; - if (log.isInfoEnabled()) log.info(e.toString()); @@ -387,7 +388,7 @@ if(!open) return; - switch (e.getState()) { + switch (e.getState()) { // FIXME Review switch states. case Unknown: // @todo what to do with these events? 
@@ -428,7 +429,7 @@
}
for (Watcher w : watchers) {
-
+ System.err.println("send event: "+e);
// send event to any registered watchers.
w.process(e);
Modified: branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestAll.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/test/com/bigdata/zookeeper/TestAll.java 2013-10-21 12:45:44 UTC (rev 7463)
+++ branches/ZK_DISCONNECT_HANDLING/bigdata-jini/src/test/com/bigdata/zookeeper/TestAll.java 2013-10-21 12:48:38 UTC (rev 7464)
@@ -49,6 +49,9 @@
final TestSuite suite = new TestSuite("zookeeper client library");
+ // test suite for zookeeper session expiration semantics.
+ suite.addTestSuite(TestZookeeperSessionSemantics.class);
+
// test ability to handle an expired session.
suite.addTestSuite(TestZookeeperAccessor.class);
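The FIXMEs in this revision ("must not create new zk while session not expired", "close must not clear the zk reference. only do this for session expired") all circle one rule of the ZooKeeper session model: Disconnected means the client library is still trying to reconnect within the same session, so the existing ZooKeeper handle and its ephemeral znodes must be kept; only Expired means the handle is dead and a new instance (plus re-created ephemerals) is needed. A sketch of that policy as a watcher skeleton, illustrating the intended handling rather than the ZooKeeperAccessor code itself:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

final class SessionPolicySketch implements Watcher {

    @Override
    public void process(final WatchedEvent event) {
        switch (event.getState()) {
        case Disconnected:
            // Keep the existing handle: the session may still be valid
            // and the client library will reconnect on its own.
            break;
        case Expired:
            // The session is gone for good: discard the old handle,
            // create a new ZooKeeper instance, and re-create any
            // ephemeral znodes under the new session.
            break;
        case SyncConnected:
            // Connected, or reconnected within the same session.
            break;
        default:
            break;
        }
    }
}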
From: <tho...@us...> - 2013-10-21 12:45:51
Revision: 7463 http://bigdata.svn.sourceforge.net/bigdata/?rev=7463&view=rev Author: thompsonbry Date: 2013-10-21 12:45:44 +0000 (Mon, 21 Oct 2013)
Log Message:
-----------
javadoc
Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java 2013-10-21 12:45:15 UTC (rev 7462)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java 2013-10-21 12:45:44 UTC (rev 7463)
@@ -59,7 +59,7 @@
* <dd>iff the label has changed</dd>
* </dl>
*
- * FIXME CC : Implement. Should push the updates through the scatter function.
+ * FIXME CC : Implement version that pushes updates through the scatter function.
* Find an abstraction to support this pattern. It is used by both CC and SSSP.
* (We can initially implement this as a Gather (over all edges) plus a
* conditional Scatter (over all edges iff the vertex label has changed). We can
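For reference, the label relaxation this FIXME describes is tiny when written against a plain adjacency structure. A sketch over int vertex ids, purely illustrative (it assumes each undirected edge is listed in both directions in adj, matching the AllEdges behavior of CC):

final class CCRelaxationSketch {

    /**
     * One relaxation pass: every vertex takes the minimum of its own label
     * and its neighbors' labels. Returns true iff any label changed.
     */
    static boolean relaxOnce(final int[] label, final int[][] adj) {
        boolean changed = false;
        for (int u = 0; u < adj.length; u++) {
            for (int v : adj[u]) {
                if (label[v] < label[u]) {
                    label[u] = label[v];
                    changed = true;
                }
            }
        }
        return changed;
    }

    /** Initialize label[u] = u, then relax to a fixed point. */
    static int[] connectedComponents(final int[][] adj) {
        final int[] label = new int[adj.length];
        for (int u = 0; u < label.length; u++)
            label[u] = u;
        while (relaxOnce(label, adj)) {
            // keep relaxing until no label changes.
        }
        return label; // label[u] == min vertex id in u's component.
    }
}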
From: <tho...@us...> - 2013-10-21 12:45:22
Revision: 7462 http://bigdata.svn.sourceforge.net/bigdata/?rev=7462&view=rev Author: thompsonbry Date: 2013-10-21 12:45:15 +0000 (Mon, 21 Oct 2013)
Log Message:
-----------
added override tags
Modified Paths:
--------------
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/solutions/HTreeDistinctBindingSetsOp.java
branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctBindingSetsOp.java
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/solutions/HTreeDistinctBindingSetsOp.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/solutions/HTreeDistinctBindingSetsOp.java 2013-10-19 20:01:51 UTC (rev 7461)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/solutions/HTreeDistinctBindingSetsOp.java 2013-10-21 12:45:15 UTC (rev 7462)
@@ -178,7 +178,7 @@
this.state = state;
}
-
+ @Override
public Void call() throws Exception {
final BOpStats stats = context.getStats();
@@ -204,12 +204,12 @@
} finally {
- if(context.isLastInvocation()) {
+ if (context.isLastInvocation()) {
state.release();
-
+
}
-
+
sink.close();
}
Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctBindingSetsOp.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctBindingSetsOp.java 2013-10-19 20:01:51 UTC (rev 7461)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/solutions/JVMDistinctBindingSetsOp.java 2013-10-21 12:45:15 UTC (rev 7462)
@@ -212,7 +212,7 @@
}
}
-
+ @Override
public Void call() throws Exception {
final BOpStats stats = context.getStats();
From: <tho...@us...> - 2013-10-19 20:02:02
Revision: 7461 http://bigdata.svn.sourceforge.net/bigdata/?rev=7461&view=rev Author: thompsonbry Date: 2013-10-19 20:01:51 +0000 (Sat, 19 Oct 2013) Log Message: ----------- Added CC and PR implementations and a reducer to report the histogram over the #of vertices at each depth for BFS. See #629 (Graph Processing API) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGraphAccessor.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/ram/RAMGASEngine.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/sail/SAILGASEngine.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/GASRunnerBase.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/util/VertexDistribution.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/AbstractGraphTestCase.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestAll.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestBFS.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestSSSP.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/ram/TestGather.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/AbstractSailGraphTestCase.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/impl/sail/TestGather.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASEngine.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/graph/impl/bd/BigdataGASState.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestBFS.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestGather.java branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/graph/impl/bd/TestSSSP.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/FrontierEnum.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestCC.java branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/analytics/TestPR.java 
branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/data/ssspGraph.png branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/test/com/bigdata/rdf/graph/data/ssspGraph.ttl Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/FrontierEnum.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/FrontierEnum.java (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/FrontierEnum.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -0,0 +1,36 @@ +/** + Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package com.bigdata.rdf.graph; + +/** + * Type-safe enumeration characterizing the assumptions of an algorithm + * concerning its initial frontier. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public enum FrontierEnum { + + /** + * The initial frontier is a single vertex. + */ + SingleVertex, + + /** + * The initial frontier is all vertices. + */ + AllVertices; + +} Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASContext.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -50,6 +50,11 @@ IGASState<VS, ES, ST> getGASState(); /** + * The graph access object. + */ + IGraphAccessor getGraphAccessor(); + + /** * Execute one iteration. * * @param stats Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASOptions.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -39,8 +39,28 @@ public interface IGASOptions<VS, ES, ST> { /** + * Return the nature of the initial frontier for this algorithm. + */ + FrontierEnum getInitialFrontierEnum(); + + /** + * Return the type of edges that must exist when sampling the vertices of + * the graph. If {@link EdgesEnum#InEdges} is specified, then each sampled + * vertex will have at least one in-edge. If {@link EdgesEnum#OutEdges} is + * specified, then each sampled vertex will have at least one out-edge. To + * sample all vertices regardless of their edges, specify + * {@value EdgesEnum#NoEdges}. To require that each vertex has at least one + * in-edge and one out-edge, specify {@link EdgesEnum#AllEdges}. + */ + EdgesEnum getSampleEdgesFilter(); + + /** * Return the set of edges to which the GATHER is applied -or- * {@link EdgesEnum#NoEdges} to skip the GATHER phase. 
+ * + * TODO We may need to set dynamically when visting the vertex in the + * frontier rather than having it be a one-time property of the vertex + * program. */ EdgesEnum getGatherEdges(); Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASProgram.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -32,21 +32,46 @@ * the generic type for the per-edge state, but that is not always * true. The SUM type is scoped to the GATHER + SUM operation (NOT * the computation). - * + * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + * + * TODO DESIGN: The broad problem with this approach is that it is + * overly coupled with the Java object model. Instead it needs to expose + * an API that is aimed at vectored (for GPU) execution with 2D + * partitioning (for out-of-core, multi-node). */ public interface IGASProgram<VS, ES, ST> extends IGASOptions<VS, ES, ST> { /** + * One time initialization before the {@link IGASProgram} is executed. + * + * @param ctx + * The evaluation context. + */ + void before(IGASContext<VS, ES, ST> ctx); + + /** + * One time initialization after the {@link IGASProgram} is executed. + * + * @param ctx + * The evaluation context. + */ + void after(IGASContext<VS, ES, ST> ctx); + + /** * Callback to initialize the state for each vertex in the initial frontier * before the first iteration. A typical use case is to set the distance of * the starting vertex to ZERO (0). * * @param u * The vertex. + * + * TODO We do not need both the {@link IGASContext} and the + * {@link IGASState}. The latter is available from the former. */ - void init(IGASState<VS, ES, ST> state, Value u); - + void initVertex(IGASContext<VS, ES, ST> ctx, IGASState<VS, ES, ST> state, + Value u); + /** * GATHER is a map/reduce over the edges of the vertex. The SUM provides * pair-wise reduction over the edges visited by the GATHER. @@ -94,8 +119,14 @@ * TODO DESIGN: Rather than pair-wise reduction, why not use * vectored reduction? That way we could use an array of primitives * as well as objects. + * + * TODO DESIGN: This should be a reduced interface since we only + * need access to the comparator semantics while the [state] + * provides random access to vertex and edge state. The comparator + * is necessary for MIN semantics for the {@link Value} + * implementation of the backend. E.g., Value versus IV. */ - ST sum(ST left, ST right); + ST sum(final IGASState<VS, ES, ST> state, ST left, ST right); /** * Apply the reduced aggregation computed by GATHER + SUM to the vertex. @@ -155,7 +186,7 @@ * Return <code>true</code> iff the algorithm should continue. This is * invoked after every iteration, once the new frontier has been computed * and {@link IGASState#round()} has been advanced. An implementation may - * simple return <code>true</code>, in which case the algorithm will + * simply return <code>true</code>, in which case the algorithm will * continue IFF the current frontier is not empty. 
* <p> * Note: While this can be used to make custom decisions concerning the Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGASState.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -51,6 +51,8 @@ /** * {@link #reset()} the computation state and populate the initial frontier. * + * @param ctx + * The execution context. * @param v * One or more vertices that will be included in the initial * frontier. @@ -58,7 +60,7 @@ * @throws IllegalArgumentException * if no vertices are specified. */ - void init(Value... v); + void setFrontier(IGASContext<VS, ES, ST> ctx, Value... v); /** * Discard computation state (the frontier, vertex state, and edge state) @@ -227,5 +229,19 @@ * TODO RDR : Link to an RDR wiki page as well. */ Statement decodeStatement(Value v); + + /** + * Return -1, o, or 1 if <code>u</code> is LT, EQ, or GT <code>v</code>. A + * number of GAS programs depend on the ability to place an order over the + * vertex identifiers, as does 2D partitioning. The ordering provided by + * this method MAY be arbitrary, but it MUST be total and stable across the + * life-cycle of the GAS program evaluation. + * + * @param u + * A vertex. + * @param v + * Another vertex. + */ + int compareTo(Value u, Value v); } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGraphAccessor.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGraphAccessor.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IGraphAccessor.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -31,18 +31,37 @@ public interface IGraphAccessor { /** - * Return the edges for the vertex. + * Return the #of edges of the specified type for the given vertex. + * <p> + * Note: This is not always a flyweight operation due to the need to filter + * for only the observable edge types. If this information is required, it + * may be best to cache it on the vertex state object for a given + * {@link IGASProgram}. * - * @param p + * @param ctx * The {@link IGASContext}. * @param u * The vertex. * @param edges * Typesafe enumeration indicating which edges should be visited. - * + * * @return An iterator that will visit the edges for that vertex. */ - Iterator<Statement> getEdges(IGASContext<?, ?, ?> p, Value u, + long getEdgeCount(IGASContext<?, ?, ?> ctx, Value u, EdgesEnum edges); + + /** + * Return the edges for the given vertex. + * + * @param ctx + * The {@link IGASContext}. + * @param u + * The vertex. + * @param edges + * Typesafe enumeration indicating which edges should be visited. + * + * @return An iterator that will visit the edges for that vertex. 
+ */ + Iterator<Statement> getEdges(IGASContext<?, ?, ?> ctx, Value u, EdgesEnum edges); /** Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/IReducer.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -37,7 +37,7 @@ * The result from applying the procedure to a single index * partition. */ - public void visit(IGASState<VS, ES, ST> ctx, Value u); + public void visit(IGASState<VS, ES, ST> state, Value u); /** * Return the aggregated results as an implementation dependent object. Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/BFS.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -15,16 +15,23 @@ */ package com.bigdata.rdf.graph.analytics; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.openrdf.model.Statement; import org.openrdf.model.Value; import com.bigdata.rdf.graph.EdgesEnum; import com.bigdata.rdf.graph.Factory; +import com.bigdata.rdf.graph.FrontierEnum; import com.bigdata.rdf.graph.IGASContext; import com.bigdata.rdf.graph.IGASScheduler; import com.bigdata.rdf.graph.IGASState; +import com.bigdata.rdf.graph.IReducer; import com.bigdata.rdf.graph.impl.BaseGASProgram; import cutthecrap.utils.striterators.IStriterator; @@ -39,6 +46,8 @@ */ public class BFS extends BaseGASProgram<BFS.VS, BFS.ES, Void> { +// private static final Logger log = Logger.getLogger(BFS.class); + public static class VS { /** @@ -128,6 +137,13 @@ } @Override + public FrontierEnum getInitialFrontierEnum() { + + return FrontierEnum.SingleVertex; + + } + + @Override public EdgesEnum getGatherEdges() { return EdgesEnum.NoEdges; @@ -158,7 +174,8 @@ * Not used. */ @Override - public void init(final IGASState<BFS.VS, BFS.ES, Void> state, final Value u) { + public void initVertex(final IGASContext<BFS.VS, BFS.ES, Void> ctx, + final IGASState<BFS.VS, BFS.ES, Void> state, final Value u) { state.getState(u).visit(0); @@ -169,15 +186,20 @@ */ @Override public Void gather(IGASState<BFS.VS, BFS.ES, Void> state, Value u, Statement e) { + throw new UnsupportedOperationException(); + } /** * Not used. */ @Override - public Void sum(Void left, Void right) { + public Void sum(final IGASState<BFS.VS, BFS.ES, Void> state, + final Void left, final Void right) { + throw new UnsupportedOperationException(); + } /** @@ -231,10 +253,124 @@ } @Override - public boolean nextRound(IGASContext<BFS.VS, BFS.ES, Void> ctx) { + public boolean nextRound(final IGASContext<BFS.VS, BFS.ES, Void> ctx) { return true; } + /** + * Reduce the active vertex stat, returning a histogram reporting the #of + * vertices at each distance from the starting vertex. There will always be + * one vertex at depth zero - this is the starting vertex. For each + * successive depth, the #of vertices that were labeled at that depth is + * reported. 
This is essentially the same as reporting the size of the + * frontier in each round of the traversal, but the histograph is reported + * based on the vertex state. + * + * @author <a href="mailto:tho...@us...">Bryan + * Thompson</a> + * + * TODO Do another reducer that reports the actual BFS tree rather + * than a histogram. For each depth, it needs to have the set of + * vertices that are at that number of hops from the starting + * vertex. So, there is an outer map from depth to set. The inner + * set should also be concurrent if we allow concurrent reduction of + * the activated vertex state. + */ + protected static class HistogramReducer implements + IReducer<VS, ES, Void, Map<Integer, AtomicLong>> { + + private final ConcurrentHashMap<Integer, AtomicLong> values = new ConcurrentHashMap<Integer, AtomicLong>(); + + @Override + public void visit(final IGASState<VS, ES, Void> state, final Value u) { + + final VS us = state.getState(u); + + if (us != null) { + + final Integer depth = Integer.valueOf(us.depth()); + + AtomicLong newval = values.get(depth); + + if (newval == null) { + + final AtomicLong oldval = values.putIfAbsent(depth, + newval = new AtomicLong()); + + if (oldval != null) { + + // lost data race. + newval = oldval; + + } + + } + + newval.incrementAndGet(); + + } + + } + + @Override + public Map<Integer, AtomicLong> get() { + + return Collections.unmodifiableMap(values); + + } + + } + + @Override + public void after(final IGASContext<BFS.VS, BFS.ES, Void> ctx) { + + final HistogramReducer r = new HistogramReducer(); + + ctx.getGASState().reduce(r); + + class NV implements Comparable<NV> { + public final int n; + public final long v; + public NV(final int n, final long v) { + this.n = n; + this.v = v; + } + @Override + public int compareTo(final NV o) { + if (o.n > this.n) + return -1; + if (o.n < this.n) + return 1; + return 0; + } + } + + final Map<Integer, AtomicLong> h = r.get(); + + final NV[] a = new NV[h.size()]; + + int i = 0; + + for (Map.Entry<Integer, AtomicLong> e : h.entrySet()) { + + a[i++] = new NV(e.getKey().intValue(), e.getValue().get()); + + } + + Arrays.sort(a); + + System.out.println("distance, frontierSize, sumFrontierSize"); + long sum = 0L; + for (NV t : a) { + + System.out.println(t.n + ", " + t.v + ", " + sum); + + sum += t.v; + + } + + } + } Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/CC.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -0,0 +1,411 @@ +/** + Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ +package com.bigdata.rdf.graph.analytics; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.log4j.Logger; +import org.openrdf.model.Statement; +import org.openrdf.model.Value; + +import com.bigdata.rdf.graph.EdgesEnum; +import com.bigdata.rdf.graph.Factory; +import com.bigdata.rdf.graph.FrontierEnum; +import com.bigdata.rdf.graph.IGASContext; +import com.bigdata.rdf.graph.IGASScheduler; +import com.bigdata.rdf.graph.IGASState; +import com.bigdata.rdf.graph.IReducer; +import com.bigdata.rdf.graph.impl.BaseGASProgram; + +import cutthecrap.utils.striterators.IStriterator; + +/** + * Connected components computes the distinct sets of non-overlapping subgraphs + * within a graph. All vertices within a connected component are connected along + * at least one path. + * <p> + * The implementation works by assigning a label to each vertex. The label is + * initially the vertex identifier for that vertex. The labels in the graph are + * then relaxed with each vertex taking the minimum of its one-hop neighhor's + * labels. The algorithm halts when no vertex label has changed state in a given + * iteration. + * + * <dl> + * <dt>init</dt> + * <dd>All vertices are inserted into the initial frontier.</dd> + * <dt>Gather</dt> + * <dd>Report the source vertex label (not its identifier)</dd> + * <dt>Apply</dt> + * <dd>label = min(label,gatherLabel)</dd> + * <dt>Scatter</dt> + * <dd>iff the label has changed</dd> + * </dl> + * + * FIXME CC : Implement. Should push the updates through the scatter function. + * Find an abstraction to support this pattern. It is used by both CC and SSSP. + * (We can initially implement this as a Gather (over all edges) plus a + * conditional Scatter (over all edges iff the vertex label has changed). We can + * then refactor both this class and SSSP to push the updates through a Scatter + * (what I think of as a Gather to a remote vertex).) + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class CC extends BaseGASProgram<CC.VS, CC.ES, Value> { + + private static final Logger log = Logger.getLogger(CC.class); + + public static class VS { + + /** + * The label for the vertex. This value is initially the vertex + * identifier. It is relaxed by the computation until it is the minimum + * vertex identifier for the connected component. + */ + private final AtomicReference<Value> label; + + /** + * <code>true</code> iff the label was modified. + */ + private boolean changed = false; + + public VS(final Value v) { + + this.label = new AtomicReference<Value>(v); + + } + + /** + * The assigned label for this vertex. Once converged, all vertices in a + * given connected component will have the same label and the labels + * assigned to the vertices in each connected component will be + * distinct. The labels themselves are just the identifier of a vertex + * in that connected component. Conceptually, either the MIN or the MAX + * over the vertex identifiers in the connected component can be used by + * the algorithm since both will provide a unique labeling strategy. + */ + public Value getLabel() { + + return label.get(); + + } + + private void setLabel(final Value v) { + + label.set(v); + + } + + @Override + public String toString() { + return "{label=" + label + ",changed=" + changed + "}"; + } + + }// class VS + + /** + * Edge state is not used. 
+ */ + public static class ES { + + } + + private static final Factory<Value, CC.VS> vertexStateFactory = new Factory<Value, CC.VS>() { + + @Override + public CC.VS initialValue(final Value value) { + + return new VS(value); + + } + + }; + + @Override + public Factory<Value, CC.VS> getVertexStateFactory() { + + return vertexStateFactory; + + } + + @Override + public Factory<Statement, CC.ES> getEdgeStateFactory() { + + return null; + + } + + @Override + public FrontierEnum getInitialFrontierEnum() { + + return FrontierEnum.AllVertices; + + } + + /** + * {@inheritDoc} + * <p> + * Overridden to not impose any filter on the sampled vertices (it does not + * matter whether they have any connected edges since we need to put all + * vertices into the initial frontier). + */ + @Override + public EdgesEnum getSampleEdgesFilter() { + + return EdgesEnum.NoEdges; + + } + + @Override + public EdgesEnum getGatherEdges() { + + return EdgesEnum.AllEdges; + + } + + @Override + public EdgesEnum getScatterEdges() { + + return EdgesEnum.AllEdges; + + } + + /** + * {@inheritDoc} + * <p> + * Overridden to only visit the edges of the graph. + */ + @Override + public IStriterator constrainFilter( + final IGASContext<CC.VS, CC.ES, Value> ctx, final IStriterator itr) { + + return itr.addFilter(getEdgeOnlyFilter(ctx)); + + } + + /** + * {@inheritDoc} + * <p> + * Return the label of the remote vertex. + */ + @Override + public Value gather(final IGASState<CC.VS, CC.ES, Value> state, + final Value u, final Statement e) { + + final Value v = state.getOtherVertex(u, e); + + final CC.VS vs = state.getState(v); + + return vs.getLabel(); + + } + + /** + * MIN + * <p> + * {@inheritDoc} + */ + @Override + public Value sum(final IGASState<CC.VS, CC.ES, Value> state, + final Value left, final Value right) { + + // MIN(left,right) + if (state.compareTo(left, right) < 0) { + + return left; + + } + + return right; + + } + + /** + * {@inheritDoc} + * <p> + * Compute the new value for this vertex, making a note of the last change + * for this vertex. + */ + @Override + public CC.VS apply(final IGASState<CC.VS, CC.ES, Value> state, + final Value u, final Value sum) { + + final CC.VS us = state.getState(u); + + if (sum == null) { + + /* + * Nothing visited by Gather. No change. Vertex will be dropped from + * the frontier. + */ + + us.changed = false; + + return null; + + } + + final Value oldval = us.getLabel(); + + // MIN(oldval,gatherSum) + if (state.compareTo(oldval, sum) <= 0) { + + us.changed = false; + + if (log.isDebugEnabled()) + log.debug(" NO CHANGE: " + u + ", val=" + oldval); + + } else { + + us.setLabel(sum); + + us.changed = true; + + if (log.isDebugEnabled()) + log.debug("DID CHANGE: " + u + ", old=" + oldval + ", new=" + + sum); + + } + + return us; + + } + + /** + * {@inheritDoc} + * <p> + * Returns <code>true</code> iff the label was changed in the current round. + */ + @Override + public boolean isChanged(final IGASState<VS, ES, Value> state, + final Value u) { + + final CC.VS us = state.getState(u); + + return us.changed; + + } + + /** + * The remote vertex is scheduled for activation unless it has already been + * visited. + * <p> + * Note: We are scattering to out-edges. Therefore, this vertex is + * {@link Statement#getSubject()}. The remote vertex is + * {@link Statement#getObject()}. 
+ */ + @Override + public void scatter(final IGASState<CC.VS, CC.ES, Value> state, + final IGASScheduler sch, final Value u, final Statement e) { + + final Value v = state.getOtherVertex(u, e); + + sch.schedule(v); + + } + + /** + * Returns a map containing the labels assigned to each connected component + * (which gives you a vertex in that connected component) and the #of + * vertices in each connected component. + */ + public Map<Value, AtomicInteger> getConnectedComponents( + final IGASState<CC.VS, CC.ES, Value> state) { + + final ConcurrentHashMap<Value, AtomicInteger> labels = new ConcurrentHashMap<Value, AtomicInteger>(); + + return state + .reduce(new IReducer<CC.VS, CC.ES, Value, Map<Value, AtomicInteger>>() { + + @Override + public void visit(final IGASState<VS, ES, Value> state, + final Value u) { + + final VS us = state.getState(u); + + if (us != null) { + + final Value label = us.getLabel(); + + if (log.isDebugEnabled()) + log.debug("v=" + u + ", label=" + label); + + final AtomicInteger oldval = labels.putIfAbsent( + label, new AtomicInteger(1)); + + if (oldval != null) { + + // lost race. increment existing counter. + oldval.incrementAndGet(); + + } + + } + + } + + @Override + public Map<Value, AtomicInteger> get() { + + return Collections.unmodifiableMap(labels); + + } + }); + + } + + @Override + public void after(final IGASContext<CC.VS, CC.ES, Value> ctx) { + + final Map<Value, AtomicInteger> labels = getConnectedComponents(ctx + .getGASState()); + + System.out.println("There are " + labels.size() + + " connected components"); + + class NV implements Comparable<NV> { + public final int n; + public final Value v; + public NV(int n, Value v) { + this.n = n; + this.v = v; + } + @Override + public int compareTo(final NV o) { + return o.n - this.n; + } + } + + final NV[] a = new NV[labels.size()]; + int i = 0; + for (Map.Entry<Value, AtomicInteger> e : labels.entrySet()) { + a[i++] = new NV(e.getValue().intValue(), e.getKey()); + } + + Arrays.sort(a); + + System.out.println("size, label"); + for(NV t : a) { + System.out.println(t.n + ", " + t.v); + } + + } + +} Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java (rev 0) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/PR.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -0,0 +1,428 @@ +/** + Copyright (C) SYSTAP, LLC 2006-2012. All rights reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ +package com.bigdata.rdf.graph.analytics; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.log4j.Logger; +import org.openrdf.model.Statement; +import org.openrdf.model.Value; + +import com.bigdata.rdf.graph.EdgesEnum; +import com.bigdata.rdf.graph.Factory; +import com.bigdata.rdf.graph.FrontierEnum; +import com.bigdata.rdf.graph.IGASContext; +import com.bigdata.rdf.graph.IGASScheduler; +import com.bigdata.rdf.graph.IGASState; +import com.bigdata.rdf.graph.IReducer; +import com.bigdata.rdf.graph.impl.BaseGASProgram; + +import cutthecrap.utils.striterators.IStriterator; + +/** + * Page rank assigns weights to the vertices in a graph based on the relative + * "importance" as determined by the patterns of directed links in the graph. + * The algorithm is stated in terms of a computation that is repeated until + * the delta in the computed values for the vertices is within <i>epsilon</i> of + * ZERO. However, in practice convergence based on <i>epsilon</i> is + * problematic due to the manner in which the results of the floating point + * operations depend on the sequence of those operations (which is why this + * implementation uses <code>double</code> precision). Thus, page rank is + * typically executed for a specific number of iterations, e.g., 50 or 100. If + * convergence is based on epsilon, then it is possible that the computation + * will never converge, especially for smaller values of epsilon. + * <dl> + * <dt>init</dt> + * <dd>All vertices are inserted into the initial frontier.</dd> + * <dt>Gather</dt> + * <dd>sum( neighbor_value / neighbor_num_out_edges ) over the in-edges of the + * graph.</dd> + * <dt>Apply</dt> + * <dd>value = <i>resetProb</i> + (1.0 - <i>resetProb</i>) * gatherSum</dd> + * <dt>Scatter</dt> + * <dd>if (a) value has significantly changed <code>(fabs(old-new) GT + * <i>epsilon</i>)</code>; or (b) iterations LT limit</dd> + * </dl> + * <ul> + * <li>where <i>resetProb</i> is a value that determines a random reset + * probability and defaults to {@value PR#DEFAULT_RESET_PROB}.</li> + * <li> + * where <i>epsilon</i> controls the degree of convergence before the algorithm + * terminates and defaults to {@value PR#DEFAULT_EPSILON}.</li> + * </ul> + * + * FIXME PR UNIT TEST. Verify computed values (within variance) and max + * iterations. Ground truth from GL? + * + * FIXME PR: The out-edges can be taken directly from the vertex distribution. + * That will reduce the initialization overhead for PR. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class PR extends BaseGASProgram<PR.VS, PR.ES, Double> { + + private static final Logger log = Logger.getLogger(PR.class); + + // TODO javadoc and config. + protected static final int DEFAULT_LIMIT = 100; + protected static final double DEFAULT_RESET_PROB = 0.15d; + protected static final double DEFAULT_EPSILON = 0.01d; + protected static final double DEFAULT_MIN_PAGE_RANK = 1d; + + private final double resetProb = DEFAULT_RESET_PROB; + private final double epsilon = DEFAULT_EPSILON; + private final int limit = DEFAULT_LIMIT; + private final double minPageRank = DEFAULT_MIN_PAGE_RANK; + + public static class VS { + + /** + * The current computed value for this vertex. + * <p> + * All vertices are initialized to the reset probability. They are then + * updated in each iteration to the new estimated value by apply(). + */ + private double value; + + /** + * The number of out-edges.
This is computed once, when the vertex state + * is initialized. + */ + private long outEdges; + + /** + * The last delta observed for this vertex. + */ + private double lastChange = 0d; + + /** + * The current computed value for this vertex. + * <p> + * All vertices are initialized to the reset probability. They are then + * updated in each iteration to the new estimated value by apply(). + */ + public double getValue() { + + return value; + + } + + @Override + public String toString() { + return "{value=" + value + ",lastChange=" + lastChange + "}"; + } + + }// class VS + + /** + * Edge state is not used. + */ + public static class ES { + + } + + private static final Factory<Value, PR.VS> vertexStateFactory = new Factory<Value, PR.VS>() { + + @Override + public PR.VS initialValue(final Value value) { + + return new VS(); + + } + + }; + + @Override + public Factory<Value, PR.VS> getVertexStateFactory() { + + return vertexStateFactory; + + } + + @Override + public Factory<Statement, PR.ES> getEdgeStateFactory() { + + return null; + + } + + @Override + public FrontierEnum getInitialFrontierEnum() { + + return FrontierEnum.AllVertices; + + } + + @Override + public EdgesEnum getGatherEdges() { + + return EdgesEnum.InEdges; + + } + + @Override + public EdgesEnum getScatterEdges() { + + return EdgesEnum.OutEdges; + + } + + /** + * {@inheritDoc} + * <p> + * Overridden to only visit the edges of the graph. + */ + @Override + public IStriterator constrainFilter( + final IGASContext<PR.VS, PR.ES, Double> ctx, final IStriterator itr) { + + return itr.addFilter(getEdgeOnlyFilter(ctx)); + + } + + /** + * {@inheritDoc} + * <p> + * Each vertex is initialized to the reset probability. + * + * FIXME We need to do this efficiently. E.g., using a scan to find all of + * the vertices together with their in-degree or out-degree. That should be + * done to populate the frontier, initializing the #of out-edges at the same + * time. + */ + @Override + public void initVertex(final IGASContext<PR.VS, PR.ES, Double> ctx, + final IGASState<PR.VS, PR.ES, Double> state, final Value u) { + + final PR.VS us = state.getState(u); + + us.value = resetProb; + + us.outEdges = ctx.getGraphAccessor().getEdgeCount(ctx, u, + EdgesEnum.OutEdges); + } + + /** + * {@inheritDoc} + * <p> + */ + @Override + public Double gather(final IGASState<PR.VS, PR.ES, Double> state, + final Value u, final Statement e) { + + final Value v = state.getOtherVertex(u, e); + + final PR.VS vs = state.getState(v); + + /* + * Note: Division by zero should not be possible here since the edge + * that we used to discover [v] is an out-edge of [v]. + */ + + return (vs.value / vs.outEdges); + + } + + /** + * SUM + * <p> + * {@inheritDoc} + */ + @Override + public Double sum(final IGASState<PR.VS, PR.ES, Double> state, + final Double left, final Double right) { + + return left + right; + + } + + /** + * {@inheritDoc} + * <p> + * Compute the new value for this vertex, making a note of the last change + * for this vertex. + */ + @Override + public PR.VS apply(final IGASState<PR.VS, PR.ES, Double> state, + final Value u, final Double sum) { + + final PR.VS us = state.getState(u); + + if (sum == null) { + + /* + * No in-edges visited by Gather. No change. Vertex will be dropped + * from the frontier. 
+ */ + + us.lastChange = 0d; + + return null; + + } + + final double newval = resetProb + (1.0 - resetProb) * sum; + + us.lastChange = (newval - us.value); + + us.value = newval; + + return us; + + } + + /** + * {@inheritDoc} + * <p> + * Returns <code>true</code> iff the last change was greater than epsilon. + */ + @Override + public boolean isChanged(final IGASState<VS, ES, Double> state, + final Value u) { + + final PR.VS us = state.getState(u); + + return us.lastChange > epsilon; + + } + + /** + * The remote vertex is scheduled for activation unless it has already been + * visited. + * <p> + * Note: We are scattering to out-edges. Therefore, this vertex is + * {@link Statement#getSubject()}. The remote vertex is + * {@link Statement#getObject()}. + */ + @Override + public void scatter(final IGASState<PR.VS, PR.ES, Double> state, + final IGASScheduler sch, final Value u, final Statement e) { + + final Value v = state.getOtherVertex(u, e); + + sch.schedule(v); + + } + + /** + * {@inheritDoc} + * <p> + * Continue unless the iteration limit has been reached. + */ + @Override + public boolean nextRound(final IGASContext<PR.VS, PR.ES, Double> ctx) { + + return ctx.getGASState().round() < limit; + + } + + @Override + public void after(final IGASContext<PR.VS, PR.ES, Double> ctx) { + + final ConcurrentHashMap<Value, Double> values = new ConcurrentHashMap<Value, Double>(); + + ctx.getGASState().reduce( + new IReducer<PR.VS, PR.ES, Double, Map<Value, Double>>() { + + @Override + public void visit(final IGASState<VS, ES, Double> state, + final Value u) { + + final VS us = state.getState(u); + + if (us != null) { + + final double pageRank = us.getValue(); + + // FIXME Why are NaNs showing up? + if (Double.isNaN(pageRank)) + return; + + // FIXME Do infinite values show up? + if (Double.isInfinite(pageRank)) + return; + + if (pageRank < minPageRank) { + // Ignore small values. + return; + } + + /* + * Only report the larger ranked values.
+ */ + + if (log.isDebugEnabled()) + log.debug("v=" + u + ", pageRank=" + pageRank); + + values.put(u, Double.valueOf(pageRank)); + + } + + } + + @Override + public Map<Value, Double> get() { + + return Collections.unmodifiableMap(values); + + } + }); + + class NV implements Comparable<NV> { + public final double n; + public final Value v; + public NV(double n, Value v) { + this.n = n; + this.v = v; + } + @Override + public int compareTo(final NV o) { + if (o.n > this.n) + return 1; + if (o.n < this.n) + return -1; + return 0; + } + } + + final NV[] a = new NV[values.size()]; + + int i = 0; + + for (Map.Entry<Value, Double> e : values.entrySet()) { + + a[i++] = new NV(e.getValue().doubleValue(), e.getKey()); + + } + + Arrays.sort(a); + + System.out.println("rank, pageRank, vertex"); + i = 0; + for (NV t : a) { + + System.out.println(i + ", " + t.n + ", " + t.v); + + i++; + + } + + } + +} Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/analytics/SSSP.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -20,6 +20,7 @@ import org.openrdf.model.Value; import com.bigdata.rdf.graph.Factory; +import com.bigdata.rdf.graph.FrontierEnum; import com.bigdata.rdf.graph.IGASContext; import com.bigdata.rdf.graph.IGASScheduler; import com.bigdata.rdf.graph.IGASState; @@ -29,22 +30,24 @@ /** * SSSP (Single Source, Shortest Path). This analytic computes the shortest path - * to each vertex in the graph starting from the given vertex. Only connected - * vertices are visited by this implementation (the frontier never leaves the - * connected component in which the starting vertex is located). + * to each connected vertex in the graph starting from the given vertex. Only + * connected vertices are visited by this implementation (the frontier never + * leaves the connected component in which the starting vertex is located). * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * - * TODO There is no reason to do a gather on the first round. Add - * isGather()? (parallel to isChanged() for scatter?) - * - * TODO Add reducer pattern for finding the maximum degree vertex. - * * TODO Add parameter for directed versus undirected SSSP. When * undirected, the gather and scatter are for AllEdges. Otherwise, * gather on in-edges and scatter on out-edges. Also, we need to use a * getOtherVertex(e) method to figure out the other edge when using * undirected scatter/gather. Add unit test for undirected. + * + * FIXME New SSSP (push style scatter abstraction with new test case + * based on graph example developed for this) + * + * TODO Add a reducer to report the actual minimum length paths. This is + * similar to a BFS tree, but the path lengths are not integer values so + * we need a different data structure to collect them. */ public class SSSP extends BaseGASProgram<SSSP.VS, SSSP.ES, Integer/* dist */> { @@ -76,8 +79,12 @@ * The minimum observed distance (in hops) from the source to this * vertex and initially {@link Integer#MAX_VALUE}. When this value is * modified, the {@link #changed} flag is set as a side-effect. + * + * FIXME This really needs to be a floating point value, probably + * double. We also need tests with non-integer weights and non- positive + * weights. 
*/ - private int dist = Integer.MAX_VALUE; + private Integer dist = Integer.MAX_VALUE; private boolean changed = false; @@ -147,6 +154,13 @@ } + @Override + public FrontierEnum getInitialFrontierEnum() { + + return FrontierEnum.SingleVertex; + + } + // @Override // public Factory<ISPO, SSSP.ES> getEdgeStateFactory() { // @@ -188,8 +202,8 @@ * {@inheritDoc} */ @Override - public void init(final IGASState<SSSP.VS, SSSP.ES, Integer> state, - final Value u) { + public void initVertex(final IGASContext<SSSP.VS, SSSP.ES, Integer> ctx, + final IGASState<SSSP.VS, SSSP.ES, Integer> state, final Value u) { final VS us = state.getState(u); @@ -235,7 +249,8 @@ * MIN */ @Override - public Integer sum(final Integer left, final Integer right) { + public Integer sum(final IGASState<SSSP.VS, SSSP.ES, Integer> state, + final Integer left, final Integer right) { return Math.min(left, right); Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/BaseGASProgram.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -15,15 +15,22 @@ */ package com.bigdata.rdf.graph.impl; +import java.util.Arrays; +import java.util.Random; + +import org.apache.log4j.Logger; +import org.openrdf.model.Resource; import org.openrdf.model.Statement; import org.openrdf.model.URI; import org.openrdf.model.Value; import com.bigdata.rdf.graph.EdgesEnum; import com.bigdata.rdf.graph.Factory; +import com.bigdata.rdf.graph.FrontierEnum; import com.bigdata.rdf.graph.IGASContext; import com.bigdata.rdf.graph.IGASProgram; import com.bigdata.rdf.graph.IGASState; +import com.bigdata.rdf.graph.impl.util.VertexDistribution; import cutthecrap.utils.striterators.Filter; import cutthecrap.utils.striterators.IFilter; @@ -40,6 +47,8 @@ abstract public class BaseGASProgram<VS, ES, ST> implements IGASProgram<VS, ES, ST> { + private static final Logger log = Logger.getLogger(BaseGASProgram.class); + /** * {@inheritDoc} * <p> @@ -151,6 +160,30 @@ /** * {@inheritDoc} * <p> + * The default implementation returns {@link #getGatherEdges()} and the + * {@link #getScatterEdges()} if {@link #getGatherEdges()} returns + * {@value EdgesEnum#NoEdges}. + */ + @Override + public EdgesEnum getSampleEdgesFilter() { + + // Assume that a GATHER will be done for each starting vertex. + EdgesEnum edges = getGatherEdges(); + + if (edges == EdgesEnum.NoEdges) { + + // If no GATHER is performed, then use the SCATTER edges. + edges = getScatterEdges(); + + } + + return edges; + + } + + /** + * {@inheritDoc} + * <p> * The default gathers on the {@link EdgesEnum#InEdges}. */ @Override @@ -175,10 +208,72 @@ /** * {@inheritDoc} * <p> + * The default implementation populates the frontier IFF this is an + * {@link FrontierEnum#AllVertices} {@link IGASProgram}. + */ + @Override + public void before(final IGASContext<VS, ES, ST> ctx) { + + switch (getInitialFrontierEnum()) { + case AllVertices: { + addAllVerticesToFrontier(ctx); + break; + } + } + + } + + /** + * {@inheritDoc} + * <p> + * The default implementation is a NOP. + */ + @Override + public void after(final IGASContext<VS, ES, ST> ctx) { + + // NOP + + } + + /** + * Populate the initial frontier using all vertices in the graph. + * + * @param ctx + * The graph evaluation context. 
+ * + * TODO This has a random number generator whose initial seed is + * not controlled by the caller. However, the desired use case + * here is to produce a distribution over ALL vertices so the + * random number should be ignored - perhaps we should pass it in + * as <code>null</code>? + */ + private void addAllVerticesToFrontier(final IGASContext<VS, ES, ST> ctx) { + + final IGASState<VS, ES, ST> gasState = ctx.getGASState(); + + final EdgesEnum sampleEdges = getSampleEdgesFilter(); + + final VertexDistribution dist = ctx.getGraphAccessor().getDistribution( + new Random()); + + final Resource[] initialFrontier = dist.getUnweightedSample( + dist.size(), sampleEdges); + + if (log.isDebugEnabled()) + log.debug("initialFrontier=" + Arrays.toString(initialFrontier)); + + gasState.setFrontier(ctx, initialFrontier); + + } + + /** + * {@inheritDoc} + * <p> * The default is a NOP. */ @Override - public void init(final IGASState<VS, ES, ST> state, final Value u) { + public void initVertex(final IGASContext<VS, ES, ST> ctx, + final IGASState<VS, ES, ST> state, final Value u) { // NOP Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASContext.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -104,10 +104,17 @@ } @Override + public IGraphAccessor getGraphAccessor() { + return graphAccessor; + } + + @Override public IGASStats call() throws Exception { final GASStats total = new GASStats(); + program.before(this); + while (!gasState.frontier().isEmpty()) { final GASStats roundStats = new GASStats(); @@ -123,6 +130,8 @@ gasState.traceState(); + program.after(this); + // Done return total; @@ -160,7 +169,7 @@ * SCATTER depending on some characteristics of the algorithm. Is this * worth while? * - * TODO The ability to pushd down the APPLY for AllEdges for the GATHER + * Note: The ability to push down the APPLY for AllEdges for the GATHER * depends on our using the union of the in-edges and out-edges * iterators to visit those edges. That union means that we do not have * to preserve the accumulant across the in-edges and out-edges aspects @@ -177,23 +186,18 @@ final boolean pushDownApplyInScatter; final boolean runApplyStage; - if (scatterEdges == EdgesEnum.NoEdges) { + if (gatherEdges != EdgesEnum.NoEdges) { // Do APPLY() in GATHER. pushDownApplyInGather = true; pushDownApplyInScatter = false; runApplyStage = false; - } else if (gatherEdges == EdgesEnum.NoEdges) { + } else if (scatterEdges != EdgesEnum.NoEdges) { // APPLY() in SCATTER. pushDownApplyInGather = false; pushDownApplyInScatter = true; runApplyStage = false; } else { - /* - * Do not push down the APPLY. - * - * TODO We could still push down the apply into the GATHER if we are - * doing both stages. - */ + // Do not push down the APPLY. 
pushDownApplyInGather = false; + pushDownApplyInScatter = false; + runApplyStage = true; @@ -620,7 +624,7 @@ } else { - left = program.sum(left, right); + left = program.sum(gasState, left, right); } Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java 2013-10-18 15:34:43 UTC (rev 7460) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-gas/src/java/com/bigdata/rdf/graph/impl/GASState.java 2013-10-19 20:01:51 UTC (rev 7461) @@ -15,6 +15,7 @@ */ package com.bigdata.rdf.graph.impl; +import java.util.Comparator; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -26,8 +27,10 @@ import org.openrdf.model.Statement; import org.openrdf.model.URI; import org.openrdf.model.Value; +import org.openrdf.query.algebra.evaluation.util.ValueComparator; import com.bigdata.rdf.graph.Factory; +import com.bigdata.rdf.graph.IGASContext; import com.bigdata.rdf.graph.IGASEngine; import com.bigdata.rdf.graph.IGASProgram; import com.bigdata.rdf.graph.IGASSchedulerImpl; @@ -108,8 +111,13 @@ * Provides access to the backing graph. Used to decode vertices and edges * for {@link #traceState()}. */ - private IGraphAccessor graphAccessor; + private final IGraphAccessor graphAccessor; + /** + * Used to establish a total ordering over RDF {@link Value}s. + */ + private final Comparator<Value> valueComparator; + public GASState(final IGASEngine gasEngine,// final IGraphAccessor graphAccessor, // final IStaticFrontier frontier,// @@ -146,6 +154,13 @@ this.scheduler = gasScheduler; + /* + * TODO This is the SPARQL value ordering. It might not be total or + * stable. If not, we can use an ordering over the string values of the + * RDF Values, but that will push the heap. + */ + this.valueComparator = new ValueComparator(); + } /** @@ -242,7 +257,7 @@ } @Override - public void init(final Value... vertices) { + public void setFrontier(final IGASContext<VS, ES, ST> ctx, final Value... vertices) { if (vertices == null) throw new IllegalArgumentException(); @@ -263,7 +278,7 @@ */ for (Value v : tmp) { - gasProgram.init(this, v); + gasProgram.initVertex(ctx, this, v); } @@ -272,7 +287,7 @@ && sortFrontier, tmp.iterator()); } - + @Override public void traceState() { @@ -392,4 +407,15 @@ return null; } + @Override + public int compareTo(final Value u, final Valu... [truncated message content] |
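The label-relaxation scheme in the new CC class above reduces to iterative minimum-label propagation: every vertex starts out labeled with its own identifier, each round takes the MIN over the one-hop neighbors' labels, and the computation halts when no label changes in a round. A minimal self-contained sketch of that loop outside the GAS framework (class and method names here are illustrative, not part of the bigdata API):

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MinLabelCC {

    /**
     * Connected components by min-label propagation over an undirected
     * adjacency list. Returns vertex -> component label.
     */
    static Map<Integer, Integer> connectedComponents(
            final Map<Integer, List<Integer>> graph) {
        // init: label every vertex with its own identifier and put all
        // vertices into the initial frontier.
        final Map<Integer, Integer> label = new HashMap<Integer, Integer>();
        for (Integer v : graph.keySet()) {
            label.put(v, v);
        }
        Set<Integer> frontier = new HashSet<Integer>(graph.keySet());
        while (!frontier.isEmpty()) {
            final Set<Integer> next = new HashSet<Integer>();
            for (Integer u : frontier) {
                // gather: MIN over this vertex's label and its neighbors'.
                int min = label.get(u);
                for (Integer v : graph.get(u)) {
                    min = Math.min(min, label.get(v));
                }
                // apply: relax the label; scatter the neighbors iff changed.
                if (min < label.get(u)) {
                    label.put(u, min);
                    next.addAll(graph.get(u));
                }
            }
            frontier = next; // empty when no label changed in the round.
        }
        return label;
    }

    public static void main(final String[] args) {
        final Map<Integer, List<Integer>> g = new HashMap<Integer, List<Integer>>();
        g.put(1, Arrays.asList(2));
        g.put(2, Arrays.asList(1));
        g.put(3, Collections.<Integer> emptyList()); // isolated vertex.
        // prints, e.g., {1=1, 2=1, 3=3}: two components, each labeled by
        // its minimum vertex identifier.
        System.out.println(connectedComponents(g));
    }
}

The FIXME in the CC javadoc (push the updates through the scatter) changes where the relaxation work happens, not the fixed point: the converged labels come out the same either way.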
From: <tho...@us...> - 2013-10-18 15:34:52
Revision: 7460 http://bigdata.svn.sourceforge.net/bigdata/?rev=7460&view=rev Author: thompsonbry Date: 2013-10-18 15:34:43 +0000 (Fri, 18 Oct 2013) Log Message: ----------- Fix for #755 (Concurrent QuorumActors can interfere leading to failure to progress). Changes include: - a single thread for all actor actions. - a reliable interruptAll() implementation that will cancel all queued and any running actor action. - a more defensive version of conditionalWithdrawVote() which retries from the top if getCastVote() value changes concurrently. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-10-16 16:26:49 UTC (rev 7459) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java 2013-10-18 15:34:43 UTC (rev 7460) @@ -27,6 +27,7 @@ import java.rmi.Remote; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -34,11 +35,14 @@ import java.util.Set; import java.util.TreeMap; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; @@ -50,6 +54,7 @@ import org.apache.log4j.Logger; +import com.bigdata.concurrent.FutureTaskMon; import com.bigdata.ha.HAGlue; import com.bigdata.ha.HAPipelineGlue; import com.bigdata.ha.QuorumService; @@ -335,8 +340,25 @@ // private final long watcherShutdownTimeout = Long.MAX_VALUE; // ms private final long watcherShutdownTimeout = 3000; // ms - /** + * A service used by {@link QuorumActorBase} to execute actions in a single + * thread. + */ + private ThreadPoolExecutor actorActionService; + + /** + * This may be used to force a behavior where the actor is taken in the + * caller's thread. This is not compatible with the use of timeouts for the + * actions. + */ + private final boolean singleThreadActor = true; + private final long actorShutdownTimeout = watcherShutdownTimeout; // ms + /** + * Collector of queued or running futures for actor action tasks. + */ + private final Set<FutureTask<Void>> knownActorTasks = new HashSet<FutureTask<Void>>(); + + /** * A single threaded service used to pump events to clients outside of the * thread in which those events arise. */ @@ -467,6 +489,16 @@ .newCachedThreadPool(new DaemonThreadFactory( "WatcherActionService")); } + if (singleThreadActor) { + // run on a single-thread executor service. + this.actorActionService = new ThreadPoolExecutor(1, 1, 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), + new DaemonThreadFactory("ActorActionService")); + } else { + // run in the caller's thread. + this.actorActionService = null; + } // Reach out to the durable quorum and get the lastValidToken this.lastValidToken = getLastValidTokenFromQuorumState(client); // Setup the watcher. 
@@ -510,9 +542,34 @@ * Ensure that any guarded regions are interrupted. */ public void interruptAll() { - - threadGuard.interruptAll(); - + + // Cancel any queued or running action task. + synchronized (knownActorTasks) { + + for (Future<Void> ft : knownActorTasks) { + + ft.cancel(true/* mayInterruptIfRunning */); + + } + /* + * Note: we do not need to clear this collection. Tasks will be + * removed from the collection when they are propagated through the + * actorActionService. Cancelled tasks will immediately drop through + * the service and have their futures removed from this collection. + * This allows us to assert that the future is properly removed from + * the collection in runActorTask(). + */ +// knownActorTasks.clear(); + + /* + * Interrupt any running actions. + * + * TODO This is redundant with the cancel() of the knownActorTasks. + */ + threadGuard.interruptAll(); + + } + } @Override @@ -613,6 +670,26 @@ watcherActionService = null; } } + if (actorActionService != null) { + actorActionService.shutdown(); + try { + if (!actorActionService.awaitTermination( + actorShutdownTimeout, TimeUnit.MILLISECONDS)) { + log.error("ActorActionService termination timeout"); + } + } catch (com.bigdata.concurrent.TimeoutException ex) { + // Ignore. + } catch (InterruptedException ex) { + // Will be propagated below. + interrupted = true; + } finally { + /* + * Cancel any tasks which did not terminate in a timely manner. + */ + actorActionService.shutdownNow(); + actorActionService = null; + } + } if (!sendSynchronous) { eventService.shutdown(); try { @@ -1417,18 +1494,133 @@ } + @Override final public QuorumMember<S> getQuorumMember() { return (QuorumMember<S>) client; } + @Override final public Quorum<S, C> getQuourm() { return AbstractQuorum.this; } + @Override final public UUID getServiceId() { return serviceId; } + /** + * Task used to run an action. + */ + abstract protected class ActorTask implements Callable<Void> { + + @Override + public Void call() throws Exception { + + lock.lockInterruptibly(); + try { + + doAction(); + + return null/* Void */; + + } finally { + + lock.unlock(); + + } + + } + + /** + * Execute the action - the lock is already held by the thread. + */ + abstract protected void doAction() throws InterruptedException; + + } + + private Executor getActorExecutor() { + return actorActionService; + } + + private void runActorTask(final ActorTask task) { + if (!singleThreadActor) { + /* + * Run in the caller's thread. + */ + try { + task.call(); + return; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + /* + * Run on the single-threaded executor. + */ + final FutureTask<Void> ft = new FutureTaskMon<Void>(task); + synchronized(knownActorTasks) { + if (!knownActorTasks.add(ft)) + throw new AssertionError(); + } + getActorExecutor().execute(ft); + try { + ft.get(); + } catch (InterruptedException e) { + // Propagate interrupt + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } finally { + ft.cancel(true/* mayInterruptIfRunning */); + synchronized (knownActorTasks) { + if (!knownActorTasks.remove(ft)) + throw new AssertionError(); + } + } + } + + /** + * Variant method supporting a timeout. + * + * @param task + * @param timeout + * @param unit + * @throws InterruptedException + * @throws ExecutionException + * @throws TimeoutException + * + * TODO Add variants of memberAdd() and friends that accept + * a timeout. They should use this method rather than + * {@link #runActorTask(ActorTask)}.
+ */ + private void runActorTask(final ActorTask task, final long timeout, + final TimeUnit unit) throws InterruptedException, + ExecutionException, TimeoutException { + if (!singleThreadActor) { + /* + * Timeout support requires an executor service to run the actor + * tasks. + */ + throw new UnsupportedOperationException(); + } + final FutureTask<Void> ft = new FutureTaskMon<Void>(task); + synchronized (knownActorTasks) { + if (!knownActorTasks.add(ft)) + throw new AssertionError(); + } + getActorExecutor().execute(ft); + try { + ft.get(timeout, unit); + } finally { + ft.cancel(true/* mayInterruptIfRunning */); + synchronized (knownActorTasks) { + if (!knownActorTasks.remove(ft)) + throw new AssertionError(); + } + } + } + /* * Public API methods. * @@ -1447,24 +1639,34 @@ * mutual recursion among the public API methods. */ + @Override final public void memberAdd() { - lock.lock(); - try { + runActorTask(new MemberAddTask()); + } + + final private class MemberAddTask extends ActorTask { + @Override + protected void doAction() throws InterruptedException { conditionalMemberAddImpl(); - } catch(InterruptedException e) { - // propagate the interrupt. - Thread.currentThread().interrupt(); - return; - } finally { - lock.unlock(); } } + + @Override + final public void castVote(final long lastCommitTime) { + runActorTask(new CastVoteTask(lastCommitTime)); + } + + private final class CastVoteTask extends ActorTask { + private final long lastCommitTime; - final public void castVote(final long lastCommitTime) { - if (lastCommitTime < 0) - throw new IllegalArgumentException(); - lock.lock(); - try { + public CastVoteTask(final long lastCommitTime) { + if (lastCommitTime < 0) + throw new IllegalArgumentException(); + this.lastCommitTime = lastCommitTime; + } + + @Override + protected void doAction() throws InterruptedException { if (!members.contains(serviceId)) throw new QuorumException(ERR_NOT_MEMBER + serviceId); /* @@ -1476,47 +1678,39 @@ // if (!pipeline.contains(serviceId)) // throw new QuorumException(ERR_NOT_PIPELINE + serviceId); conditionalCastVoteImpl(lastCommitTime); - } catch(InterruptedException e) { - // propagate the interrupt. - Thread.currentThread().interrupt(); - return; - } finally { - lock.unlock(); } } + @Override final public void pipelineAdd() { - lock.lock(); - try { + runActorTask(new PipelineAddTask()); + } + + private final class PipelineAddTask extends ActorTask { + @Override + protected void doAction() throws InterruptedException { if (!members.contains(serviceId)) throw new QuorumException(ERR_NOT_MEMBER + serviceId); conditionalPipelineAddImpl(); - } catch(InterruptedException e) { - // propagate the interrupt. - Thread.currentThread().interrupt(); - return; - } finally { - lock.unlock(); } } + @Override final public void serviceJoin() { - lock.lock(); - try { + runActorTask(new ServiceJoinTask()); + } + + private final class ServiceJoinTask extends ActorTask { + @Override + protected void doAction() throws InterruptedException { if (!members.contains(serviceId)) throw new QuorumException(ERR_NOT_MEMBER + serviceId); if (!pipeline.contains(serviceId)) throw new QuorumException(ERR_NOT_PIPELINE + serviceId); conditionalServiceJoin(); - } catch(InterruptedException e) { - // propagate the interrupt. 
- Thread.currentThread().interrupt(); - return; - } finally { - lock.unlock(); } } - + // final public void setLastValidToken(final long newToken) { // lock.lock(); // try { @@ -1553,9 +1747,21 @@ // } // } + @Override final public void setToken(final long newToken) { - lock.lock(); - try { + runActorTask(new SetTokenTask(newToken)); + } + + private final class SetTokenTask extends ActorTask { + + private final long newToken; + + public SetTokenTask(final long newToken) { + this.newToken = newToken; + } + + @Override + protected void doAction() throws InterruptedException { if (!isQuorum(joined.size())) throw new QuorumException(ERR_CAN_NOT_MEET + " too few services are joined: #joined=" @@ -1567,25 +1773,18 @@ throw new QuorumException(ERR_QUORUM_MET); conditionalSetToken(newToken); log.warn("Quorum meet."); - } catch(InterruptedException e) { - // propagate the interrupt. - Thread.currentThread().interrupt(); - return; - } finally { - lock.unlock(); } } + @Override final public void clearToken() { - lock.lock(); - try { + runActorTask(new ClearTokenTask()); + } + + private final class ClearTokenTask extends ActorTask { + @Override + protected void doAction() throws InterruptedException { conditionalClearToken(); - } catch (InterruptedException e) { - // propagate the interrupt. - Thread.currentThread().interrupt(); - return; - } finally { - lock.unlock(); } } @@ -1593,76 +1792,77 @@ // Public API "remove" methods. // + @Override final public void memberRemove() { - lock.lock(); - try { - memberRemoveInterruptable(); - } catch(InterruptedException e) { - // propagate the interrupt. - Thread.currentThread().interrupt(); - return; - } finally { - lock.unlock(); - } + runActorTask(new MemberRemoveTask()); } - - /** - * An interruptable version of {@link #memberRemove()} - */ - protected void memberRemoveInterruptable() throws InterruptedException { - lock.lockInterruptibly(); - try { + + private final class MemberRemoveTask extends ActorTask { + @Override + protected void doAction() throws InterruptedException { conditionalServiceLeaveImpl(); conditionalPipelineRemoveImpl(); conditionalWithdrawVoteImpl(); conditionalMemberRemoveImpl(); - } finally { - lock.unlock(); } } + /** + * An interruptable version of {@link #memberRemove()}. + * <p> + * Note: This is used by {@link AbstractQuorum#terminate()}. That code + * is already holding the lock in the caller's thread. Therefore it + * needs to run these operations in the same thread to avoid a deadlock + * with itself. + */ + protected void memberRemoveInterruptable() throws InterruptedException { + if (!lock.isHeldByCurrentThread()) + throw new IllegalMonitorStateException(); + conditionalServiceLeaveImpl(); + conditionalPipelineRemoveImpl(); + conditionalWithdrawVoteImpl(); + conditionalMemberRemoveImpl(); + } + + @Override final public void withdrawVote() { - lock.lock(); - try { + runActorTask(new WithdrawVoteTask()); + } + + private final class WithdrawVoteTask extends ActorTask { + @Override + protected void doAction() throws InterruptedException { conditionalServiceLeaveImpl(); conditionalPipelineRemoveImpl(); conditionalWithdrawVoteImpl(); - } catch(InterruptedException e) { - // propagate the interrupt. 
- Thread.currentThread().interrupt(); - return; - } finally { - lock.unlock(); } } - + + @Override final public void pipelineRemove() { - lock.lock(); - try { + runActorTask(new PipelineRemoveTask()); + } + + private final class PipelineRemoveTask extends ActorTask { + @Override + protected void doAction() throws InterruptedException { conditionalWithdrawVoteImpl(); conditionalServiceLeaveImpl(); conditionalPipelineRemoveImpl(); - } catch(InterruptedException e) { - // propagate the interrupt. - Thread.currentThread().interrupt(); - return; - } finally { - lock.unlock(); } } + @Override final public void serviceLeave() { - lock.lock(); - try { + runActorTask(new ServiceLeaveTask()); + } + + private final class ServiceLeaveTask extends ActorTask { + @Override + protected void doAction() throws InterruptedException { conditionalWithdrawVoteImpl(); conditionalPipelineRemoveImpl(); conditionalServiceLeaveImpl(); - } catch(InterruptedException e) { - // propagate the interrupt. - Thread.currentThread().interrupt(); - return; - } finally { - lock.unlock(); } } @@ -1741,23 +1941,36 @@ } private void conditionalWithdrawVoteImpl() throws InterruptedException { - if (log.isDebugEnabled()) log.debug(new StackInfoReport()); - - final Long lastCommitTime = getCastVote(serviceId); - if (lastCommitTime != null) { + while (true) { + // check for a cast vote. + final Long lastCommitTime = getCastVote(serviceId); + if (lastCommitTime == null) + break; + // cast vote exists. tell actor to withdraw vote. + if (log.isDebugEnabled()) + log.debug("will withdraw vote: serviceId=" + serviceId + + ",lastCommitTime=" + lastCommitTime); doWithdrawVote(); guard(new Guard() { + // wait until the cast vote is withdrawn. public void run() throws InterruptedException { - while (getCastVote(serviceId) != null) { + Long tmp; + while ((tmp = getCastVote(serviceId)) != null) { + if (!tmp.equals(lastCommitTime)) { + log.warn("Concurrent vote change: old=" + + lastCommitTime + ", new=" + tmp); + return; + } votesChange.await(); } + // a cast vote has been cleared. + if (log.isDebugEnabled()) + log.debug("withdrew vote: serviceId=" + serviceId + + ",lastCommitTime=" + lastCommitTime); } }); - if (log.isDebugEnabled()) - log.debug("withdrew vote: serviceId=" + serviceId - + ",lastCommitTime=" + lastCommitTime); } } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
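The essence of the fix above is the pairing of a single-threaded executor for all actor actions with a tracked set of FutureTasks, so that one call can reliably cancel everything queued as well as anything already running. A stripped-down sketch of that pattern (names are illustrative, not the AbstractQuorum API):

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class SerializedActor {

    // One thread, so actions can never interleave with one another.
    private final ExecutorService actorService = Executors
            .newSingleThreadExecutor();

    // Every queued or running action, so interruptAll() can find it.
    private final Set<FutureTask<Void>> knownTasks = new HashSet<FutureTask<Void>>();

    public void run(final Callable<Void> action) {
        final FutureTask<Void> ft = new FutureTask<Void>(action);
        synchronized (knownTasks) {
            knownTasks.add(ft);
        }
        actorService.execute(ft);
        try {
            ft.get(); // the caller blocks, but execution is serialized.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // propagate the interrupt.
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            ft.cancel(true/* mayInterruptIfRunning */);
            synchronized (knownTasks) {
                knownTasks.remove(ft);
            }
        }
    }

    /** Cancel anything queued or running (cf. interruptAll() above). */
    public void interruptAll() {
        synchronized (knownTasks) {
            for (FutureTask<Void> ft : knownTasks) {
                ft.cancel(true/* mayInterruptIfRunning */);
            }
        }
    }
}

The single thread is what closes the race in #755: two actor operations can no longer observe and mutate the same quorum state concurrently, and the tracked futures make interruptAll() effective against queued work, not just the action currently on the thread.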
From: <tho...@us...> - 2013-10-16 16:26:56
Revision: 7459 http://bigdata.svn.sourceforge.net/bigdata/?rev=7459&view=rev Author: thompsonbry Date: 2013-10-16 16:26:49 +0000 (Wed, 16 Oct 2013) Log Message: ----------- Bug fix for #754 (Failure to setup SERVICE hook and changeLog for Unisolated and Read/Write connections) Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2013-10-16 16:10:19 UTC (rev 7458) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-sails/src/java/com/bigdata/rdf/sail/BigdataSail.java 2013-10-16 16:26:49 UTC (rev 7459) @@ -1223,51 +1223,8 @@ } - if (txLog.isInfoEnabled()) - txLog.info("SAIL-START-CONN : conn=" + this); + // Note: post-init moved to startConn(). - if (!conn.isReadOnly()) { - - /* - * Give each registered ServiceFactory instance an opportunity - * to intercept the start of this connection. - */ - - final Iterator<CustomServiceFactory> itr = ServiceRegistry - .getInstance().customServices(); - - while (itr.hasNext()) { - - final CustomServiceFactory f = itr.next(); - - try { - - f.startConnection(conn); - - } catch (Throwable t) { - - log.error(t, t); - - continue; - - } - - } - - if (conn.changeLog != null) { - - /* - * Note: The read/write tx will also do this after each - * commit since it always starts a newTx() when the last one - * is done. - */ - - conn.changeLog.transactionBegin(); - - } - - } - return conn; } catch (Exception ex) { @@ -1359,7 +1316,7 @@ // new writable connection. final BigdataSailConnection conn = new BigdataSailConnection(database, - writeLock, true/* unisolated */); + writeLock, true/* unisolated */).startConn(); return conn; @@ -1421,7 +1378,7 @@ private BigdataSailConnection _getReadOnlyConnection(final long timestamp) throws IOException { - return new BigdataSailReadOnlyConnection(timestamp); + return new BigdataSailReadOnlyConnection(timestamp).startConn(); } @@ -1435,28 +1392,28 @@ public BigdataSailConnection getReadWriteConnection() throws IOException { if (!isolatable) { - + throw new UnsupportedOperationException( "Read/write transactions are not allowed on this database. " + "See " + Options.ISOLATABLE_INDICES); - + } - + final IIndexManager indexManager = database.getIndexManager(); - if(indexManager instanceof IBigdataFederation<?>) { + if (indexManager instanceof IBigdataFederation<?>) { throw new UnsupportedOperationException("Read/write transactions are not yet supported in scale-out."); - + } - + final Lock readLock = lock.readLock(); readLock.lock(); - return new BigdataSailRWTxConnection(readLock); - + return new BigdataSailRWTxConnection(readLock).startConn(); + } - + /** * Return the {@link ITransactionService}. */ @@ -1780,6 +1737,79 @@ this(lock, unisolated, database.isReadOnly()); attach(database); + + } + + /** + * Per-connection initialization. This method must be invoked after the + * constructor so that the connection is otherwise initialized. + * <p> + * This method currently will expose a mutable connection to any + * registered {@link CustomServiceFactory}. + * + * @param conn + * The connection. + * + * @return The connection. 
+ * + * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/754"> + * Failure to setup SERVICE hook and changeLog for Unisolated and + * Read/Write connections </a> + */ + protected BigdataSailConnection startConn() { + + if (txLog.isInfoEnabled()) + txLog.info("SAIL-START-CONN : conn=" + this); + + if (!isReadOnly()) { + + /* + * Give each registered ServiceFactory instance an opportunity to + * intercept the start of this connection. + */ + + final Iterator<CustomServiceFactory> itr = ServiceRegistry + .getInstance().customServices(); + + while (itr.hasNext()) { + + final CustomServiceFactory f = itr.next(); + + try { + + f.startConnection(this); + + } catch (Throwable t) { + + log.error(t, t); + + continue; + + } + + } + + if (this.changeLog != null) { + + /* + * Note: The read/write tx will also do this after each + * commit since it always starts a newTx() when the last one + * is done. + * + * TODO Is it possible to observe an event here? The + * [changeLog] will be [null] when the BigdataSailConnection + * constructor is done. startConn() is then invoked + * immediately on the connection. Therefore, [changeLog] + * SHOULD always be [null] here. + */ + + this.changeLog.transactionBegin(); + + } + + } + + return this; } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
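The shape of this fix is worth noting: per-connection setup that previously ran inline on only one code path is factored into a startConn() hook that returns this, so every factory path (unisolated, read-only, read/write tx) chains it and no connection type can miss the hook. A minimal sketch of the idiom (illustrative names, not the BigdataSail API):

public class Conn {

    private final boolean readOnly;

    Conn(final boolean readOnly) {
        this.readOnly = readOnly;
    }

    /**
     * Post-construction hook invoked on every factory path. Returns
     * this, so callers can write: new Conn(false).startConn()
     */
    Conn startConn() {
        if (!readOnly) {
            // e.g., notify registered service factories and open the
            // change log - anything that must see a finished connection.
        }
        return this;
    }
}

Doing this work in a base-class constructor instead would publish the connection to third-party code before subclass initialization completes, which is why the javadoc above insists the method "must be invoked after the constructor".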
From: <tho...@us...> - 2013-10-16 16:10:25
Revision: 7458 http://bigdata.svn.sourceforge.net/bigdata/?rev=7458&view=rev Author: thompsonbry Date: 2013-10-16 16:10:19 +0000 (Wed, 16 Oct 2013) Log Message: ----------- Bug fix to ZKQuorumImpl where doPipelineRemove() and doServiceLeave() could fail if there was a concurrent remove of a znode in the pipeline or service list. The doMemberRemove() code did not hit this since it knows the exact zpath. The doWithdrawVote() code was already protected for this kind of concurrent modification issue. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java 2013-10-15 12:46:47 UTC (rev 7457) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/quorum/zk/ZKQuorumImpl.java 2013-10-16 16:10:19 UTC (rev 7458) @@ -440,9 +440,17 @@ Arrays.sort(children); { for (String s : children) { + // Get the data for that znode. + final byte[] b; + try { + b = zk.getData(zpath + "/" + s, false/* watch */, + null/* stat */); + } catch (NoNodeException ex) { + // Concurrent remove of some znode. + continue; + } final QuorumServiceState state = (QuorumServiceState) SerializerUtil - .deserialize(zk.getData(zpath + "/" + s, - false/* watch */, null/* stat */)); + .deserialize(b); if (serviceId.equals(state.serviceUUID())) { zk.delete(zpath + "/" + s, -1/* anyVersion */); return; @@ -858,9 +866,16 @@ { for (String s : children) { // Examine the data for that child. + final byte[] b; + try { + b = zk.getData(zpath + "/" + s, false/* watch */, + null/* stat */); + } catch (NoNodeException ex) { + // Concurrent remove of some znode. + continue; + } final QuorumServiceState state = (QuorumServiceState) SerializerUtil - .deserialize(zk.getData(zpath + "/" + s, - false/* watch */, null/* stat */)); + .deserialize(b); if (serviceId.equals(state.serviceUUID())) { // Found this service. zk.delete(zpath + "/" + s, -1/* anyVersion */); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
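The pattern in this fix is general ZooKeeper hygiene: any child returned by getChildren() can be deleted by another process before the follow-up getData(), so a NoNodeException on a child has to be read as "already gone" rather than as an error. A sketch of the defensive scan (the state deserialization and matching are elided; names are illustrative):

import java.util.UUID;

import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.ZooKeeper;

public class ZkChildScan {

    /** Remove this service's entry under zpath, tolerating races. */
    static void removeOwnEntry(final ZooKeeper zk, final String zpath,
            final UUID serviceId) throws Exception {
        for (String child : zk.getChildren(zpath, false/* watch */)) {
            final byte[] b;
            try {
                b = zk.getData(zpath + "/" + child, false/* watch */,
                        null/* stat */);
            } catch (NoNodeException ex) {
                // Concurrent remove of this znode: it is already gone.
                continue;
            }
            if (isOurs(b, serviceId)) {
                zk.delete(zpath + "/" + child, -1/* anyVersion */);
                return;
            }
        }
    }

    private static boolean isOurs(final byte[] b, final UUID serviceId) {
        // placeholder for the SerializerUtil.deserialize(b) check above.
        return false;
    }
}

The same reasoning explains why doMemberRemove() did not need the change: it addresses its znode by exact zpath, so there is no getChildren()/getData() window to race in.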
From: <tho...@us...> - 2013-10-15 12:46:54
Revision: 7457 http://bigdata.svn.sourceforge.net/bigdata/?rev=7457&view=rev Author: thompsonbry Date: 2013-10-15 12:46:47 +0000 (Tue, 15 Oct 2013) Log Message: ----------- Added @Override tags Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/join/JVMDistinctFilter.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/join/JVMDistinctFilter.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/join/JVMDistinctFilter.java 2013-10-15 12:41:47 UTC (rev 7456) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/join/JVMDistinctFilter.java 2013-10-15 12:46:47 UTC (rev 7457) @@ -64,10 +64,12 @@ this.hash = java.util.Arrays.hashCode(vals); } + @Override public int hashCode() { return hash; } + @Override public boolean equals(final Object o) { if (this == o) return true; This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
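Trivial as the diff looks, this is what the annotations buy: @Override makes the compiler reject an accidental overload, e.g. writing equals(Solution o) instead of equals(Object o), which HashMap and friends would silently never call. A sketch of the class being annotated, reconstructed around the visible diff context (the field declarations and the body of equals() are assumptions, not copied from JVMDistinctFilter):

public class Solution {

    private final Object[] vals;

    private final int hash;

    public Solution(final Object[] vals) {
        this.vals = vals;
        this.hash = java.util.Arrays.hashCode(vals);
    }

    @Override
    public int hashCode() {
        return hash;
    }

    @Override // would not compile if the parameter type were Solution.
    public boolean equals(final Object o) {
        if (this == o)
            return true;
        if (!(o instanceof Solution))
            return false;
        return java.util.Arrays.equals(vals, ((Solution) o).vals);
    }
}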
From: <tho...@us...> - 2013-10-15 12:41:54
Revision: 7456 http://bigdata.svn.sourceforge.net/bigdata/?rev=7456&view=rev Author: thompsonbry Date: 2013-10-15 12:41:47 +0000 (Tue, 15 Oct 2013) Log Message: ----------- Adding the bind heisenbug test into CI. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestAll.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestAll.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestAll.java 2013-10-11 23:08:17 UTC (rev 7455) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestAll.java 2013-10-15 12:41:47 UTC (rev 7456) @@ -127,6 +127,7 @@ // Test suite for SPARQL 1.1 BINDINGS clause suite.addTestSuite(TestBindings.class); + suite.addTestSuite(TestBindHeisenbug708.class); // Complex queries. suite.addTestSuite(TestComplexQuery.class); This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
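For reference, suite.addTestSuite(Class) is plain JUnit 3: it reflectively wraps every public test* method of the class, so TestBindHeisenbug708 now runs on every pass of the AST evaluation suite. To run just that class outside CI, the standard text runner should suffice (a sketch; assumes the JUnit 3 jars on the classpath):

import junit.framework.TestSuite;
import junit.textui.TestRunner;

import com.bigdata.rdf.sparql.ast.eval.TestBindHeisenbug708;

public class RunHeisenbugOnly {

    public static void main(final String[] args) {
        // Same reflective collection as the suite entry, one class only.
        TestRunner.run(new TestSuite(TestBindHeisenbug708.class));
    }
}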
From: <jer...@us...> - 2013-10-11 23:08:23
Revision: 7455 http://bigdata.svn.sourceforge.net/bigdata/?rev=7455&view=rev Author: jeremy_carroll Date: 2013-10-11 23:08:17 +0000 (Fri, 11 Oct 2013) Log Message: ----------- added comments concerning trac750 which I am no-fixing Modified Paths: -------------- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/StaticAnalysis.java Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/StaticAnalysis.java =================================================================== --- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/StaticAnalysis.java 2013-10-11 23:08:01 UTC (rev 7454) +++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/sparql/ast/StaticAnalysis.java 2013-10-11 23:08:17 UTC (rev 7455) @@ -1321,11 +1321,19 @@ * definitely bound (in a subquery they are definitely bound if they are * projected into the subquery). * + * TODO In the case when the variable is bound to an expression + * and the expression may execute with an error, this + * method incorrectly reports that variable as definitely bound + * see trac 750 + * * @see https://sourceforge.net/apps/trac/bigdata/ticket/412 * (StaticAnalysis#getDefinitelyBound() ignores exogenous variables.) * * @see http://sourceforge.net/apps/trac/bigdata/ticket/430 (StaticAnalysis * does not follow renames of projected variables) + * + * @see http://sourceforge.net/apps/trac/bigdata/ticket/750 + * artificial test case fails, currently wontfix */ // MUST : QueryBase public Set<IVariable<?>> getDefinitelyProducedBindings(final QueryBase queryBase) { @@ -1368,6 +1376,11 @@ * * 4. Projection of a select expression which is not an aggregate. * + * This case is the one explored in trac750, and the code + * below while usually correct is incorrect if the expression + * can evaluate with an error - in which case the variable + * will remain unbound. + * * 5. Projection of a select expression which is an aggregate. This case * is tricky. A select expression that is an aggregate which evaluates * to an error will cause an unbound value for to be reported for the @@ -1449,9 +1462,14 @@ /* * 4. The projection of a select expression which is not an - * aggregate. The projected variable will be definitely + * aggregate. Normally, the projected variable will be * bound if all components of the select expression are - * definitely bound. + * definitely bound: this comment ignores the possibility + * that the expression may raise an error, in which case + * this block of code is incorrect. + * As of Oct 11, 2013 - we are no-fixing this + * because of caution about the performance impact, + * and it seeming to be a corner case. See trac 750. * * TODO Does coalesce() change the semantics for this * analysis? If any of the values for coalesce() is This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |
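A concrete query makes the corner case in these comments easier to see (a hypothetical illustration, not taken from the ticket):

    SELECT (1/0 AS ?x) WHERE {}

Under SPARQL 1.1 semantics the projected expression raises an evaluation error, so the query's single solution leaves ?x unbound even though every input to the expression is bound. Any analysis that reports ?x as definitely bound is therefore unsound for such queries, e.g. if it were used to simplify FILTER(!BOUND(?x)) away.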
From: <jer...@us...> - 2013-10-11 23:08:09
Revision: 7454
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7454&view=rev
Author:   jeremy_carroll
Date:     2013-10-11 23:08:01 +0000 (Fri, 11 Oct 2013)

Log Message:
-----------
More tests for 736

Modified Paths:
--------------
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestTickets.java

Added Paths:
-----------
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max.rq
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max.srx
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max1.rq
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max1.srx
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max2.rq
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max2.srx
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min-max.ttl
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min.rq
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min.srx
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min1.rq
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min1.srx
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min2.rq
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min2.srx

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestTickets.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestTickets.java	2013-10-11 18:44:06 UTC (rev 7453)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/TestTickets.java	2013-10-11 23:08:01 UTC (rev 7454)
@@ -253,4 +253,76 @@
 
     }
 
+    public void test_ticket_min736() throws Exception {
+
+        new TestHelper("aggregate-min",// testURI,
+                "aggregate-min.rq",// queryFileURL
+                "aggregate-min-max.ttl",// dataFileURL
+                "aggregate-min.srx",// resultFileURL
+                false, // laxCardinality
+                true // checkOrder
+                ).runTest();
+
+    }
+
+    public void test_ticket_max736() throws Exception {
+
+        new TestHelper("aggregate-max",// testURI,
+                "aggregate-max.rq",// queryFileURL
+                "aggregate-min-max.ttl",// dataFileURL
+                "aggregate-max.srx",// resultFileURL
+                false, // laxCardinality
+                true // checkOrder
+                ).runTest();
+
+    }
+
+    public void test_ticket_min736_1() throws Exception {
+
+        new TestHelper("aggregate-min1",// testURI,
+                "aggregate-min1.rq",// queryFileURL
+                "aggregate-min-max.ttl",// dataFileURL
+                "aggregate-min1.srx",// resultFileURL
+                false, // laxCardinality
+                true // checkOrder
+                ).runTest();
+
+    }
+
+    public void test_ticket_max736_1() throws Exception {
+
+        new TestHelper("aggregate-max1",// testURI,
+                "aggregate-max1.rq",// queryFileURL
+                "aggregate-min-max.ttl",// dataFileURL
+                "aggregate-max1.srx",// resultFileURL
+                false, // laxCardinality
+                true // checkOrder
+                ).runTest();
+
+    }
+
+    public void test_ticket_min736_2() throws Exception {
+
+        new TestHelper("aggregate-min2",// testURI,
+                "aggregate-min2.rq",// queryFileURL
+                "aggregate-min-max.ttl",// dataFileURL
+                "aggregate-min2.srx",// resultFileURL
+                false, // laxCardinality
+                true // checkOrder
+                ).runTest();
+
+    }
+
+    public void test_ticket_max736_2() throws Exception {
+
+        new TestHelper("aggregate-max2",// testURI,
+                "aggregate-max2.rq",// queryFileURL
+                "aggregate-min-max.ttl",// dataFileURL
+                "aggregate-max2.srx",// resultFileURL
+                false, // laxCardinality
+                true // checkOrder
+                ).runTest();
+
+    }
 }

Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max.rq
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max.rq	(rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max.rq	2013-10-11 23:08:01 UTC (rev 7454)
@@ -0,0 +1,8 @@
+
+prefix : <http://example/>
+
+SELECT (MAX(?o) AS ?m)
+WHERE {
+   ?s :p ?o
+}
+

Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max.srx
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max.srx	(rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max.srx	2013-10-11 23:08:01 UTC (rev 7454)
@@ -0,0 +1,16 @@
+<?xml version="1.0"?>
+<sparql
+    xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
+    xmlns:xs="http://www.w3.org/2001/XMLSchema#"
+    xmlns="http://www.w3.org/2005/sparql-results#" >
+  <head>
+    <variable name="m"/>
+  </head>
+  <results>
+    <result>
+      <binding name="m">
+        <literal datatype="http://www.w3.org/2001/XMLSchema#integer">3</literal>
+      </binding>
+    </result>
+  </results>
+</sparql>

Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max1.rq
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max1.rq	(rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max1.rq	2013-10-11 23:08:01 UTC (rev 7454)
@@ -0,0 +1,8 @@
+
+prefix : <http://example/>
+
+SELECT (MAX(?o) AS ?m)
+WHERE {
+   ?s :q ?o
+}
+

Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max1.srx
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max1.srx	(rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max1.srx	2013-10-11 23:08:01 UTC (rev 7454)
@@ -0,0 +1,16 @@
+<?xml version="1.0"?>
+<sparql
+    xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
+    xmlns:xs="http://www.w3.org/2001/XMLSchema#"
+    xmlns="http://www.w3.org/2005/sparql-results#" >
+  <head>
+    <variable name="m"/>
+  </head>
+  <results>
+    <result>
+      <binding name="m">
+        <literal xml:lang="en">bigdata</literal>
+      </binding>
+    </result>
+  </results>
+</sparql>

Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max2.rq
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max2.rq	(rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max2.rq	2013-10-11 23:08:01 UTC (rev 7454)
@@ -0,0 +1,8 @@
+
+prefix : <http://example/>
+
+SELECT (MAX(?o) AS ?m)
+WHERE {
+   ?s ?p ?o
+}
+

Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max2.srx
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max2.srx	(rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-max2.srx	2013-10-11 23:08:01 UTC (rev 7454)
@@ -0,0 +1,16 @@
+<?xml version="1.0"?>
+<sparql
+    xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
+    xmlns:xs="http://www.w3.org/2001/XMLSchema#"
+    xmlns="http://www.w3.org/2005/sparql-results#" >
+  <head>
+    <variable name="m"/>
+  </head>
+  <results>
+    <result>
+      <binding name="m">
+        <literal datatype="http://www.w3.org/2001/XMLSchema#integer">3</literal>
+      </binding>
+    </result>
+  </results>
+</sparql>

Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min-max.ttl
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min-max.ttl	(rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min-max.ttl	2013-10-11 23:08:01 UTC (rev 7454)
@@ -0,0 +1,10 @@
+@prefix : <http://example/> .
+
+:x1 :p 1.5 .
+:x1 :p 3 .
+:x1 :p :x2 .
+:x1 :q "bigdata"@en.
+:x1 :q "bigdata"@de.
+:x1 :r <http://www.bigdata.com/blog/> .
+
+

Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min.rq
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min.rq	(rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min.rq	2013-10-11 23:08:01 UTC (rev 7454)
@@ -0,0 +1,6 @@
+prefix : <http://example/>
+
+SELECT (MIN(?o) AS ?m)
+WHERE {
+   ?s :p ?o
+}

Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min.srx
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min.srx	(rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min.srx	2013-10-11 23:08:01 UTC (rev 7454)
@@ -0,0 +1,14 @@
+<?xml version="1.0"?>
+<sparql
+    xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
+    xmlns:xs="http://www.w3.org/2001/XMLSchema#"
+    xmlns="http://www.w3.org/2005/sparql-results#" >
+  <head>
+    <variable name="m"/>
+  </head>
+  <results>
+    <result>
+      <binding name="m"><uri>http://example/x2</uri></binding>
+    </result>
+  </results>
+</sparql>

Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min1.rq
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min1.rq	(rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min1.rq	2013-10-11 23:08:01 UTC (rev 7454)
@@ -0,0 +1,6 @@
+prefix : <http://example/>
+
+SELECT (MIN(?o) AS ?m)
+WHERE {
+   ?s :q ?o
+}

Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min1.srx
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min1.srx	(rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min1.srx	2013-10-11 23:08:01 UTC (rev 7454)
@@ -0,0 +1,17 @@
+<?xml version="1.0"?>
+<sparql
+    xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
+    xmlns:xs="http://www.w3.org/2001/XMLSchema#"
+    xmlns="http://www.w3.org/2005/sparql-results#" >
+  <head>
+    <variable name="m"/>
+  </head>
+  <results>
+    <result>
+      <binding name="m">
+        <literal
+            xml:lang="de">bigdata</literal>
+      </binding>
+    </result>
+  </results>
+</sparql>

Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min2.rq
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min2.rq	(rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min2.rq	2013-10-11 23:08:01 UTC (rev 7454)
@@ -0,0 +1,6 @@
+prefix : <http://example/>
+
+SELECT (MIN(?o) AS ?m)
+WHERE {
+   ?s ?p ?o
+}

Added: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min2.srx
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min2.srx	(rev 0)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/rdf/sparql/ast/eval/aggregate-min2.srx	2013-10-11 23:08:01 UTC (rev 7454)
@@ -0,0 +1,14 @@
+<?xml version="1.0"?>
+<sparql
+    xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
+    xmlns:xs="http://www.w3.org/2001/XMLSchema#"
+    xmlns="http://www.w3.org/2005/sparql-results#" >
+  <head>
+    <variable name="m"/>
+  </head>
+  <results>
+    <result>
+      <binding name="m"><uri>http://example/x2</uri></binding>
+    </result>
+  </results>
+</sparql>
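To see concretely what these fixtures assert: under SPARQL ORDER BY semantics (which ticket #736 makes MIN/MAX follow), terms are ranked first by category, blank nodes before IRIs before literals, so MIN over the mixed ?o bindings {1.5, 3, :x2} is the IRI :x2, while MAX stays within the literals and yields 3, exactly as aggregate-min.srx and aggregate-max.srx expect. The following self-contained Java sketch illustrates that ordering with a deliberately simplified toy term model; the Term class and its fields are illustrative inventions, not Bigdata's IV hierarchy or IVComparator.

import java.util.Arrays;
import java.util.List;

public class OrderByMinMaxDemo {

    // Toy term model (illustrative only, NOT Bigdata's IV classes):
    // kind 0 = blank node, 1 = IRI, 2 = literal, matching the SPARQL
    // ORDER BY ranking of term categories.
    static final class Term {
        final int kind;
        final String lexical;  // IRI string or literal lexical form
        final Double numeric;  // parsed value for numeric literals, else null
        Term(int kind, String lexical, Double numeric) {
            this.kind = kind; this.lexical = lexical; this.numeric = numeric;
        }
    }

    // SPARQL ORDER BY: blank nodes < IRIs < literals; numeric literals
    // compare by value; everything else falls back to the lexical form.
    static int compare(Term a, Term b) {
        if (a.kind != b.kind) return Integer.compare(a.kind, b.kind);
        if (a.numeric != null && b.numeric != null)
            return Double.compare(a.numeric, b.numeric);
        return a.lexical.compareTo(b.lexical);
    }

    public static void main(String[] args) {
        // The ?o bindings produced by { ?s :p ?o } over aggregate-min-max.ttl.
        List<Term> bindings = Arrays.asList(
                new Term(2, "1.5", 1.5),                  // :x1 :p 1.5
                new Term(2, "3", 3.0),                    // :x1 :p 3
                new Term(1, "http://example/x2", null));  // :x1 :p :x2

        Term min = null, max = null;
        for (Term t : bindings) {
            if (min == null || compare(t, min) < 0) min = t;
            if (max == null || compare(t, max) > 0) max = t;
        }
        System.out.println("MIN = " + min.lexical); // http://example/x2
        System.out.println("MAX = " + max.lexical); // 3
    }
}

Under plain SPARQL "<" semantics the IRI-versus-number comparison would instead raise a type error, which is the failure mode these tests pin down.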
From: <tho...@us...> - 2013-10-11 18:44:14
Revision: 7453
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7453&view=rev
Author:   thompsonbry
Date:     2013-10-11 18:44:06 +0000 (Fri, 11 Oct 2013)

Log Message:
-----------
Bug fix for #736 (MIN/MAX should use ORDER BY semantics). Workaround for #740 (NSPIN) so we can inspect this further.

Modified Paths:
--------------
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/solutions/IVComparator.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MAX.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MIN.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/CompareBOp.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestMAX.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestMIN.java

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/solutions/IVComparator.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/solutions/IVComparator.java	2013-10-11 15:49:09 UTC (rev 7452)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/bop/solutions/IVComparator.java	2013-10-11 18:44:06 UTC (rev 7453)
@@ -51,7 +51,7 @@
 
 /**
  * A comparator that compares {@link IV}s according the SPARQL value ordering as
- * specified in <A
+ * specified in <a
  * href="http://www.w3.org/TR/rdf-sparql-query/#modOrderBy">SPARQL Query
  * Language for RDF</a>. This implementation is based on the openrdf
  * {@link ValueComparator} but has been modified to work with {@link IV}s.

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java	2013-10-11 15:49:09 UTC (rev 7452)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/relation/accesspath/BlockingBuffer.java	2013-10-11 18:44:06 UTC (rev 7453)
@@ -115,19 +115,28 @@
 
     /**
      * The #of times that we will use {@link BlockingQueue#offer(Object)} or
-     * {@link Queue#poll()} before converting to the variants of those methods
-     * which accept a timeout. The timeouts are used to reduce the contention
-     * for the queue if either the producer or the consumer is lagging.
+     * before converting to the variant of that method which accept a timeout.
+     * The timeouts are used to reduce the contention for the queue if either
+     * the consumer is lagging.
      */
-    private static final int NSPIN = 100;
-
+    private static final int NSPIN_ADD = Integer.valueOf(System.getProperty(
+            BlockingBuffer.class.getName() + ".NSPIN.ADD", "100"));
+
+    /**
+     * The #of times that we will use {@link Queue#poll()} before converting to
+     * the variant of that method which accept a timeout. The timeouts are used
+     * to reduce the contention for the queue if either the producer is lagging.
+     */
+    private static final int NSPIN_READ = Integer.valueOf(System.getProperty(
+            BlockingBuffer.class.getName() + ".NSPIN.READ", "100"));
+
     /**
      * The timeout for offer() or poll() as a function of the #of tries that
      * have already been made to {@link #add(Object)} or read a chunk.
      *
      * @param ntries
      *            The #of tries.
-     *
+     * 
      * @return The timeout (milliseconds).
      */
     private static final long getTimeoutMillis(final int ntries) {
@@ -1038,7 +1047,7 @@
 
             final boolean added;
 
-            if (ntries < NSPIN) {
+            if (ntries < NSPIN_ADD) {
 
                 // offer (non-blocking).
                 added = queue.offer(e);
@@ -1804,7 +1813,7 @@
              * inside of poll(timeout,unit) [@todo This could be fixed by a poison pill.]
              */
 
-            if (ntries < NSPIN) {
+            if (ntries < NSPIN_READ) {
 
                 /*
                  * This is basically a spin lock (it can spin without

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MAX.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MAX.java	2013-10-11 15:49:09 UTC (rev 7452)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MAX.java	2013-10-11 18:44:06 UTC (rev 7453)
@@ -25,13 +25,11 @@
 
 import java.util.Map;
 
-import org.openrdf.query.algebra.Compare.CompareOp;
-import org.openrdf.query.algebra.evaluation.util.ValueComparator;
-
 import com.bigdata.bop.BOp;
 import com.bigdata.bop.IBindingSet;
 import com.bigdata.bop.IValueExpression;
 import com.bigdata.bop.aggregate.AggregateBase;
+import com.bigdata.bop.solutions.IVComparator;
 import com.bigdata.rdf.internal.IV;
 import com.bigdata.rdf.internal.constraints.CompareBOp;
 import com.bigdata.rdf.internal.constraints.INeedsMaterialization;
@@ -43,7 +41,7 @@
  * <p>
  * Note: MIN (and MAX) are defined in terms of the ORDER_BY semantics for
  * SPARQL. Therefore, this must handle comparisons when the value is not an IV,
- * e.g., using {@link ValueComparator}.
+ * e.g., using {@link IVComparator}.
  *
  * @author thompsonbry
 @@ -58,15 +56,23 @@
      */
     private static final long serialVersionUID = 1L;
 
-    public MAX(MAX op) {
+    /**
+     * Provides SPARQL <em>ORDER BY</em> semantics.
+     *
+     * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/736">
+     *      MIN() malfunction </a>
+     */
+    private static final transient IVComparator comparator = new IVComparator();
+
+    public MAX(final MAX op) {
         super(op);
     }
 
-    public MAX(BOp[] args, Map<String, Object> annotations) {
+    public MAX(final BOp[] args, final Map<String, Object> annotations) {
         super(args, annotations);
    }
 
-    public MAX(boolean distinct, IValueExpression...expr) {
+    public MAX(final boolean distinct, final IValueExpression...expr) {
         super(distinct, expr);
    }
@@ -103,46 +109,44 @@
 
     }
 
     private IV doGet(final IBindingSet bindingSet) {
+
         for (int i = 0; i < arity(); i++) {
 
             final IValueExpression<IV<?, ?>> expr = (IValueExpression<IV<?, ?>>) get(i);
 
-        final IV iv = expr.get(bindingSet);
+            final IV iv = expr.get(bindingSet);
 
-        if (iv != null) {
+            if (iv != null) {
 
-            /*
-             * Aggregate non-null values.
-             */
+                /*
+                 * Aggregate non-null values.
+                 */
 
-            if (max == null) {
+                if (max == null) {
 
-                max = iv;
+                    max = iv;
 
-            } else {
+                } else {
 
-                /**
-                 * FIXME This needs to use the ordering define by ORDER_BY. The
-                 * CompareBOp imposes the ordering defined for the "<" operator
-                 * which is less robust and will throw a type exception if you
-                 * attempt to compare unlike Values.
-                 *
-                 * @see https://sourceforge.net/apps/trac/bigdata/ticket/300#comment:5
-                 */
-                if (CompareBOp.compare(iv, max, CompareOp.GT)) {
+                    // Note: This is SPARQL GT semantics, not ORDER BY.
+//                    if (CompareBOp.compare(iv, max, CompareOp.GT)) {
 
-                    max = iv;
+                    // SPARQL ORDER_BY semantics
+                    if (comparator.compare(iv, max) > 0) {
 
+                        max = iv;
+
+                    }
+
+                }
 
             }
 
-        }
-
-        }
 
         }
 
         return max;
 
     }
 
+    @Override
     synchronized public void reset() {
 
         max = null;
@@ -172,6 +176,7 @@
      *
      * FIXME MikeP: What is the right return value here?
      */
+    @Override
     public Requirement getRequirement() {
 
         return INeedsMaterialization.Requirement.ALWAYS;

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MIN.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MIN.java	2013-10-11 15:49:09 UTC (rev 7452)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/bop/rdf/aggregate/MIN.java	2013-10-11 18:44:06 UTC (rev 7453)
@@ -25,20 +25,14 @@
 
 import java.util.Map;
 
-import org.apache.log4j.Logger;
-import org.openrdf.query.algebra.Compare.CompareOp;
-import org.openrdf.query.algebra.evaluation.util.ValueComparator;
-
 import com.bigdata.bop.BOp;
-import com.bigdata.bop.BOpBase;
 import com.bigdata.bop.IBindingSet;
 import com.bigdata.bop.IValueExpression;
 import com.bigdata.bop.aggregate.AggregateBase;
-import com.bigdata.bop.aggregate.IAggregate;
+import com.bigdata.bop.solutions.IVComparator;
 import com.bigdata.rdf.internal.IV;
 import com.bigdata.rdf.internal.constraints.CompareBOp;
 import com.bigdata.rdf.internal.constraints.INeedsMaterialization;
-import com.bigdata.rdf.internal.constraints.INeedsMaterialization.Requirement;
 
 /**
  * Operator reports the minimum observed value over the presented binding sets
@@ -47,11 +41,11 @@
  * <p>
  * Note: MIN (and MAX) are defined in terms of the ORDER_BY semantics for
  * SPARQL. Therefore, this must handle comparisons when the value is not an IV,
- * e.g., using {@link ValueComparator}.
- *
+ * e.g., using the {@link IVComparator}.
+ *
  * @author thompsonbry
- *
- *         TODO What is reported if there are no non-null observations?
+ *
+ *         TODO What is reported if there are no non-null observations?
  */
 public class MIN extends AggregateBase<IV> implements INeedsMaterialization{
@@ -62,15 +56,23 @@
      */
     private static final long serialVersionUID = 1L;
 
-    public MIN(MIN op) {
+    /**
+     * Provides SPARQL <em>ORDER BY</em> semantics.
+     *
+     * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/736">
+     *      MIN() malfunction </a>
+     */
+    private static final transient IVComparator comparator = new IVComparator();
+
+    public MIN(final MIN op) {
         super(op);
     }
 
-    public MIN(BOp[] args, Map<String, Object> annotations) {
+    public MIN(final BOp[] args, final Map<String, Object> annotations) {
         super(args, annotations);
    }
 
-    public MIN(boolean distinct, IValueExpression...expr) {
+    public MIN(final boolean distinct, final IValueExpression...expr) {
         super(distinct, expr);
    }
@@ -107,46 +109,44 @@
 
     }
 
     private IV doGet(final IBindingSet bindingSet) {
-        for(int i=0;i<arity();i++){
+
+        for (int i = 0; i < arity(); i++) {
 
             final IValueExpression<IV> expr = (IValueExpression<IV>) get(i);
 
-        final IV iv = expr.get(bindingSet);
+            final IV iv = expr.get(bindingSet);
 
-        if (iv != null) {
+            if (iv != null) {
 
-            /*
-             * Aggregate non-null values.
-             */
+                /*
+                 * Aggregate non-null values.
+                 */
 
-            if (min == null) {
+                if (min == null) {
 
-                min = iv;
+                    min = iv;
 
-            } else {
+                } else {
 
-                /**
-                 * FIXME This needs to use the ordering define by ORDER_BY. The
-                 * CompareBOp imposes the ordering defined for the "<" operator
-                 * which is less robust and will throw a type exception if you
-                 * attempt to compare unlike Values.
-                 *
-                 * @see https://sourceforge.net/apps/trac/bigdata/ticket/300#comment:5
-                 */
-                if (CompareBOp.compare(iv, min, CompareOp.LT)) {
+                    // Note: This is SPARQL LT semantics, not ORDER BY.
+//                    if (CompareBOp.compare(iv, min, CompareOp.LT)) {
 
-                    min = iv;
+                    // SPARQL ORDER_BY semantics
+                    if (comparator.compare(iv, min) < 0) {
 
+                        min = iv;
+
+                    }
+
+                }
 
             }
 
-        }
-
-        }
 
        }
 
        return min;
 
    }
 
+    @Override
     synchronized public void reset() {
 
         min = null;
@@ -176,6 +176,7 @@
      *
      * FIXME MikeP: What is the right return value here?
      */
+    @Override
     public Requirement getRequirement() {
 
         return INeedsMaterialization.Requirement.ALWAYS;

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/CompareBOp.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/CompareBOp.java	2013-10-11 15:49:09 UTC (rev 7452)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/java/com/bigdata/rdf/internal/constraints/CompareBOp.java	2013-10-11 18:44:06 UTC (rev 7453)
@@ -36,14 +36,21 @@
 import com.bigdata.bop.IBindingSet;
 import com.bigdata.bop.IValueExpression;
 import com.bigdata.bop.NV;
+import com.bigdata.bop.solutions.IVComparator;
 import com.bigdata.rdf.error.SparqlTypeErrorException;
 import com.bigdata.rdf.internal.IV;
 import com.bigdata.rdf.internal.impl.literal.LiteralExtensionIV;
 import com.bigdata.rdf.model.BigdataValue;
-import com.bigdata.rdf.sparql.ast.GlobalAnnotations;
 
 /**
- * Perform open-world value comparison operations per the SPARQL spec.
+ * Perform open-world value comparison operations per the SPARQL spec (the LT
+ * operator). This does NOT implement the broader ordering for ORDER BY. That is
+ * handled by {@link IVComparator}.
+ *
+ * @see <a
+ *      href="http://www.w3.org/TR/2013/REC-sparql11-query-20130321/#op_lt"><</a>
+ *
+ * @see IVComparator
  */
 public class CompareBOp extends XSDBooleanIVValueExpression
         implements INeedsMaterialization {

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestMAX.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestMAX.java	2013-10-11 15:49:09 UTC (rev 7452)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestMAX.java	2013-10-11 18:44:06 UTC (rev 7453)
@@ -227,7 +227,19 @@
 
     }
 
-    public void test_max_with_errors() {
+    /**
+     * MAX is defined in terms of SPARQL <code>ORDER BY</code> rather than
+     * <code>GT</code>.
+     *
+     * @see <a href="http://www.w3.org/TR/rdf-sparql-query/#modOrderBy">SPARQL
+     *      Query Language for RDF</a>
+     * @see <a
+     *      href="http://www.w3.org/TR/2013/REC-sparql11-query-20130321/#op_lt"><</a>
+     *
+     * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/736">
+     *      MIN() malfunction </a>
+     */
+    public void test_max_uses_ORDER_BY_not_GT() {
 
         final BigdataValueFactory f = BigdataValueFactoryImpl.getInstance(getName());
 
@@ -261,7 +273,7 @@
      * ?org  ?auth  ?book  ?lprice
      * org1  auth1  book1  9
      * org1  auth1  book3  5
-     * org1  auth2  book3  7
+     * org1  auth2  book3  auth2
      * org2  auth3  book4  7
      * </pre>
      */
@@ -285,44 +297,50 @@
 
         assertFalse(op.isDistinct());
         assertFalse(op.isWildcard());
 
-        try {
-            op.reset();
-            for (IBindingSet bs : data) {
-                op.get(bs);
-            }
-            fail("Expecting: " + SparqlTypeErrorException.class);
-        } catch (RuntimeException ex) {
-            if (InnerCause.isInnerCause(ex, SparqlTypeErrorException.class)) {
-                if (log.isInfoEnabled()) {
-                    log.info("Ignoring expected exception: " + ex);
-                }
-            } else {
-                fail("Expecting: " + SparqlTypeErrorException.class, ex);
-            }
+        op.reset();
+        for (IBindingSet bs : data) {
+            op.get(bs);
         }
+
+        assertEquals(price9.get(), op.done());
+
+//        try {
+//            op.reset();
+//            for (IBindingSet bs : data) {
+//                op.get(bs);
+//            }
+//            fail("Expecting: " + SparqlTypeErrorException.class);
+//        } catch (RuntimeException ex) {
+//            if (InnerCause.isInnerCause(ex, SparqlTypeErrorException.class)) {
+//                if (log.isInfoEnabled()) {
+//                    log.info("Ignoring expected exception: " + ex);
+//                }
+//            } else {
+//                fail("Expecting: " + SparqlTypeErrorException.class, ex);
+//            }
+//        }
+//
+//        /*
+//         * Now verify that the error is sticky.
+//         */
+//        try {
+//            op.done();
+//            fail("Expecting: " + SparqlTypeErrorException.class);
+//        } catch (RuntimeException ex) {
+//            if (InnerCause.isInnerCause(ex, SparqlTypeErrorException.class)) {
+//                if (log.isInfoEnabled()) {
+//                    log.info("Ignoring expected exception: " + ex);
+//                }
+//            } else {
+//                fail("Expecting: " + SparqlTypeErrorException.class, ex);
+//            }
+//        }
+//
+//        /*
+//         * Now verify that reset() clears the error.
+//         */
+//        op.reset();
+//        op.done();
 
-        /*
-         * Now verify that the error is sticky.
-         */
-        try {
-            op.done();
-            fail("Expecting: " + SparqlTypeErrorException.class);
-        } catch (RuntimeException ex) {
-            if (InnerCause.isInnerCause(ex, SparqlTypeErrorException.class)) {
-                if (log.isInfoEnabled()) {
-                    log.info("Ignoring expected exception: " + ex);
-                }
-            } else {
-                fail("Expecting: " + SparqlTypeErrorException.class, ex);
-            }
-        }
 
-        /*
-         * Now verify that reset() clears the error.
-         */
-        op.reset();
-        op.done();
-
     }
 
 }

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestMIN.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestMIN.java	2013-10-11 15:49:09 UTC (rev 7452)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-rdf/src/test/com/bigdata/bop/rdf/aggregate/TestMIN.java	2013-10-11 18:44:06 UTC (rev 7453)
@@ -34,7 +34,6 @@
 import com.bigdata.bop.Var;
 import com.bigdata.bop.bindingSet.ListBindingSet;
 import com.bigdata.journal.ITx;
-import com.bigdata.rdf.error.SparqlTypeErrorException;
 import com.bigdata.rdf.internal.IV;
 import com.bigdata.rdf.internal.VTE;
 import com.bigdata.rdf.internal.XSD;
@@ -47,7 +46,6 @@
 import com.bigdata.rdf.model.BigdataValueFactory;
 import com.bigdata.rdf.model.BigdataValueFactoryImpl;
 import com.bigdata.rdf.sparql.ast.GlobalAnnotations;
-import com.bigdata.util.InnerCause;
 
 /**
  * Unit tests for {@link MIN}.
@@ -227,7 +225,19 @@
 
     }
 
-    public void test_min_with_errors() {
+    /**
+     * MIN is defined in terms of SPARQL <code>ORDER BY</code> rather than
+     * <code>LT</code>.
+     *
+     * @see <a href="http://www.w3.org/TR/rdf-sparql-query/#modOrderBy">SPARQL
+     *      Query Language for RDF</a>
+     * @see <a
+     *      href="http://www.w3.org/TR/2013/REC-sparql11-query-20130321/#op_lt"><</a>
+     *
+     * @see <a href="https://sourceforge.net/apps/trac/bigdata/ticket/736">
+     *      MIN() malfunction </a>
+     */
+    public void test_min_uses_ORDER_BY_not_LT() {
 
         final BigdataValueFactory f = BigdataValueFactoryImpl.getInstance(getName());
 
@@ -261,7 +271,7 @@
      * ?org  ?auth  ?book  ?lprice
     * org1  auth1  book1  9
      * org1  auth1  book3  5
-     * org1  auth2  book3  7
+     * org1  auth2  book3  auth2
      * org2  auth3  book4  7
      * </pre>
      */
@@ -285,44 +295,51 @@
 
         assertFalse(op.isDistinct());
         assertFalse(op.isWildcard());
 
-        try {
-            op.reset();
-            for (IBindingSet bs : data) {
-                op.get(bs);
-            }
-            fail("Expecting: " + SparqlTypeErrorException.class);
-        } catch (RuntimeException ex) {
-            if (InnerCause.isInnerCause(ex, SparqlTypeErrorException.class)) {
-                if (log.isInfoEnabled()) {
-                    log.info("Ignoring expected exception: " + ex);
-                }
-            } else {
-                fail("Expecting: " + SparqlTypeErrorException.class, ex);
-            }
+        op.reset();
+        for (IBindingSet bs : data) {
+            op.get(bs);
         }
+
+        assertEquals(auth2.get(), op.done());
 
-        /*
-         * Now verify that the error is sticky.
-         */
-        try {
-            op.done();
-            fail("Expecting: " + SparqlTypeErrorException.class);
-        } catch (RuntimeException ex) {
-            if (InnerCause.isInnerCause(ex, SparqlTypeErrorException.class)) {
-                if (log.isInfoEnabled()) {
-                    log.info("Ignoring expected exception: " + ex);
-                }
-            } else {
-                fail("Expecting: " + SparqlTypeErrorException.class, ex);
-            }
-        }
+//        try {
+//            op.reset();
+//            for (IBindingSet bs : data) {
+//                op.get(bs);
+//            }
+//            fail("Expecting: " + SparqlTypeErrorException.class);
+//        } catch (RuntimeException ex) {
+//            if (InnerCause.isInnerCause(ex, SparqlTypeErrorException.class)) {
+//                if (log.isInfoEnabled()) {
+//                    log.info("Ignoring expected exception: " + ex);
+//                }
+//            } else {
+//                fail("Expecting: " + SparqlTypeErrorException.class, ex);
+//            }
+//        }
+//
+//        /*
+//         * Now verify that the error is sticky.
+//         */
+//        try {
+//            op.done();
+//            fail("Expecting: " + SparqlTypeErrorException.class);
+//        } catch (RuntimeException ex) {
+//            if (InnerCause.isInnerCause(ex, SparqlTypeErrorException.class)) {
+//                if (log.isInfoEnabled()) {
+//                    log.info("Ignoring expected exception: " + ex);
+//                }
+//            } else {
+//                fail("Expecting: " + SparqlTypeErrorException.class, ex);
+//            }
+//        }
+//
+//        /*
+//         * Now verify that reset() clears the error.
+//         */
+//        op.reset();
+//        op.done();
 
-        /*
-         * Now verify that reset() clears the error.
-         */
-        op.reset();
-        op.done();
-
     }
 
 }
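The #740 workaround above externalizes the two spin counts as system properties (com.bigdata.relation.accesspath.BlockingBuffer.NSPIN.ADD and .NSPIN.READ, both defaulting to 100) so they can be tuned without recompiling. A minimal self-contained sketch of the underlying spin-then-timed-offer pattern follows; the class name and the backoff schedule in getTimeoutMillis() are made up for illustration (the real schedule lives in BlockingBuffer.getTimeoutMillis() and is not shown in this diff).

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class SpinThenTimeoutOffer {

    // Mirrors the new BlockingBuffer knob: the spin count is read from a
    // system property so it can be tuned at deployment time (default 100).
    static final int NSPIN_ADD = Integer.getInteger(
            SpinThenTimeoutOffer.class.getName() + ".NSPIN.ADD", 100);

    // Try a non-blocking offer() for the first NSPIN_ADD attempts, then fall
    // back to the timed variant so a lagging consumer causes the producer to
    // back off instead of spinning on a contended queue.
    static <E> void add(BlockingQueue<E> queue, E e) throws InterruptedException {
        int ntries = 0;
        while (true) {
            final boolean added;
            if (ntries < NSPIN_ADD) {
                added = queue.offer(e); // non-blocking spin phase
            } else {
                added = queue.offer(e, getTimeoutMillis(ntries),
                        TimeUnit.MILLISECONDS); // timed phase
            }
            if (added)
                return;
            ntries++;
        }
    }

    // Hypothetical backoff schedule: grow the timeout with the try count,
    // capped at 100ms per attempt.
    static long getTimeoutMillis(int ntries) {
        return Math.min(100, ntries - NSPIN_ADD + 1);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        add(q, "hello");
        System.out.println(q.poll()); // hello
    }
}

Splitting the single NSPIN constant into NSPIN.ADD and NSPIN.READ lets the producer and consumer sides be tuned independently, which is the point of the workaround while #740 is investigated.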
From: <tho...@us...> - 2013-10-11 15:49:17
Revision: 7452
          http://bigdata.svn.sourceforge.net/bigdata/?rev=7452&view=rev
Author:   thompsonbry
Date:     2013-10-11 15:49:09 +0000 (Fri, 11 Oct 2013)

Log Message:
-----------
Rolling back the change to AbstractQuorum.getClient() and getMember(). This is aggravating a problem where concurrent actions on the quorum can cause an awaited Condition to not become satisfied.

See #718 (HAJournalServer needs to handle ZK client connection loss)

Modified Paths:
--------------
    branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java
    branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java	2013-10-11 15:21:41 UTC (rev 7451)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata/src/java/com/bigdata/quorum/AbstractQuorum.java	2013-10-11 15:49:09 UTC (rev 7452)
@@ -733,21 +733,21 @@
 
     @Override
     public C getClient() {
-//        lock.lock();
-//        try {
+        lock.lock();
+        try {
             final C client = this.client;
             if (client == null)
                 throw new IllegalStateException();
             return client;
-//        } finally {
-//            lock.unlock();
-//        }
+        } finally {
+            lock.unlock();
+        }
     }
 
     @Override
     public QuorumMember<S> getMember() {
-//        lock.lock();
-//        try {
+        lock.lock();
+        try {
             final C client = this.client;
             if (client == null)
                 throw new IllegalStateException();
             if (client instanceof QuorumMember<?>) {
                 return (QuorumMember<S>) client;
             }
             throw new UnsupportedOperationException();
-//        } finally {
-//            lock.unlock();
-//        }
+        } finally {
+            lock.unlock();
+        }
     }
 
     /**

Modified: branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java
===================================================================
--- branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java	2013-10-11 15:21:41 UTC (rev 7451)
+++ branches/BIGDATA_RELEASE_1_3_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java	2013-10-11 15:49:09 UTC (rev 7452)
@@ -93,6 +93,7 @@
 import com.bigdata.journal.IRootBlockView;
 import com.bigdata.journal.RootBlockUtility;
 import com.bigdata.journal.WORMStrategy;
+import com.bigdata.quorum.AbstractQuorum;
 import com.bigdata.quorum.Quorum;
 import com.bigdata.quorum.QuorumEvent;
 import com.bigdata.quorum.QuorumException;
@@ -1859,7 +1860,7 @@
 
             while (true) {
 
                 log.warn("Will do error handler.");
-
+//                ((AbstractQuorum<HAGlue, QuorumService<HAGlue>>) getQuorum()).interruptAll();
                 /*
                  * Discard the current write set.
                  *
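The rollback restores the invariant that reads of the client reference happen under the same lock that guards the quorum's Conditions, so a concurrent tear-down cannot interleave with a getter and leave a waiter blocked on a Condition that is never signalled. A minimal sketch of that pattern follows; the class, field, and method names are illustrative, not the AbstractQuorum originals.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class GuardedClientHolder<C> {

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition clientChange = lock.newCondition();
    private C client;

    // Read under the lock: the caller sees a state that is consistent with
    // whatever any concurrent setClient()/await path observes.
    public C getClient() {
        lock.lock();
        try {
            final C c = client;
            if (c == null)
                throw new IllegalStateException();
            return c;
        } finally {
            lock.unlock();
        }
    }

    // Write under the lock and signal, so waiters observe the change
    // atomically with respect to the guarded state.
    public void setClient(C newClient) {
        lock.lock();
        try {
            client = newClient;
            clientChange.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Classic guarded wait: re-check the predicate in a loop while holding
    // the lock that the signaller also holds.
    public C awaitClient() throws InterruptedException {
        lock.lock();
        try {
            while (client == null)
                clientChange.await();
            return client;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        GuardedClientHolder<String> h = new GuardedClientHolder<>();
        h.setClient("client-1");
        System.out.println(h.getClient()); // client-1
    }
}

With the unlocked variant (the commented-out code being rolled back), a getter could observe the reference between a tear-down's state change and its signal, which is the race this revision removes.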