From: <fko...@us...> - 2010-09-01 16:24:34
|
Revision: 3481 http://bigdata.svn.sourceforge.net/bigdata/?rev=3481&view=rev Author: fkoliver Date: 2010-09-01 16:24:23 +0000 (Wed, 01 Sep 2010) Log Message: ----------- 1) Modify bulk loading mechanism to pass URL objects around instead of File objects. 2) Provide URLListScanner class (as replacement for FileSystemScanner) which allows user to provide a list of URLs in the cluster config for the bulk loader. 3) Provide FileServer class which starts a web server for a given directory on a given port, which can be used directly from the cluster configuration file. 4) Provide FileSystemScannerServer class (as replacement for FileSystemScanner) which scans a directory (as before) but uses a provided web server to serve up the files, passing URL objects for that web server to client services. 5) Remove the IRemoteExecutor interface (and its use of Callable) in favor of IClientService and IDataService (and IClientServiceCallable and IDataServiceCallable). This allows tasks to have start*Task methods which take container specific arguments rather than relying on call() and mutating the tasks after deserialization. 6) If the bulk load fails with an exception, at least print a stack trace. 7) If the bulk load fails to load the ontology (for a NEW tuple store), then delete the newly created store. 
Modified Paths: -------------- branches/maven_scaleout/src/main/java/com/bigdata/journal/ConcurrencyManager.java branches/maven_scaleout/src/main/java/com/bigdata/rdf/load/MappedRDFDataLoadMaster.java branches/maven_scaleout/src/main/java/com/bigdata/rdf/load/MappedRDFFileLoadTask.java branches/maven_scaleout/src/main/java/com/bigdata/rdf/rules/RDFJoinNexus.java branches/maven_scaleout/src/main/java/com/bigdata/rdf/store/AbstractTripleStore.java branches/maven_scaleout/src/main/java/com/bigdata/rdf/store/DataLoader.java branches/maven_scaleout/src/main/java/com/bigdata/relation/locator/DefaultResourceLocator.java branches/maven_scaleout/src/main/java/com/bigdata/relation/rule/eval/AbstractStepTask.java branches/maven_scaleout/src/main/java/com/bigdata/relation/rule/eval/ProgramTask.java branches/maven_scaleout/src/main/java/com/bigdata/relation/rule/eval/pipeline/JoinTaskFactoryTask.java branches/maven_scaleout/src/main/java/com/bigdata/resources/MoveTask.java branches/maven_scaleout/src/main/java/com/bigdata/service/ClientService.java branches/maven_scaleout/src/main/java/com/bigdata/service/DataService.java branches/maven_scaleout/src/main/java/com/bigdata/service/IClientService.java branches/maven_scaleout/src/main/java/com/bigdata/service/IDataService.java branches/maven_scaleout/src/main/java/com/bigdata/service/IDataServiceCallable.java branches/maven_scaleout/src/main/java/com/bigdata/service/ListIndicesTask.java branches/maven_scaleout/src/main/java/com/bigdata/service/MetadataService.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/ClientServer.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/DataServer.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/MetadataServer.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/benchmark/ThroughputMaster.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/master/AbstractAsynchronousClientTask.java 
branches/maven_scaleout/src/main/java/com/bigdata/service/jini/master/AbstractClientTask.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/master/AbstractResourceScanner.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/master/DiscoverServices.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/master/IAsynchronousClientTask.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/master/IResourceScannerFactory.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/master/MappedTaskMaster.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/master/ResourceBufferTask.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/master/ServiceMap.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/master/TaskMaster.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/util/DumpFederation.java branches/maven_scaleout/src/main/java/com/bigdata/service/ndx/pipeline/AbstractPendingSetSubtask.java branches/maven_scaleout/src/test/java/com/bigdata/rdf/stress/LoadClosureAndQueryTest.java branches/maven_scaleout/src/test/java/com/bigdata/resources/AbstractResourceManagerTestCase.java branches/maven_scaleout/src/test/java/com/bigdata/service/jini/PerformanceTest.java Added Paths: ----------- branches/maven_scaleout/src/main/java/com/bigdata/service/IClientServiceCallable.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/master/FileServer.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/master/FileSystemScannerServer.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/master/URLListScanner.java Removed Paths: ------------- branches/maven_scaleout/src/main/java/com/bigdata/relation/rule/eval/EmptyProgramTask.java branches/maven_scaleout/src/main/java/com/bigdata/relation/rule/eval/IProgramTask.java branches/maven_scaleout/src/main/java/com/bigdata/service/DataServiceCallable.java 
branches/maven_scaleout/src/main/java/com/bigdata/service/FederationCallable.java branches/maven_scaleout/src/main/java/com/bigdata/service/IFederationCallable.java branches/maven_scaleout/src/main/java/com/bigdata/service/IRemoteExecutor.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/master/AggregatorTask.java Modified: branches/maven_scaleout/src/main/java/com/bigdata/journal/ConcurrencyManager.java =================================================================== --- branches/maven_scaleout/src/main/java/com/bigdata/journal/ConcurrencyManager.java 2010-09-01 14:39:13 UTC (rev 3480) +++ branches/maven_scaleout/src/main/java/com/bigdata/journal/ConcurrencyManager.java 2010-09-01 16:24:23 UTC (rev 3481) @@ -90,17 +90,17 @@ */ public class ConcurrencyManager implements IConcurrencyManager { - final protected static Logger log = Logger.getLogger(ConcurrencyManager.class); + final private static Logger log = Logger.getLogger(ConcurrencyManager.class); // /** // * True iff the {@link #log} level is INFO or less. // */ -// final protected static boolean INFO = log.isInfoEnabled(); +// final private static boolean INFO = log.isInfoEnabled(); /** * True iff the {@link #log} level is DEBUG or less. */ - final protected static boolean DEBUG = log.isDebugEnabled(); + final private static boolean DEBUG = log.isDebugEnabled(); /** * Options for the {@link ConcurrentManager}. @@ -340,7 +340,7 @@ * Once the transaction has acquired those writable indices it then runs its * commit phrase as an unisolated operation on the {@link #writeService}. */ - final protected ThreadPoolExecutor txWriteService; + final private ThreadPoolExecutor txWriteService; /** * Pool of threads for handling concurrent unisolated read operations on @@ -358,7 +358,7 @@ * historical commit records (which may span more than one logical * journal) until the reader terminates. 
*/ - final protected ThreadPoolExecutor readService; + final private ThreadPoolExecutor readService; /** * Pool of threads for handling concurrent unisolated write operations on @@ -371,6 +371,7 @@ * Serialization of access to unisolated named indices is acomplished by * gaining an exclusive lock on the unisolated named index. */ + // protected for access by tests. final protected WriteExecutorService writeService; /** @@ -400,7 +401,7 @@ } - protected void assertOpen() { + private void assertOpen() { if (!open) throw new IllegalStateException(); @@ -466,7 +467,7 @@ * Long.MAX_VALUE. */ - final long shutdownTimeout = this.shutdownTimeout == 0L ? Long.MAX_VALUE + final long tmpShutdownTimeout = this.shutdownTimeout == 0L ? Long.MAX_VALUE : this.shutdownTimeout; final TimeUnit unit = TimeUnit.MILLISECONDS; @@ -486,7 +487,7 @@ final long elapsed = System.currentTimeMillis() - begin; - if(!txWriteService.awaitTermination(shutdownTimeout-elapsed, unit)) { + if(!txWriteService.awaitTermination(tmpShutdownTimeout-elapsed, unit)) { log.warn("Transaction service termination: timeout"); @@ -505,7 +506,7 @@ final long elapsed = System.currentTimeMillis() - begin; - if(!readService.awaitTermination(shutdownTimeout-elapsed, unit)) { + if(!readService.awaitTermination(tmpShutdownTimeout-elapsed, unit)) { log.warn("Read service termination: timeout"); @@ -521,7 +522,7 @@ final long elapsed = System.currentTimeMillis() - begin; - final long timeout = shutdownTimeout-elapsed; + final long timeout = tmpShutdownTimeout-elapsed; if (log.isInfoEnabled()) log.info("Awaiting write service termination: will wait " @@ -921,13 +922,13 @@ } /** Counters for {@link #writeService}. */ - protected final WriteTaskCounters countersUN = new WriteTaskCounters(); + final WriteTaskCounters countersUN = new WriteTaskCounters(); /** Counters for the {@link #txWriteService}. 
*/ - protected final TaskCounters countersTX = new TaskCounters(); + final TaskCounters countersTX = new TaskCounters(); /** Counters for the {@link #readService}. */ - protected final TaskCounters countersHR = new TaskCounters(); + final TaskCounters countersHR = new TaskCounters(); /** * Sampling instruments for the various queues giving us the moving average Modified: branches/maven_scaleout/src/main/java/com/bigdata/rdf/load/MappedRDFDataLoadMaster.java =================================================================== --- branches/maven_scaleout/src/main/java/com/bigdata/rdf/load/MappedRDFDataLoadMaster.java 2010-09-01 14:39:13 UTC (rev 3480) +++ branches/maven_scaleout/src/main/java/com/bigdata/rdf/load/MappedRDFDataLoadMaster.java 2010-09-01 16:24:23 UTC (rev 3481) @@ -23,7 +23,6 @@ */ package com.bigdata.rdf.load; -import java.io.File; import java.io.FilenameFilter; import java.io.IOException; import java.io.ObjectInputStream; @@ -57,9 +56,11 @@ import com.bigdata.service.jini.JiniFederation; import com.bigdata.service.jini.master.AbstractAsynchronousClientTask; import com.bigdata.service.jini.master.ClientLocator; +import com.bigdata.service.jini.master.FileServer; import com.bigdata.service.jini.master.INotifyOutcome; import com.bigdata.service.jini.master.MappedTaskMaster; import com.bigdata.service.jini.master.TaskMaster; +import java.net.URL; /** * Distributed bulk loader for RDF data. Creates/(re-)opens the @@ -84,7 +85,7 @@ >// extends MappedTaskMaster<S, T, L, U, V> { - final protected static Logger log = Logger + final private static Logger log = Logger .getLogger(MappedRDFDataLoadMaster.class); /** @@ -263,7 +264,7 @@ */ String RDF_FORMAT = "rdfFormat"; - String DEFAULT_RDF_FORMAT = RDFFormat.RDFXML.toString(); + String DEFAULT_RDF_FORMAT = RDFFormat.RDFXML.getName(); // /** // * The maximum #of times an attempt will be made to load any given file. 
@@ -297,7 +298,7 @@ * * @see ConfigurationOptions#ONTOLOGY */ - public final File ontology; + public final URL ontology; /** * Only files matched by the filter will be processed (optional, but @@ -514,9 +515,9 @@ namespace = (String) config.getEntry(component, ConfigurationOptions.NAMESPACE, String.class); - ontology = (File) config + ontology = (URL) config .getEntry(component, ConfigurationOptions.ONTOLOGY, - File.class, null/* defaultValue */); + URL.class, null/* defaultValue */); ontologyFileFilter = (FilenameFilter) config.getEntry(component, ConfigurationOptions.ONTOLOGY_FILE_FILTER, @@ -583,7 +584,7 @@ final String tmp = (String) config.getEntry(component, ConfigurationOptions.RDF_FORMAT, String.class, - ConfigurationOptions.DEFAULT_RDF_FORMAT.toString()); + ConfigurationOptions.DEFAULT_RDF_FORMAT); if (tmp != null) { @@ -639,13 +640,14 @@ // execute master wait for it to finish. task.execute(); - + } catch (Throwable e) { + e.printStackTrace(); } finally { - + FileServer.stopAll(); fed.shutdown(); } - + } public MappedRDFDataLoadMaster(final JiniFederation fed) @@ -658,6 +660,7 @@ /** * Extended to support optional load, closure, and reporting. */ + @Override protected void runJob() throws Exception { final S jobState = getJobState(); @@ -851,6 +854,7 @@ /** * Extended to open/create the KB. */ + @Override protected void beginJob(final S jobState) throws Exception { super.beginJob(jobState); @@ -900,7 +904,7 @@ loadOntology(tripleStore); } catch (Exception ex) { - + tripleStore.destroy(); // Don't leave badly configured store. 
throw new RuntimeException("Could not load: " + jobState.ontology, ex); Modified: branches/maven_scaleout/src/main/java/com/bigdata/rdf/load/MappedRDFFileLoadTask.java =================================================================== --- branches/maven_scaleout/src/main/java/com/bigdata/rdf/load/MappedRDFFileLoadTask.java 2010-09-01 14:39:13 UTC (rev 3480) +++ branches/maven_scaleout/src/main/java/com/bigdata/rdf/load/MappedRDFFileLoadTask.java 2010-09-01 16:24:23 UTC (rev 3481) @@ -20,8 +20,9 @@ import com.bigdata.rdf.rio.RDFParserOptions; import com.bigdata.rdf.store.AbstractTripleStore; import com.bigdata.rdf.store.ScaleOutTripleStore; -import com.bigdata.service.IRemoteExecutor; -import com.bigdata.service.jini.JiniFederation; +import com.bigdata.service.ClientService; +import com.bigdata.service.IBigdataFederation; +import com.bigdata.service.IClientService; import com.bigdata.service.jini.master.AbstractAsynchronousClientTask; import com.bigdata.service.jini.master.ClientLocator; import com.bigdata.service.jini.master.INotifyOutcome; @@ -47,12 +48,11 @@ * @param <V> * The generic type of the client state (stored in zookeeper). */ -public class MappedRDFFileLoadTask<// -S extends JobState,// -V extends Serializable,// -L extends ClientLocator// -> extends AbstractAsynchronousClientTask<Void,V,L>// -implements Serializable { +public class MappedRDFFileLoadTask<S extends JobState, + V extends Serializable, + L extends ClientLocator> + extends AbstractAsynchronousClientTask<Void,V,L>// + implements Serializable { final protected transient static Logger log = Logger .getLogger(MappedRDFFileLoadTask.class); @@ -73,7 +73,7 @@ protected final L locator; /** - * Instantiated by {@link #call()} on the {@link IRemoteExecutor} service. + * Instantiated by {@link #startClientTask()} on the {@link IClientService} service. * This is volatile because it is used by some methods which do not obtain * the {@link #lock}. */ @@ -125,7 +125,7 @@ * to the load balancer). 
*/ private transient volatile CounterSet counters; - + private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { in.defaultReadObject(); @@ -136,6 +136,7 @@ isDone = false; } + @Override public String toString() { return getClass().getName() + // @@ -165,18 +166,9 @@ } - /** - * The federation object used by the {@link IRemoteExecutor} on which this - * task is executing. - */ - public JiniFederation<?> getFederation() { + protected void setUp(IBigdataFederation federation) + throws InterruptedException { - return (JiniFederation<?>) super.getFederation(); - - } - - protected void setUp() throws InterruptedException { - // set transient fields. // lock = new ReentrantLock(); // allDone = lock.newCondition(); @@ -188,9 +180,9 @@ if (log.isInfoEnabled()) log.info(toString()); - final AbstractTripleStore tripleStore = (AbstractTripleStore) getFederation() - .getResourceLocator().locate(jobState.namespace, - ITx.UNISOLATED); + final AbstractTripleStore tripleStore = (AbstractTripleStore) + federation.getResourceLocator().locate(jobState.namespace, + ITx.UNISOLATED); if (tripleStore == null) { @@ -297,15 +289,14 @@ */ { - final CounterSet serviceRoot = getFederation() - .getServiceCounterSet(); + final CounterSet serviceRoot = federation.getServiceCounterSet(); final String relPath = jobState.jobName; // Create path to counter set. final CounterSet tmp = serviceRoot.makePath(relPath); - final CounterSet counters = statementBufferFactory + final CounterSet tmpCounters = statementBufferFactory .getCounters(); // if (log.isDebugEnabled()) @@ -313,7 +304,7 @@ // + counters); // Attach counters [the counters are MOVEd to tmp]. - tmp.attach(counters, true/* replace */); + tmp.attach(tmpCounters, true/* replace */); // Note reference to the current counters for log messages. 
this.counters = tmp; @@ -347,11 +338,12 @@ } - public Void call() throws Exception { + public Void startClientTask(IBigdataFederation federation, + ClientService clientService) throws Exception { try { - setUp(); + setUp(federation); /* * Wait until either (a) interrupted by the master using @@ -422,8 +414,8 @@ // } /** - * Block until {@link #call()} has fully initialized the instance of this - * class running on the {@link IRemoteExecutor}. This method should be used + * Block until {@link #startClientTask()} has fully initialized the instance of this + * class running on the {@link IClientService}. This method should be used * to guard methods on this or derived classes which can be invoked by RMI * and which depend on {@link #setUp()}. */ Modified: branches/maven_scaleout/src/main/java/com/bigdata/rdf/rules/RDFJoinNexus.java =================================================================== --- branches/maven_scaleout/src/main/java/com/bigdata/rdf/rules/RDFJoinNexus.java 2010-09-01 14:39:13 UTC (rev 3480) +++ branches/maven_scaleout/src/main/java/com/bigdata/rdf/rules/RDFJoinNexus.java 2010-09-01 16:24:23 UTC (rev 3481) @@ -96,11 +96,9 @@ import com.bigdata.relation.rule.eval.AbstractSolutionBuffer; import com.bigdata.relation.rule.eval.ActionEnum; import com.bigdata.relation.rule.eval.DefaultRangeCountFactory; -import com.bigdata.relation.rule.eval.EmptyProgramTask; import com.bigdata.relation.rule.eval.IEvaluationPlanFactory; import com.bigdata.relation.rule.eval.IJoinNexus; import com.bigdata.relation.rule.eval.IJoinNexusFactory; -import com.bigdata.relation.rule.eval.IProgramTask; import com.bigdata.relation.rule.eval.IRangeCountFactory; import com.bigdata.relation.rule.eval.IRuleState; import com.bigdata.relation.rule.eval.IRuleStatisticsFactory; @@ -117,6 +115,7 @@ import com.bigdata.striterator.ChunkedArrayIterator; import com.bigdata.striterator.ChunkedConvertingIterator; import com.bigdata.striterator.DistinctFilter; +import 
com.bigdata.striterator.EmptyChunkedIterator; import com.bigdata.striterator.IChunkedIterator; import com.bigdata.striterator.IChunkedOrderedIterator; @@ -163,7 +162,6 @@ * {@link IAsynchronousIterator} when wrapped for RMI. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ public class RDFJoinNexus implements IJoinNexus { @@ -1324,10 +1322,10 @@ final IBuffer<ISolution[]> targetBuffer, final int chunkCapacity) { // MAY be null. - final IElementFilter<ISolution> filter = getSolutionFilter(); + final IElementFilter<ISolution> tmpFilter = getSolutionFilter(); return new UnsynchronizedArrayBuffer<ISolution>(targetBuffer, - chunkCapacity, filter); + chunkCapacity, tmpFilter); } @@ -1351,7 +1349,6 @@ * {@link #flush() flushed}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ * @param <E> */ public static class InsertSPOAndJustificationBuffer<E> extends AbstractSolutionBuffer<E> { @@ -1562,6 +1559,22 @@ } + // Code formerly from EmptyProgramTask. 
+ private Object emptyProgram(ActionEnum action, IStep step) { + if (action == null) + throw new IllegalArgumentException(); + if (step == null) + throw new IllegalArgumentException(); + if (step.isRule() || ((IProgram)step).stepCount() != 0) { + throw new IllegalArgumentException(); + } + if (action.isMutation()) { + return Long.valueOf(0L); + } else { + return new EmptyChunkedIterator<ISolution>(null/* keyOrder */); + } + } + @SuppressWarnings("unchecked") public IChunkedOrderedIterator<ISolution> runQuery(final IStep step) throws Exception { @@ -1576,8 +1589,8 @@ log.warn("Empty program"); - return (IChunkedOrderedIterator<ISolution>) new EmptyProgramTask( - ActionEnum.Query, step).call(); + return (IChunkedOrderedIterator<ISolution>) + emptyProgram(ActionEnum.Query, step); } @@ -1694,7 +1707,7 @@ log.warn("Empty program"); - return (Long) new EmptyProgramTask(action, step).call(); + return (Long) emptyProgram(action, step); } @@ -1738,12 +1751,12 @@ if (step == null) throw new IllegalArgumentException(); - final IIndexManager indexManager = getIndexManager(); + final IIndexManager tmpIndexManager = getIndexManager(); - if (indexManager instanceof IBigdataFederation<?>) { + if (tmpIndexManager instanceof IBigdataFederation<?>) { // distributed program execution. 
- return runDistributedProgram((IBigdataFederation<?>) indexManager, + return runDistributedProgram((IBigdataFederation<?>) tmpIndexManager, action, step); } else { @@ -1766,10 +1779,10 @@ log.info("Running local program: action=" + action + ", program=" + step.getName()); - final IProgramTask innerTask = new ProgramTask(action, step, + final ProgramTask innerTask = new ProgramTask(action, step, getJoinNexusFactory(), getIndexManager()); - return innerTask.call(); + return innerTask.startDataTask(getIndexManager(), null); } @@ -1787,10 +1800,10 @@ } - final IProgramTask innerTask = new ProgramTask(action, step, + final ProgramTask innerTask = new ProgramTask(action, step, getJoinNexusFactory(), getIndexManager()); - return innerTask.call(); + return innerTask.startDataTask(fed, null); } @@ -1816,7 +1829,7 @@ } - final IProgramTask innerTask = new ProgramTask(action, step, + final ProgramTask innerTask = new ProgramTask(action, step, getJoinNexusFactory()); return dataService.submit(innerTask).get(); Modified: branches/maven_scaleout/src/main/java/com/bigdata/rdf/store/AbstractTripleStore.java =================================================================== --- branches/maven_scaleout/src/main/java/com/bigdata/rdf/store/AbstractTripleStore.java 2010-09-01 14:39:13 UTC (rev 3480) +++ branches/maven_scaleout/src/main/java/com/bigdata/rdf/store/AbstractTripleStore.java 2010-09-01 16:24:23 UTC (rev 3481) @@ -1387,6 +1387,9 @@ if (lex != null) lex.destroy(); } + // Remove the triple store from the global row store. 
+ getIndexManager().getGlobalRowStore().delete( + RelationSchema.INSTANCE, getNamespace()); lexiconRelation = null; Modified: branches/maven_scaleout/src/main/java/com/bigdata/rdf/store/DataLoader.java =================================================================== --- branches/maven_scaleout/src/main/java/com/bigdata/rdf/store/DataLoader.java 2010-09-01 14:39:13 UTC (rev 3480) +++ branches/maven_scaleout/src/main/java/com/bigdata/rdf/store/DataLoader.java 2010-09-01 16:24:23 UTC (rev 3481) @@ -36,7 +36,9 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; +import java.net.URISyntaxException; import java.net.URL; +import java.net.URLConnection; import java.util.Arrays; import java.util.LinkedList; import java.util.List; @@ -739,19 +741,19 @@ log.info("loading: " + resource); // try the classpath - InputStream rdfStream = getClass().getResourceAsStream(resource); + URL url = getClass().getResource(resource); - if (rdfStream == null) { + if (url == null) { // Searching for the resource from the root of the class returned // by getClass() (relative to the class' package) failed. // Next try searching for the desired resource from the root // of the jar; that is, search the jar file for an exact match // of the input string. - rdfStream = - getClass().getClassLoader().getResourceAsStream(resource); + url = + getClass().getClassLoader().getResource(resource); - if (rdfStream == null) { + if (url == null) { /* * If we do not find as a Resource then try the file system. @@ -761,8 +763,8 @@ if(file.exists()) { - loadFiles(totals, 0/* depth */, file, baseURL, - rdfFormat, filter, endOfBatch); + loadFiles(totals, 0/* depth */, file.toURI().toURL(), + baseURL, rdfFormat, filter, endOfBatch); return; @@ -775,15 +777,16 @@ * Obtain a buffered reader on the input stream. 
*/ - if (rdfStream == null) { + if (url == null) { throw new IOException("Could not locate resource: " + resource); } - // @todo reuse the backing buffer to minimize heap churn. + URLConnection connection = url.openConnection(); + InputStream is = connection.getInputStream(); final Reader reader = new BufferedReader( - new InputStreamReader(rdfStream) + new InputStreamReader(is) // , 20*Bytes.kilobyte32 // use a large buffer (default is 8k) ); @@ -798,9 +801,7 @@ } finally { reader.close(); - - rdfStream.close(); - + is.close(); } } @@ -826,73 +827,68 @@ * * @throws IOException */ - public LoadStats loadFiles(final File file, final String baseURI, + public LoadStats loadFiles(final URL url, final String baseURI, final RDFFormat rdfFormat, final FilenameFilter filter) throws IOException { - if (file == null) + if (url == null) throw new IllegalArgumentException(); final LoadStats totals = new LoadStats(); - loadFiles(totals, 0/* depth */, file, baseURI, rdfFormat, filter, true/* endOfBatch */ - ); + loadFiles(totals, 0/* depth */, url, baseURI, + rdfFormat, filter, true/* endOfBatch */); return totals; } protected void loadFiles(final LoadStats totals, final int depth, - final File file, final String baseURI, final RDFFormat rdfFormat, + final URL url, final String baseURI, final RDFFormat rdfFormat, final FilenameFilter filter, final boolean endOfBatch) throws IOException { - if (file.isDirectory()) { + // Legacy behavior - allow local files and directories for now, + // but data should only be loaded from outside the cluster, not + // from inside. 
+ if (url.getProtocol().equals("file")) { + File file; + try { + file = new File(url.toURI()); + } catch (URISyntaxException ex) { + throw new IOException("Unable to decode URL", ex); + } + if (file.isDirectory()) { - if (log.isInfoEnabled()) - log.info("loading directory: " + file); + if (log.isInfoEnabled()) + log.info("loading directory: " + file); -// final LoadStats loadStats = new LoadStats(); - - final File[] files = (filter != null ? file.listFiles(filter) + final File[] files = (filter != null ? file.listFiles(filter) : file.listFiles()); - for (int i = 0; i < files.length; i++) { + for (int i = 0; i < files.length; i++) { - final File f = files[i]; + final File f = files[i]; -// final RDFFormat fmt = RDFFormat.forFileName(f.toString(), -// rdfFormat); - - loadFiles(totals, depth + 1, f, baseURI, rdfFormat, filter, + loadFiles(totals, depth + 1, f.toURI().toURL(), baseURI, + rdfFormat, filter, (depth == 0 && i < files.length ? false : endOfBatch)); - - } - - return; - - } - - final String n = file.getName(); - - RDFFormat fmt = RDFFormat.forFileName(n); - if (fmt == null && n.endsWith(".zip")) { - fmt = RDFFormat.forFileName(n.substring(0, n.length() - 4)); - } + } - if (fmt == null && n.endsWith(".gz")) { - fmt = RDFFormat.forFileName(n.substring(0, n.length() - 3)); - } + return; - if (fmt == null) // fallback - fmt = rdfFormat; + } + } + + final String n = url.getPath(); InputStream is = null; try { - is = new FileInputStream(file); + URLConnection connection = url.openConnection(); + is = connection.getInputStream(); if (n.endsWith(".gz")) { @@ -916,23 +912,19 @@ try { // baseURI for this file. @todo do we need to encode this URI? - final String s = baseURI != null ? baseURI : file.toURI() + final String s = baseURI != null ? 
baseURI : url.toURI() .toString(); - loadData3(totals, reader, s, fmt, endOfBatch); + loadData3(totals, reader, s, rdfFormat, endOfBatch); return; - } catch (Exception ex) { - - throw new RuntimeException("While loading: " + file, ex); - } finally { - reader.close(); - } + } catch (Exception ex) { + throw new RuntimeException("While loading: " + url, ex); } finally { if (is != null) @@ -1359,7 +1351,8 @@ // dataLoader.loadFiles(fileOrDir, null/* baseURI */, // rdfFormat, filter); - dataLoader.loadFiles(totals, 0/* depth */, fileOrDir, baseURI, + dataLoader.loadFiles(totals, 0/* depth */, + fileOrDir.toURI().toURL(), baseURI, rdfFormat, filter, true/* endOfBatch */ ); Modified: branches/maven_scaleout/src/main/java/com/bigdata/relation/locator/DefaultResourceLocator.java =================================================================== --- branches/maven_scaleout/src/main/java/com/bigdata/relation/locator/DefaultResourceLocator.java 2010-09-01 14:39:13 UTC (rev 3480) +++ branches/maven_scaleout/src/main/java/com/bigdata/relation/locator/DefaultResourceLocator.java 2010-09-01 16:24:23 UTC (rev 3481) @@ -239,7 +239,8 @@ final Properties properties = locateResource(namespace, timestamp, foundOn); - if (properties == null) { + // Empty properties may refer to deleted resource. + if (properties == null || properties.isEmpty()) { // Not found by this locator. @@ -426,7 +427,8 @@ final Properties properties = locateResourceOn(indexManager, namespace, timestamp); - if (properties != null) { + // Empty properties may refer to deleted resource. 
+ if (properties != null && !properties.isEmpty()) { if (INFO) { Modified: branches/maven_scaleout/src/main/java/com/bigdata/relation/rule/eval/AbstractStepTask.java =================================================================== --- branches/maven_scaleout/src/main/java/com/bigdata/relation/rule/eval/AbstractStepTask.java 2010-09-01 14:39:13 UTC (rev 3480) +++ branches/maven_scaleout/src/main/java/com/bigdata/relation/rule/eval/AbstractStepTask.java 2010-09-01 16:24:23 UTC (rev 3481) @@ -57,7 +57,6 @@ import com.bigdata.relation.rule.IRule; import com.bigdata.relation.rule.IStep; import com.bigdata.service.DataService; -import com.bigdata.service.DataServiceCallable; import com.bigdata.service.IDataServiceCallable; import com.bigdata.service.ndx.ClientIndexView; import com.bigdata.service.ndx.IClientIndex; @@ -66,7 +65,7 @@ * @author <a href="mailto:tho...@us...">Bryan Thompson</a> * @version $Id$ */ -abstract public class AbstractStepTask extends DataServiceCallable<RuleStats> +abstract public class AbstractStepTask implements IStepTask, Cloneable { protected static final transient Logger log = Logger.getLogger(AbstractStepTask.class); @@ -87,7 +86,7 @@ * notices this case and causes <i>this</i> task to be {@link #clone()}ed, * the {@link ExecutorService} set on the clone, and the clone is then * submitted to the {@link ConcurrencyManager} for the {@link DataService}. - * + * * @param action * Indicate whether this is a query or a mutation operation. * @param joinNexusFactory @@ -98,7 +97,7 @@ * @param dataService * non-<code>null</code> iff the caller is already running on * a {@link DataService}. - * + * * @throws IllegalArgumentException * if <i>action</i> is <code>null</code>. 
* @throws IllegalArgumentException @@ -117,40 +116,36 @@ if (joinNexusFactory == null) throw new IllegalArgumentException(); - + if (step == null) throw new IllegalArgumentException(); this.action = action; - + this.joinNexusFactory = joinNexusFactory; - + this.step = step; - + this.indexManager = indexManager; // @todo MAY be null? - - if (dataService != null) - setDataService(dataService); - } public String toString() { - + return "{" + getClass().getSimpleName() + ", action=" + action + ", step=" + step.getName() + ", joinNexusFactory=" - + joinNexusFactory + ", indexManager=" + indexManager+"}"; - + + joinNexusFactory + ", indexManager=" + indexManager+"}"; + } - + /** * Run program steps in parallel. - * + * * @param program * @param tasks - * + * * @throws InterruptedException * @throws ExecutionException - * + * * @todo adapt the {@link ClientIndexView} code so that we notice all * errors, log them all, and report them all in a single thrown * exception. note that we may be running asynchronously inside of a @@ -164,84 +159,84 @@ protected RuleStats runParallel(final IJoinNexus joinNexus, final IStep program, final List<Callable<RuleStats>> tasks) throws InterruptedException, ExecutionException { - + if (log.isInfoEnabled()) log.info("program=" + program.getName() + ", #tasks=" + tasks.size()); - + if (indexManager == null) throw new IllegalStateException(); - + final RuleStats totals = joinNexus.getRuleStatisticsFactory().newInstance(program); - + final ExecutorService service = indexManager.getExecutorService(); - + // submit tasks and await their completion. final List<Future<RuleStats>> futures = service.invokeAll(tasks); - + // verify no problems with tasks. for (Future<RuleStats> f : futures) { - + final RuleStats tmp = f.get(); - + totals.add(tmp); - + } - + if (log.isInfoEnabled()) log.info("program=" + program.getName() + ", #tasks=" + tasks.size() + " - done"); - + return totals; - + } /** * Run program steps in sequence. 
- * + * * @param program * @param tasks - * - * @return + * + * @return * @throws InterruptedException * @throws ExecutionException */ protected RuleStats runSequential(final IJoinNexus joinNexus, final IStep program, final List<Callable<RuleStats>> tasks) throws InterruptedException, ExecutionException { - + final int ntasks = tasks.size(); - + if (log.isInfoEnabled()) log.info("program=" + program.getName() + ", #tasks=" + ntasks); - + if (indexManager == null) throw new IllegalStateException(); - + final ExecutorService service = indexManager.getExecutorService(); - + final RuleStats totals = joinNexus.getRuleStatisticsFactory().newInstance(program); - + final Iterator<Callable<RuleStats>> itr = tasks.iterator(); - + int n = 0; - + while (itr.hasNext()) { - + final Callable<RuleStats> task = itr.next(); - + /* * Submit and wait for the future. - * + * * Note: tasks that are run in a sequential program are required to * flush the buffer so that all solutions are available for the next * step of the program. This is critical for programs that have * dependencies between their steps. -// * +// * // * Note: This is handled by the task factory. */ final RuleStats tmp = service.submit(task).get(); - + totals.add(tmp); n++; @@ -258,9 +253,9 @@ if (log.isInfoEnabled()) log.info("program=" + program.getName() + ", #tasks=" + ntasks + " - done"); - + return totals; - + } /** @@ -272,10 +267,10 @@ * layering of the {@link RuleStats} (this is due to a coupling between the * {@link RuleStats} reporting structure and the control structure for * executing the tasks). 
- * + * * @param program * @param tasks - * + * * @return * @throws InterruptedException * @throws ExecutionException @@ -283,21 +278,21 @@ protected RuleStats runOne(final IJoinNexus joinNexus, final IStep program, final Callable<RuleStats> task) throws InterruptedException, ExecutionException { - + if (log.isInfoEnabled()) log.info("program=" + program.getName()); - + if (indexManager == null) throw new IllegalStateException(); - + /* * Submit and wait for the future. - * + * * Note: tasks that are run in a sequential (or as a single task) * program are required to flush the buffer so that all solutions are * available for the next step of the program. This is critical for * programs that have dependencies between their steps. - * + * * Note: This is handled by the task factory. */ // final ExecutorService service = indexManager.getExecutorService(); @@ -311,9 +306,9 @@ if (log.isInfoEnabled()) log.info("program=" + program.getName() + " - done"); - + return stats; - + } /** @@ -337,21 +332,21 @@ * {@link AbstractTask} will wind up using an {@link IClientIndex} view and * lose the benefits of access to unisolated indices. */ - public Future<RuleStats> submit() { + public Future<RuleStats> submit(DataService dataService) { - if (!isDataService()) { + if (dataService == null) { return indexManager.getExecutorService().submit(this); } - return submitToConcurrencyManager(); - + return submitToConcurrencyManager(dataService); + } - - private Future<RuleStats> submitToConcurrencyManager() { - - if (!isDataService()) + + private Future<RuleStats> submitToConcurrencyManager(DataService dataService) { + + if (dataService == null) throw new IllegalStateException(); final ProgramUtility util = new ProgramUtility(); @@ -374,40 +369,40 @@ } } - + if(log.isInfoEnabled()) { log.info("running w/ concurrency control: " + this); - + } /* * The index names must be gathered from each relation on which the task * will write so that they can be declared. 
- * + * * Note: We can't just pick and choose using the access paths since we * do not know how the propagation of bindings will effect access path * selection so we need a lock on all of the indices before the task can * run (at least, before it can run if it is a writer - no locks are * required for query). - * + * * 1. Find the distinct relations that are used by the rules. - * + * * 2. Collect the names of the indices maintained by those relations. - * + * * 3. Declare the indices since the task will need an exclusive lock on * them (mutation) or at least the ability to read from those indices * (query). - * + * * Note: if an index is not found on the live journal then it will be * resolved against the federation (if running in a federation). This * means that the task will run with the live index objects when they * are local and with IClientIndex objects when the index is remote. - * + * * Note: In general, mixtures of live and remote index objects do not * occur since indices are either partitioned (a federation) or * monolithic (a Journal). - * + * * Note: You CAN place indices onto specific data services running on a * set of machines and set [enableOverflow := false] such that the * indices never become partitioned. In that case you can have optimized @@ -421,34 +416,34 @@ // final long timestamp; // { -// +// // // flyweight instance. // IJoinNexus joinNexus = joinNexusFactory.newInstance(indexManager); -// +// // // choose timestamp based on more recent view required. // timestamp = action.isMutation() ? joinNexus.getWriteTimestamp() // : joinNexus.getReadTimestamp(); -// +// // } // // if(log.isInfoEnabled()) { -// +// // log.info("timestamp="+timestamp+", task="+this); -// +// // } /* * The set of indices that we need to declare for the task. */ final Set<String> indexNames = new HashSet<String>(); - + if(action.isMutation()) { - + /* * Obtain the name of each index for which we want write access. 
* These are the indices for the relations named in the head of each * rule. - * + * * Note: We are not actually issuing any tasks here, just * materializing relation views so that we can obtain the names of * the indices required for those views in order to declare them to @@ -459,11 +454,11 @@ */ final Map<String, IRelation> tmpRelations = getWriteRelations( indexManager, step, ITx.UNISOLATED); - + // Collect names of the required indices. final Set<String> writeIndexNames = getIndexNames(tmpRelations .values()); - + indexNames.addAll(writeIndexNames); } @@ -474,7 +469,7 @@ * Obtain the name of each index for which we want read access. * These are the indices for the relation view(s) named in the tails * of each rule. - * + * * Note: We are not actually issuing any tasks here, just * materializing relation views so that we can obtain the names of * the indices required for those views. UNISOLATED is always safe @@ -483,18 +478,18 @@ */ final Map<String, IRelation> tmpRelations = getReadRelations( indexManager, step, ITx.UNISOLATED); - + // Collect names of the required indices. final Set<String> readIndexNames = getIndexNames(tmpRelations .values()); - + indexNames.addAll(readIndexNames); - + } - + final String[] resource; { - + // The set of indices that the task will declare. resource = indexNames.toArray(new String[] {}); @@ -511,18 +506,18 @@ * choice is whether or not the task is UNISOLATED (an unisolated task * will obtain exclusive locks on the live indices declared by the * task). - * + * * A mutation task runs with the writeTimestamp. - * + * * A query task runs with the readTimestamp. - * + * * @todo handle transactions in this context. 
*/ final long timestamp; { - + // final IJoinNexus joinNexus = joinNexusFactory.newInstance(indexManager); - + if (action.isMutation()) { timestamp = joinNexusFactory.getWriteTimestamp(); @@ -531,7 +526,7 @@ timestamp = joinNexusFactory.getReadTimestamp(); // timestamp = ITx.READ_COMMITTED; - + } if (log.isInfoEnabled()) { @@ -539,21 +534,21 @@ log.info("timestamp=" + timestamp + ", task=" + this); } - + } - + /* * Create the inner task. A clone is used to prevent possible side * effects on the original task. - * + * * Note: The [timestamp] was choosen above. The writeTimestamp iff this * is a mutation operation and the [readTimestamp] otherwise. */ final AbstractStepTask innerTask = this.clone(); - final IConcurrencyManager concurrencyManager = getDataService() - .getConcurrencyManager(); - + final IConcurrencyManager concurrencyManager = + dataService.getConcurrencyManager(); + final AbstractTask task = new AbstractTask(concurrencyManager, timestamp, resource) { @@ -570,7 +565,7 @@ * them and are running an UNISOLATED AbstractTask). */ innerTask.indexManager = getJournal(); - + return innerTask.call(); } @@ -580,9 +575,9 @@ if(log.isInfoEnabled()) { log.info("running on concurrencyManager: " + this); - + } - + /* * Run on the concurrency manager. 
*/ @@ -618,20 +613,20 @@ protected Set<String> getWriteRelationNames(IStep step) { final Set<String> c = new HashSet<String>(); - + getWriteRelationNames(step, c); if(log.isDebugEnabled()) { - + log.debug("Found " + c.size() + " relations, program=" + step.getName()); - + } return c; - + } - + private void getWriteRelationNames(IStep p, Set<String> c) { if (p.isRule()) { @@ -641,11 +636,11 @@ if (r.getHead() == null) throw new IllegalArgumentException( "No head for this rule: rule=" + p); - + c.add(r.getHead().getOnlyRelationName()); } else { - + final Iterator<IStep> itr = ((IProgram)p).steps(); while (itr.hasNext()) { @@ -657,15 +652,15 @@ } } - + /** * Locate the distinct relation identifiers corresponding to the head of * each rule and resolve them to their relations. - * + * * @param timestamp * The timestamp associated with the relation views on which the * rule(s) will write. - * + * * @throws RuntimeException * if any relation can not be resolved. */ @@ -711,7 +706,7 @@ } } else { - + final Iterator<IStep> itr = ((IProgram)p).steps(); while (itr.hasNext()) { @@ -723,12 +718,12 @@ } } - + /** * Locate the distinct relation identifiers corresponding to the tail(s) of * each rule and resolve them to their relations. Note that a tail predicate * can read on a fused view of more than one relation. - * + * * @throws RuntimeException * if any relation can not be resolved. */ @@ -784,11 +779,11 @@ } } - + } } else { - + final Iterator<IStep> itr = ((IProgram)p).steps(); while (itr.hasNext()) { @@ -800,13 +795,13 @@ } } - + /** * Create the appropriate buffers to absorb writes by the rules in the * program that target an {@link IMutableRelation}. - * + * * @return the map from relation identifier to the corresponding buffer. - * + * * @throws IllegalStateException * if the program is being executed as mutation. 
* @throws RuntimeException @@ -820,13 +815,13 @@ if (!action.isMutation()) { throw new IllegalStateException(); - + } if(log.isDebugEnabled()) { - + log.debug(""); - + } final Map<String, IBuffer<ISolution[]>> c = new HashMap<String, IBuffer<ISolution[]>>( @@ -846,45 +841,45 @@ final IBuffer<ISolution[]> buffer; switch (action) { - + case Insert: - + buffer = joinNexus.newInsertBuffer((IMutableRelation)relation); - + break; - + case Delete: - + buffer = joinNexus.newDeleteBuffer((IMutableRelation)relation); - + break; - + default: - + throw new AssertionError("action=" + action); - + } c.put(relationIdentifier, buffer); - + } if(log.isDebugEnabled()) { - + log.debug("Created "+c.size()+" mutation buffers: action="+action); - + } return c; - + } - + /** * Returns the names of the indices maintained by the relations. - * + * * @param c * A collection of {@link IRelation}s. - * + * * @return The names of the indices maintained by those relations. */ @SuppressWarnings("unchecked") @@ -897,19 +892,19 @@ return Collections.EMPTY_SET; final Set<String> set = new HashSet<String>(); - + final Iterator<IRelation> itr = c.iterator(); - + while(itr.hasNext()) { - + final IRelation relation = itr.next(); - + set.addAll(relation.getIndexNames()); - + } return set; - + } } Deleted: branches/maven_scaleout/src/main/java/com/bigdata/relation/rule/eval/EmptyProgramTask.java =================================================================== --- branches/maven_scaleout/src/main/java/com/bigdata/relation/rule/eval/EmptyProgramTask.java 2010-09-01 14:39:13 UTC (rev 3480) +++ branches/maven_scaleout/src/main/java/com/bigdata/relation/rule/eval/EmptyProgramTask.java 2010-09-01 16:24:23 UTC (rev 3481) @@ -1,64 +0,0 @@ -package com.bigdata.relation.rule.eval; - -import com.bigdata.relation.rule.IProgram; -import com.bigdata.relation.rule.IStep; -import com.bigdata.striterator.EmptyChunkedIterator; - - -/** - * Provides execution for an "empty" program. 
- * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -public class EmptyProgramTask implements IProgramTask { - - final ActionEnum action; - - final IStep program; - - /** - * - * @param action - * @param step - * - * @throws IllegalArgumentException - * if any argument is <code>null</code>. - * @throws IllegalArgumentException - * unless the <i>step</i> is an empty {@link IProgram}. - */ - public EmptyProgramTask(ActionEnum action, IStep step) { - - if (action == null) - throw new IllegalArgumentException(); - - if (step == null) - throw new IllegalArgumentException(); - - if (step.isRule() || ((IProgram)step).stepCount() != 0) { - - throw new IllegalArgumentException(); - - } - - this.action = action; - - this.program = step; - - } - - public Object call() { - - if (action.isMutation()) { - - return Long.valueOf(0L); - - } else { - - return new EmptyChunkedIterator<ISolution>(null/* keyOrder */); - - } - - } - -} \ No newline at end of file Deleted: branches/maven_scaleout/src/main/java/com/bigdata/relation/rule/eval/IProgramTask.java =================================================================== --- branches/maven_scaleout/src/main/java/com/bigdata/relation/rule/eval/IProgramTask.java 2010-09-01 14:39:13 UTC (rev 3480) +++ branches/maven_scaleout/src/main/java/com/bigdata/relation/rule/eval/IProgramTask.java 2010-09-01 16:24:23 UTC (rev 3481) @@ -1,54 +0,0 @@ -/* - -Copyright (C) SYSTAP, LLC 2006-2008. All rights reserved. - -Contact: - SYSTAP, LLC - 4501 Tower Road - Greensboro, NC 27410 - lic...@bi... - -This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; version 2 of the License. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -*/ -/* - * Created on Jun 27, 2008 - */ - -package com.bigdata.relation.rule.eval; - -import java.util.concurrent.Callable; - -import com.bigdata.striterator.IChunkedOrderedIterator; - -/** - * Interface for a task that executes a (complex) program (vs a single rule). - * - * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ - */ -public interface IProgramTask extends Callable<Object> { - - /** - * The return will be either an {@link IChunkedOrderedIterator} (for - * {@link ActionEnum#Query}) or a {@link Long} element mutation count (for - * {@link ActionEnum#Insert} or {@link ActionEnum#Delete}). - * - * @return - * - * @throws Exception - */ - public Object call() throws Exception; - -} Modified: branches/maven_scaleout/src/main/java/com/bigdata/relation/rule/eval/ProgramTask.java =================================================================== --- branches/maven_scaleout/src/main/java/com/bigdata/relation/rule/eval/ProgramTask.java 2010-09-01 14:39:13 UTC (rev 3480) +++ branches/maven_scaleout/src/main/java/com/bigdata/relation/rule/eval/ProgramTask.java 2010-09-01 16:24:23 UTC (rev 3481) @@ -30,7 +30,6 @@ import java.io.IOException; import java.util.Iterator; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -51,16 +50,16 @@ import com.bigdata.relation.rule.IRule; import com.bigdata.relation.rule.IStep; import com.bigdata.service.DataService; -import com.bigdata.service.DataServiceCallable; import com.bigdata.service.IBigdataFederation; import com.bigdata.service.IDataService; +import com.bigdata.service.IDataServiceCallable; import 
com.bigdata.striterator.IChunkedOrderedIterator; import com.bigdata.striterator.ICloseableIterator; /** * Task for executing a program when all of the indices for the relation are * co-located on the same {@link DataService}. - * + * * @todo Named result sets. This would provide a means to run a IRuleTask and * cache the output for further evaluation... [truncated message content] |
From: <fko...@us...> - 2010-09-01 21:16:59
|
Revision: 3492 http://bigdata.svn.sourceforge.net/bigdata/?rev=3492&view=rev Author: fkoliver Date: 2010-09-01 21:16:52 +0000 (Wed, 01 Sep 2010) Log Message: ----------- Add serviceJoin(Service...) methods to match existing serviceJoin(IService...) methods for smart proxies. Modified Paths: -------------- branches/maven_scaleout/src/main/java/com/bigdata/service/AbstractFederation.java branches/maven_scaleout/src/main/java/com/bigdata/service/DefaultClientDelegate.java branches/maven_scaleout/src/main/java/com/bigdata/service/DefaultServiceFederationDelegate.java branches/maven_scaleout/src/main/java/com/bigdata/service/IFederationDelegate.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/JiniFederation.java branches/maven_scaleout/src/main/java/com/bigdata/service/jini/util/ListServices.java branches/maven_scaleout/src/test/java/com/bigdata/resources/AbstractResourceManagerTestCase.java branches/maven_scaleout/src/test/java/com/bigdata/service/TestEventReceiver.java Modified: branches/maven_scaleout/src/main/java/com/bigdata/service/AbstractFederation.java =================================================================== --- branches/maven_scaleout/src/main/java/com/bigdata/service/AbstractFederation.java 2010-09-01 20:02:58 UTC (rev 3491) +++ branches/maven_scaleout/src/main/java/com/bigdata/service/AbstractFederation.java 2010-09-01 21:16:52 UTC (rev 3492) @@ -78,7 +78,6 @@ * Abstract base class for {@link IBigdataFederation} implementations. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ * @param <T> * The generic type of the client or service. * @@ -947,8 +946,25 @@ /** * Delegated. 
{@inheritDoc} */ - public void serviceLeave(final UUID serviceUUID) { + public void serviceJoin(final Service service, final UUID serviceUUID) { + if (!isOpen()) return; + + if (log.isInfoEnabled()) { + + log.info("service=" + service + ", serviceUUID" + serviceUUID); + + } + + client.getDelegate().serviceJoin(service, serviceUUID); + + } + + /** + * Delegated. {@inheritDoc} + */ + public void serviceLeave(final UUID serviceUUID) { + if(!isOpen()) return; if(log.isInfoEnabled()) { @@ -1007,7 +1023,6 @@ * before the service can be started. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ protected class StartDeferredTasksTask implements Runnable { @@ -1303,7 +1318,6 @@ * load balancer service. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ public static class ReportTask implements Runnable { @@ -1504,7 +1518,6 @@ * Sends events to the load balancer service. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ * * FIXME should discard events if too many build up on the client. */ Modified: branches/maven_scaleout/src/main/java/com/bigdata/service/DefaultClientDelegate.java =================================================================== --- branches/maven_scaleout/src/main/java/com/bigdata/service/DefaultClientDelegate.java 2010-09-01 20:02:58 UTC (rev 3491) +++ branches/maven_scaleout/src/main/java/com/bigdata/service/DefaultClientDelegate.java 2010-09-01 21:16:52 UTC (rev 3492) @@ -19,7 +19,6 @@ * {@link AbstractClient#setDelegate(IFederationDelegate)}. 
* * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ public class DefaultClientDelegate<T> implements IFederationDelegate<T> { @@ -118,6 +117,11 @@ } /** NOP */ + public void serviceJoin(Service service, UUID serviceUUID) { + + } + + /** NOP */ public void serviceLeave(UUID serviceUUID) { } Modified: branches/maven_scaleout/src/main/java/com/bigdata/service/DefaultServiceFederationDelegate.java =================================================================== --- branches/maven_scaleout/src/main/java/com/bigdata/service/DefaultServiceFederationDelegate.java 2010-09-01 20:02:58 UTC (rev 3491) +++ branches/maven_scaleout/src/main/java/com/bigdata/service/DefaultServiceFederationDelegate.java 2010-09-01 21:16:52 UTC (rev 3492) @@ -53,7 +53,6 @@ * service interface reported to the load balancer service. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ public class DefaultServiceFederationDelegate<T extends AbstractService> implements IFederationDelegate<T> { @@ -145,6 +144,11 @@ } /** NOP */ + public void serviceJoin(Service service, UUID serviceUUID) { + + } + + /** NOP */ public void serviceLeave(UUID serviceUUID) { } Modified: branches/maven_scaleout/src/main/java/com/bigdata/service/IFederationDelegate.java =================================================================== --- branches/maven_scaleout/src/main/java/com/bigdata/service/IFederationDelegate.java 2010-09-01 20:02:58 UTC (rev 3491) +++ branches/maven_scaleout/src/main/java/com/bigdata/service/IFederationDelegate.java 2010-09-01 21:16:52 UTC (rev 3492) @@ -39,7 +39,6 @@ * by the {@link AbstractFederation}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ * @param <T> * The generic type of the client or service. */ @@ -100,15 +99,27 @@ * Notice that the service has been discovered. This notice will be * generated the first time the service is discovered by a given * {@link IBigdataClient}. 
- * + * * @param service * The service. * @param serviceUUID * The service {@link UUID}. */ public void serviceJoin(IService service, UUID serviceUUID); - + /** + * Notice that the service has been discovered. This notice will be + * generated the first time the service is discovered by a given + * {@link IBigdataClient}. + * + * @param service + * The service. + * @param serviceUUID + * The service {@link UUID}. + */ + public void serviceJoin(Service service, UUID serviceUUID); + + /** * Notice that the service is no longer available. This notice will be * generated once for a given {@link IBigdataClient} when the service is no * longer available from any of its service registrars. Modified: branches/maven_scaleout/src/main/java/com/bigdata/service/jini/JiniFederation.java =================================================================== --- branches/maven_scaleout/src/main/java/com/bigdata/service/jini/JiniFederation.java 2010-09-01 20:02:58 UTC (rev 3491) +++ branches/maven_scaleout/src/main/java/com/bigdata/service/jini/JiniFederation.java 2010-09-01 21:16:52 UTC (rev 3492) @@ -109,12 +109,12 @@ //BTM import com.bigdata.service.LoadBalancer; +import com.bigdata.service.Service; /** * Concrete implementation for Jini. 
* * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ public class JiniFederation<T> extends AbstractDistributedFederation<T> implements DiscoveryListener, ServiceDiscoveryListener { @@ -1260,17 +1260,22 @@ if (serviceItem.service instanceof IService) { -// System.err.println("serviceAdded: "+serviceItem); - final UUID serviceUUID = JiniUtil .serviceID2UUID(serviceItem.serviceID); serviceJoin((IService) serviceItem.service, serviceUUID); + } else if (serviceItem.service instanceof Service) { + + final UUID serviceUUID = JiniUtil + .serviceID2UUID(serviceItem.serviceID); + + serviceJoin((Service) serviceItem.service, serviceUUID); + } else { - log.warn("Not an " + IService.class); - + log.warn("Not an " + IService.class + " or an " + Service.class); + } } @@ -1410,7 +1415,6 @@ * Glue object. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ static private class TaskFuture<T> { @@ -1436,7 +1440,6 @@ * Run as a scheduled task that monitors futures. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ private static class MonitorFuturesTask implements Runnable { Modified: branches/maven_scaleout/src/main/java/com/bigdata/service/jini/util/ListServices.java =================================================================== --- branches/maven_scaleout/src/main/java/com/bigdata/service/jini/util/ListServices.java 2010-09-01 20:02:58 UTC (rev 3491) +++ branches/maven_scaleout/src/main/java/com/bigdata/service/jini/util/ListServices.java 2010-09-01 21:16:52 UTC (rev 3492) @@ -59,7 +59,6 @@ * Utility will list the discovered services in federation to which it connects. 
* * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ public class ListServices { @@ -155,7 +154,6 @@ * * @author <a href="mailto:tho...@us...">Bryan * Thompson</a> - * @version $Id$ */ static class DiscoverAndListTask implements Callable<String> { @@ -329,7 +327,12 @@ + "running.\n"); sb.append("Discovered " + registrars.length - + " jini service registrars.\n"); + + " jini service registrars. [ "); + for (ServiceRegistrar registrar : registrars) { + sb.append(registrar.getLocator().toString()); + sb.append(' '); + } + sb.append("]\n"); sb.append("Discovered " + a.length + " services\n"); Modified: branches/maven_scaleout/src/test/java/com/bigdata/resources/AbstractResourceManagerTestCase.java =================================================================== --- branches/maven_scaleout/src/test/java/com/bigdata/resources/AbstractResourceManagerTestCase.java 2010-09-01 20:02:58 UTC (rev 3491) +++ branches/maven_scaleout/src/test/java/com/bigdata/resources/AbstractResourceManagerTestCase.java 2010-09-01 21:16:52 UTC (rev 3492) @@ -79,13 +79,13 @@ import com.bigdata.util.httpd.AbstractHTTPD; import com.bigdata.service.LoadBalancer; +import com.bigdata.service.Service; /** * Base class for {@link ResourceManager} test suites that can use normal * startup and shutdown. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ public class AbstractResourceManagerTestCase extends AbstractResourceManagerBootstrapTestCase { @@ -238,7 +238,6 @@ * trying to test. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ protected static class MockMetadataService implements IMetadataService { @@ -410,7 +409,6 @@ * {@link ResourceManager} during the unit tests. 
* * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ protected class MockFederation implements IBigdataFederation<MockMetadataService> { @@ -611,6 +609,9 @@ public void serviceJoin(IService service, UUID serviceUUID) { } + public void serviceJoin(Service service, UUID serviceUUID) { + } + public void serviceLeave(UUID serviceUUID) { } Modified: branches/maven_scaleout/src/test/java/com/bigdata/service/TestEventReceiver.java =================================================================== --- branches/maven_scaleout/src/test/java/com/bigdata/service/TestEventReceiver.java 2010-09-01 20:02:58 UTC (rev 3491) +++ branches/maven_scaleout/src/test/java/com/bigdata/service/TestEventReceiver.java 2010-09-01 21:16:52 UTC (rev 3492) @@ -63,7 +63,6 @@ * Unit tests for the {@link EventReceiver}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ public class TestEventReceiver extends TestCase2 { @@ -85,7 +84,6 @@ * {@link EventReceiver} on the {@link MockFederation}. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ static class MyEvent extends Event { @@ -418,7 +416,6 @@ * Generates events. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ private static class EventFactory implements Callable<Void> { @@ -484,7 +481,6 @@ * the events are stored). * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ static private class EventConsumer implements Callable<Void> { @@ -525,7 +521,6 @@ * Mock federation to support the unit tests in the outer class. 
* * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ static class MockFederation implements IBigdataFederation<IEventReceivingService> { @@ -706,9 +701,13 @@ } public void serviceJoin(IService service, UUID serviceUUID) { - + } + public void serviceJoin(Service service, UUID serviceUUID) { + + } + public void serviceLeave(UUID serviceUUID) { } This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |