From: <tho...@us...> - 2012-08-20 14:32:08
|
Revision: 6457
          http://bigdata.svn.sourceforge.net/bigdata/?rev=6457&view=rev
Author:   thompsonbry
Date:     2012-08-20 14:31:58 +0000 (Mon, 20 Aug 2012)

Log Message:
-----------
Modified the object manager implementations to close the query connection (when one exists) and to ensure that the query result is closed.

Modified Paths:
--------------
    branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/striterator/CloseableIteratorWrapper.java
    branches/BIGDATA_RELEASE_1_2_0/bigdata-gom/src/java/com/bigdata/gom/om/NanoSparqlObjectManager.java
    branches/BIGDATA_RELEASE_1_2_0/bigdata-gom/src/java/com/bigdata/gom/om/ObjectManager.java

Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/striterator/CloseableIteratorWrapper.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/striterator/CloseableIteratorWrapper.java 2012-08-20 14:03:40 UTC (rev 6456) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/striterator/CloseableIteratorWrapper.java 2012-08-20 14:31:58 UTC (rev 6457) @@ -48,9 +48,16 @@ this.src = src; } - - /** NOP. */ + + /** Delegate to the source iff the source implements {@link ICloseable}. 
*/ public void close() { + + if (src instanceof ICloseable) { + + ((ICloseable) src).close(); + + } + } public boolean hasNext() { Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-gom/src/java/com/bigdata/gom/om/NanoSparqlObjectManager.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-gom/src/java/com/bigdata/gom/om/NanoSparqlObjectManager.java 2012-08-20 14:03:40 UTC (rev 6456) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-gom/src/java/com/bigdata/gom/om/NanoSparqlObjectManager.java 2012-08-20 14:31:58 UTC (rev 6457) @@ -36,12 +36,16 @@ import org.openrdf.query.GraphQueryResult; import org.openrdf.query.MalformedQueryException; import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.TupleQuery; import org.openrdf.query.TupleQueryResult; import org.openrdf.repository.RepositoryException; import com.bigdata.gom.gpo.GPO; import com.bigdata.gom.gpo.IGPO; import com.bigdata.rdf.model.BigdataValueFactoryImpl; +import com.bigdata.rdf.sail.BigdataSailRepositoryConnection; +import com.bigdata.rdf.sail.Sesame2BigdataIterator; import com.bigdata.rdf.sail.webapp.client.IPreparedGraphQuery; import com.bigdata.rdf.sail.webapp.client.IPreparedTupleQuery; import com.bigdata.rdf.sail.webapp.client.RemoteRepository; @@ -79,48 +83,27 @@ // // m_repo.close(); // } - @Override - public ICloseableIterator<BindingSet> evaluate(final String query) { - try { - final IPreparedTupleQuery q = m_repo.prepareTupleQuery(query); - final TupleQueryResult res = q.evaluate(); - return new CloseableIteratorWrapper<BindingSet>(new Iterator<BindingSet>() { + @Override + public ICloseableIterator<BindingSet> evaluate(final String query) { - @Override - public boolean hasNext() { - try { - return res.hasNext(); - } catch (QueryEvaluationException e) { - throw new RuntimeException(e); - } - } + try { - @Override - public BindingSet next() { - try { - return res.next(); - } catch 
(QueryEvaluationException e) { - throw new RuntimeException(e); - } - } + // Setup the query. + final IPreparedTupleQuery q = m_repo.prepareTupleQuery(query); - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - }); - } catch (RepositoryException e1) { - e1.printStackTrace(); - } catch (MalformedQueryException e1) { - e1.printStackTrace(); - } catch (QueryEvaluationException e) { - e.printStackTrace(); - } catch (Exception e) { - e.printStackTrace(); - } - - return null; + // Note: evaluate() runs asynchronously and must be closed(). + final TupleQueryResult res = q.evaluate(); + + // Will close the TupleQueryResult. + return new Sesame2BigdataIterator<BindingSet, QueryEvaluationException>( + res); + + } catch (Exception ex) { + + throw new RuntimeException(ex); + + } + } @Override @@ -163,47 +146,26 @@ } @Override - public ICloseableIterator<Statement> evaluateGraph(final String query) { - try { - final IPreparedGraphQuery q = m_repo.prepareGraphQuery(query); - final GraphQueryResult res = q.evaluate(); - return new CloseableIteratorWrapper<Statement>(new Iterator<Statement>() { + public ICloseableIterator<Statement> evaluateGraph(final String query) { - @Override - public boolean hasNext() { - try { - return res.hasNext(); - } catch (QueryEvaluationException e) { - throw new RuntimeException(e); - } - } + try { - @Override - public Statement next() { - try { - return res.next(); - } catch (QueryEvaluationException e) { - throw new RuntimeException(e); - } - } + // Setup the query. 
+ final IPreparedGraphQuery q = m_repo.prepareGraphQuery(query); - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - }); - } catch (RepositoryException e1) { - e1.printStackTrace(); - } catch (MalformedQueryException e1) { - e1.printStackTrace(); - } catch (QueryEvaluationException e) { - e.printStackTrace(); - } catch (Exception e) { - e.printStackTrace(); - } - - return null; + // Note: evaluate() runs asynchronously and must be closed(). + final GraphQueryResult res = q.evaluate(); + + // Will close the GraphQueryResult. + return new Sesame2BigdataIterator<Statement, QueryEvaluationException>( + res); + + } catch (Exception ex) { + + throw new RuntimeException(ex); + + } + } @Override Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-gom/src/java/com/bigdata/gom/om/ObjectManager.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-gom/src/java/com/bigdata/gom/om/ObjectManager.java 2012-08-20 14:03:40 UTC (rev 6456) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-gom/src/java/com/bigdata/gom/om/ObjectManager.java 2012-08-20 14:31:58 UTC (rev 6457) @@ -55,6 +55,7 @@ import com.bigdata.rdf.model.BigdataValueFactory; import com.bigdata.rdf.sail.BigdataSailRepository; import com.bigdata.rdf.sail.BigdataSailRepositoryConnection; +import com.bigdata.rdf.sail.Sesame2BigdataIterator; import com.bigdata.rdf.sparql.ast.cache.CacheConnectionFactory; import com.bigdata.rdf.sparql.ast.cache.ICacheConnection; import com.bigdata.rdf.sparql.ast.cache.IDescribeCache; @@ -75,7 +76,7 @@ final private BigdataSailRepository m_repo; final private boolean readOnly; - private IDescribeCache m_describeCache; + final private IDescribeCache m_describeCache; /** * @@ -94,21 +95,34 @@ final AbstractTripleStore tripleStore = cxn.getDatabase(); this.readOnly = tripleStore.isReadOnly(); - - final QueryEngine queryEngine = QueryEngineFactory.getStandaloneQueryController((Journal) 
m_repo.getDatabase().getIndexManager()); - final ICacheConnection cacheConn = CacheConnectionFactory - .getExistingCacheConnection(queryEngine); + /* + * FIXME The DESCRIBE cache feature is not yet finished. This code will + * not obtain a connection to the DESCRIBE cache unless an unisolated + * query or update operation has already run against the query engine. + * This is a known bug and will be resolved as we work through the MVCC + * cache coherence for the DESCRIBE cache. + */ + { - if (cacheConn != null) { + final QueryEngine queryEngine = QueryEngineFactory + .getStandaloneQueryController((Journal) m_repo + .getDatabase().getIndexManager()); - m_describeCache = cacheConn.getDescribeCache( - tripleStore.getNamespace(), 0 /*tripleStore.getTimestamp()*/); + final ICacheConnection cacheConn = CacheConnectionFactory + .getExistingCacheConnection(queryEngine); - } else { + if (cacheConn != null) { - m_describeCache = null; + m_describeCache = cacheConn.getDescribeCache( + tripleStore.getNamespace(), tripleStore.getTimestamp()); + } else { + + m_describeCache = null; + + } + } } @@ -133,119 +147,100 @@ } @Override - public ICloseableIterator<BindingSet> evaluate(final String query) { + public ICloseableIterator<BindingSet> evaluate(final String query) { - BigdataSailRepositoryConnection cxn = null; - - try { + final BigdataSailRepositoryConnection cxn; + try { + cxn = getQueryConnection(); + } catch (RepositoryException e1) { + throw new RuntimeException(e1); + } - cxn = getQueryConnection(); - - final TupleQuery q = cxn.prepareTupleQuery(QueryLanguage.SPARQL, + try { + + // Setup the query. 
+ final TupleQuery q = cxn.prepareTupleQuery(QueryLanguage.SPARQL, query); - - final TupleQueryResult res = q.evaluate(); - - return new CloseableIteratorWrapper<BindingSet>( - new Iterator<BindingSet>() { - @Override - public boolean hasNext() { - try { - return res.hasNext(); - } catch (QueryEvaluationException e) { - throw new RuntimeException(e); - } - } + // Note: evaluate() runs asynchronously and must be closed(). + final TupleQueryResult res = q.evaluate(); - @Override - public BindingSet next() { - try { - return res.next(); - } catch (QueryEvaluationException e) { - throw new RuntimeException(e); - } - } + // Will close the TupleQueryResult. + return new Sesame2BigdataIterator<BindingSet, QueryEvaluationException>( + res) { + public void close() { + // Close the TupleQueryResult. + super.close(); + try { + // Close the connection. + cxn.close(); + } catch (RepositoryException e) { + throw new RuntimeException(e); + } + } + }; - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - }); + } catch (Throwable t) { - } catch (Exception ex) { - - throw new RuntimeException(ex); - - } finally { - - if (cxn != null) { - try { - cxn.close(); - } catch (RepositoryException e) { - log.error(e, e); - } + // Error preparing the query. 
+ try { + // Close the connection + cxn.close(); + } catch (RepositoryException e) { + log.error(e, e); } - + + throw new RuntimeException(t); + } - + } public ICloseableIterator<Statement> evaluateGraph(final String query) { - BigdataSailRepositoryConnection cxn = null; - - try { - - cxn = getQueryConnection(); - - final GraphQuery q = cxn.prepareGraphQuery(QueryLanguage.SPARQL, - query); - - final GraphQueryResult res = q.evaluate(); - - return new CloseableIteratorWrapper<Statement>(new Iterator<Statement>() { + final BigdataSailRepositoryConnection cxn; + try { + cxn = getQueryConnection(); + } catch (RepositoryException e1) { + throw new RuntimeException(e1); + } - @Override - public boolean hasNext() { - try { - return res.hasNext(); - } catch (QueryEvaluationException e) { - throw new RuntimeException(e); - } - } + try { - @Override - public Statement next() { - try { - return res.next(); - } catch (QueryEvaluationException e) { - throw new RuntimeException(e); - } - } + // Setup the query. + final GraphQuery q = cxn.prepareGraphQuery(QueryLanguage.SPARQL, + query); - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - }); + // Note: evaluate() runs asynchronously and must be closed(). + final GraphQueryResult res = q.evaluate(); - } catch (Exception t) { - - throw new RuntimeException(t); - - } finally { - - if (cxn != null) { - try { - cxn.close(); - } catch (RepositoryException e) { - log.error(e, e); + // Will close the TupleQueryResult. + return new Sesame2BigdataIterator<Statement, QueryEvaluationException>( + res) { + public void close() { + // Close the TupleQueryResult. + super.close(); + try { + // Close the connection. + cxn.close(); + } catch (RepositoryException e) { + throw new RuntimeException(e); + } } + }; + + } catch (Throwable t) { + + // Error preparing the query. 
+ try { + // Close the connection + cxn.close(); + } catch (RepositoryException e) { + log.error(e, e); } - + + throw new RuntimeException(t); + } } @@ -271,21 +266,21 @@ ((GPO) gpo).dematerialize(); - if (m_describeCache == null) { - AbstractTripleStore store = m_repo.getDatabase(); - final QueryEngine queryEngine = QueryEngineFactory.getStandaloneQueryController((Journal) store.getIndexManager()); - - final ICacheConnection cacheConn = CacheConnectionFactory - .getExistingCacheConnection(queryEngine); - - if (cacheConn != null) { - - m_describeCache = cacheConn.getDescribeCache( - store.getNamespace(), store.getTimestamp()); - - } - - } +// if (m_describeCache == null) { +// AbstractTripleStore store = m_repo.getDatabase(); +// final QueryEngine queryEngine = QueryEngineFactory.getStandaloneQueryController((Journal) store.getIndexManager()); +// +// final ICacheConnection cacheConn = CacheConnectionFactory +// .getExistingCacheConnection(queryEngine); +// +// if (cacheConn != null) { +// +// m_describeCache = cacheConn.getDescribeCache( +// store.getNamespace(), store.getTimestamp()); +// +// } +// +// } /* * At present the DESCRIBE query will simply return a set of statements

This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |