From: <tho...@us...> - 2012-11-29 16:49:43
|
Revision: 6741 http://bigdata.svn.sourceforge.net/bigdata/?rev=6741&view=rev Author: thompsonbry Date: 2012-11-29 16:49:35 +0000 (Thu, 29 Nov 2012) Log Message: ----------- Added optimization by Martyn for B+Tree byte[1] for the RDF Statement values to both SPOTupleSerializer and FastRDFValueCoder2. Modified the WriteCacheService to always set the fileExtent immediately before writing out the cache block to the wire or local disk. This deals with a problem where the compacting cache could have a stale fileOffset. Modified Paths: -------------- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/spo/FastRDFValueCoder2.java branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOTupleSerializer.java Added Paths: ----------- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/spo/RDFValueFactory.java Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2012-11-29 01:27:11 UTC (rev 6740) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCache.java 2012-11-29 16:49:35 UTC (rev 6741) @@ -324,7 +324,7 @@ return getClass().getSimpleName() + "{fileOffset=" + fileOffset + ",bufferOffset=" + bufferOffset + ",len=" + recordLength - + ", delete=" + deleted + "}"; + + ",delete=" + deleted + "}"; } @@ -470,7 +470,7 @@ * When <code>null</code> a buffer will be allocated for you from * the {@link DirectBufferPool}. Buffers allocated on your behalf * will be automatically released by {@link #close()}. - * @param scatteredWrites + * @param prefixWrites * <code>true</code> iff the implementation uses scattered * writes. The RW store uses scattered writes since its updates * are written to different parts of the backing file. The WORM @@ -496,9 +496,10 @@ * * @throws InterruptedException */ - public WriteCache(IBufferAccess buf, final boolean scatteredWrites, final boolean useChecksum, - final boolean isHighlyAvailable, final boolean bufferHasData, - final long fileExtent) throws InterruptedException { + public WriteCache(IBufferAccess buf, final boolean prefixWrites, + final boolean useChecksum, final boolean isHighlyAvailable, + final boolean bufferHasData, final long fileExtent) + throws InterruptedException { if (bufferHasData && buf == null) throw new IllegalArgumentException(); @@ -521,7 +522,7 @@ // this.quorumManager = quorumManager; this.useChecksum = useChecksum; - this.prefixWrites = scatteredWrites; + this.prefixWrites = prefixWrites; if (isHighlyAvailable && !bufferHasData) { // Note: No checker if buffer has data. @@ -570,7 +571,7 @@ * better with concurrency, so we should benchmark this option for * non-scattered writes as well. */ - if (scatteredWrites) { + if (prefixWrites) { recordMap = new ConcurrentSkipListMap<Long, RecordMetadata>(); } else { recordMap = new ConcurrentHashMap<Long, RecordMetadata>(indexDefaultCapacity); @@ -609,6 +610,8 @@ + "{recordCount=" + recordMap.size()// + ",firstOffset=" + firstOffset// + ",releaseBuffer=" + releaseBuffer// + + ",prefixWrites=" + prefixWrites// + + ",useChecksum=" + useChecksum// + ",bytesWritten=" + bytesWritten()// + ",bytesRemaining=" + remaining()// + ",bytesRemoved=" + m_removed// @@ -973,48 +976,48 @@ } - /** - * This method supports - * {@link #transferTo(WriteCache, WriteCache, ConcurrentMap)} and provides a - * low-level code path for copying records into <i>this</i> buffer from the - * buffer specified by the caller. - * <p> - * Note: This method is only invoked by transferTo(). We need to check its - * assumptions in more depth regarding synchronization before invoking from - * any other context. - */ - private boolean writeRaw(final long offset, final ByteBuffer bb, - final int latchedAddr) throws IllegalStateException, - InterruptedException { +// /** +// * This method supports +// * {@link #transferTo(WriteCache, WriteCache, ConcurrentMap)} and provides a +// * low-level code path for copying records into <i>this</i> buffer from the +// * buffer specified by the caller. +// * <p> +// * Note: This method is only invoked by transferTo(). We need to check its +// * assumptions in more depth regarding synchronization before invoking from +// * any other context. +// */ +// private boolean writeRaw(final long offset, final ByteBuffer bb, +// final int latchedAddr) throws IllegalStateException, +// InterruptedException { +// +// assert !m_closedForWrites; +// +// final int len = bb.limit() - bb.position(); +// +// assert len <= remaining(); +// +// final ByteBuffer tmp = acquire(); +// try { +// final int pos; +// final int prefix = (prefixWrites ? SIZEOF_PREFIX_WRITE_METADATA : 0); +// final int datalen = len - prefix; +// synchronized (tmp) { +// pos = tmp.position(); +// tmp.put(bb); +// } +// final RecordMetadata old = recordMap.put(Long.valueOf(offset), +// new RecordMetadata(offset, pos + prefix, datalen, +// latchedAddr)); +// if (old != null) { +// throw new IllegalStateException("Write already found at " +// + offset); +// } +// return true; +// } finally { +// release(); +// } +// } - assert !m_closedForWrites; - - final int len = bb.limit() - bb.position(); - - assert len <= remaining(); - - final ByteBuffer tmp = acquire(); - try { - final int pos; - final int prefix = (prefixWrites ? SIZEOF_PREFIX_WRITE_METADATA : 0); - final int datalen = len - prefix; - synchronized (tmp) { - pos = tmp.position(); - tmp.put(bb); - } - final RecordMetadata old = recordMap.put(Long.valueOf(offset), - new RecordMetadata(offset, pos + prefix, datalen, - latchedAddr)); - if (old != null) { - throw new IllegalStateException("Write already found at " - + offset); - } - return true; - } finally { - release(); - } - } - /** * {@inheritDoc} * @@ -1565,8 +1568,7 @@ * <p> * Note: <code>volatile</code> since not guarded by any lock. */ - // package private : exposed to canCompact() in subclass. - volatile int m_removed; + private volatile int m_removed; /** * Sets the performance counters to be used by the write cache. A service @@ -2048,12 +2050,13 @@ * @param serviceRecordMap * the map of the WriteCacheService that associates an address * with a WriteCache - * @param fileExtent - * the current extent of the backing file. * @throws InterruptedException */ - void resetWith(final ConcurrentMap<Long, WriteCache> serviceRecordMap, - final long fileExtent) throws InterruptedException { +// * @param fileExtent +// * the current extent of the backing file. + void resetWith(final ConcurrentMap<Long, WriteCache> serviceRecordMap +// final long fileExtentIsIgnored + ) throws InterruptedException { final Iterator<Long> entries = recordMap.keySet().iterator(); @@ -2062,7 +2065,8 @@ log.info("resetting existing WriteCache: nrecords=" + recordMap.size() + ", hashCode=" + hashCode()); while (entries.hasNext()) { - final Long addr = entries.next(); + + final Long fileOffset = entries.next(); /* * We need to guard against the possibility that the entry in @@ -2073,9 +2077,9 @@ * Using the conditional remove on ConcurrentMap guards against * this. */ - final boolean removed = serviceRecordMap.remove(addr, this); + final boolean removed = serviceRecordMap.remove(fileOffset, this); - registerWriteStatus(addr, 0, removed ? 'R' : 'L'); + registerWriteStatus(fileOffset, 0, removed ? 'R' : 'L'); } @@ -2090,7 +2094,7 @@ } reset(); // must ensure reset state even if cache already empty - setFileExtent(fileExtent); +// setFileExtent(fileExtent); } @@ -2309,54 +2313,80 @@ * guaranteeing that no writes will be applied to [src]). */ final ByteBuffer bb = src.acquire().duplicate(); + ByteBuffer dd = null; try { - - final int chklen = 0; // useChecksum ? 4 : 0; + // Setup destination + dd = dst.acquire(); + // Note: md.recordLength includes the checksum (suffix) final int prefixlen = src.prefixWrites ? SIZEOF_PREFIX_WRITE_METADATA : 0; - final int xtralen = chklen + prefixlen; final Set<Entry<Long, RecordMetadata>> es = src.recordMap.entrySet(); final Iterator<Entry<Long, RecordMetadata>> entries = es.iterator(); while (entries.hasNext()) { final Entry<Long, RecordMetadata> entry = entries.next(); - final long offset = entry.getKey(); // file offset. + final long fileOffset = entry.getKey(); // file offset. final RecordMetadata md = entry.getValue(); if (serviceRecordMap != null) { - final WriteCache tmp = serviceRecordMap.get(offset); + final WriteCache tmp = serviceRecordMap.get(fileOffset); if (tmp == null) throw new AssertionError("Not owned: offset=" - + offset + ", md=" + md); + + fileOffset + ", md=" + md); else if (tmp != src) throw new AssertionError( "Record not owned by this cache: src=" + src + ", owner=" + tmp - + ", offset=" + offset + ", md=" + + ", offset=" + fileOffset + ", md=" + md); } - - final int len = md.recordLength + xtralen; + assert !md.deleted; // not deleted (deleted entries should not be in the recordMap). + final int len = prefixlen + md.recordLength; final int dstremaining = dst.remaining(); if (len > dstremaining) { // Not enough room in destination for this record. if (dstremaining >= 512) { - // Destinaction still has room, keep looking. + // Destination still has room, keep looking. continue; } // Destination is full (or full enough). return false; } - final ByteBuffer dup = bb;//bb.duplicate(); (dup'd above). +// final ByteBuffer dup = bb;//bb.duplicate(); (dup'd above). final int pos = md.bufferOffset - prefixlen;// include prefix final int limit = pos + len; // and any postfix - dup.limit(limit); - dup.position(pos); - dst.writeRaw(offset, dup, md.latchedAddr); - - if (dst.remaining() != (dstremaining - len)) { - throw new AssertionError("dst.remaining(): " + dst.remaining() + " expected: " + dstremaining); + final int dstoff; // offset in the destination buffer. + synchronized (bb) { + bb.limit(limit); + bb.position(pos); + // dst.writeRaw(fileOffset, dup, md.latchedAddr); + + // Copy to destination. + synchronized (dd) { + dstoff = dd.position() + prefixlen; + dd.put(bb); + assert dst.remaining() == (dstremaining - len) : "dst.remaining(): " + + dst.remaining() + + " expected: " + + dstremaining; + } } - + /* + * Insert record into destination. + * + * Note: The [orderedList] on the target buffer is not + * updated because we handle the propagation of the address + * allocation/clear notices separately and synchronously + * using prepareAddressMetadataForHA(). + */ + { + final RecordMetadata old = dst.recordMap.put(Long + .valueOf(fileOffset), new RecordMetadata( + fileOffset, dstoff/* bufferOffset */, + md.recordLength, md.latchedAddr)); + + assert old == null : "Write already found: " + old; + } + if (serviceRecordMap != null) { /* * Note: As soon as we update the service record map it @@ -2364,11 +2394,10 @@ * clear the record from [dst]. We can not rely on the * record remaining in [dst] after this method call! */ - final WriteCache tmp = serviceRecordMap - .put(offset, dst); - if (tmp != src) - throw new AssertionError("tmp=" + tmp + ",src=" - + src + ", offset=" + offset + ", md=" + md); + final WriteCache tmp = serviceRecordMap.put(fileOffset, + dst); + assert src == tmp : "tmp=" + tmp + ",src=" + src + + ", offset=" + fileOffset + ", md=" + md; } // Clear entry from src recordMap. @@ -2391,6 +2420,8 @@ throw new IllegalStateException(); } } finally { + if (dd != null) + dst.release(); src.release(); } } @@ -2559,20 +2590,6 @@ } -// /** -// * Return <code>true</code> iff we are allowed to compact buffers. The -// * default implementation of the {@link WriteCache} is for a Worm and can -// * never compact. -// * <p> -// * Note: This method is package private for access by -// * {@link WriteCacheService}. -// */ -// boolean canCompact() { -// -// return false; -// -// } - /** * Return the percentage of space that has been removed through the * application of {@link #clearAddrMap(long, int)} and hence could be Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2012-11-29 01:27:11 UTC (rev 6740) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata/src/java/com/bigdata/io/writecache/WriteCacheService.java 2012-11-29 16:49:35 UTC (rev 6741) @@ -952,7 +952,7 @@ if (log.isTraceEnabled()) log.trace("Setting curCompactingCache to reserve"); - reserve.resetWith(recordMap, fileExtent.get()); + reserve.resetWith(recordMap);//, fileExtent.get()); curCompactingCache = reserve; if (log.isTraceEnabled()) log.trace("Transferring to curCompactingCache"); @@ -1142,15 +1142,25 @@ */ cache.closeForWrites(); + /* + * Test for an empty cache. + * + * Note: We can not do this until the cache has been closed for + * writes. + */ { final ByteBuffer b = cache.peek(); - if (b.position() == 0) + if (b.position() == 0) { + // Empty cache. return; + } } // increment writeCache sequence cache.setSequence(cacheSequence++); + cache.setFileExtent(fileExtent.get()); + if (quorum != null && quorum.isHighlyAvailable()) { // Verify quorum still valid and we are the leader. @@ -1832,7 +1842,13 @@ // m_dirtyListThreshold = saveDirtyListThreshold; flush = false; try { - assert compactingCache == null; + if(!halt) { + /* + * Can not check assertion if there is an existing + * exception. + */ + assert compactingCache == null; + } } finally { dirtyListLock.unlock(); } @@ -1858,7 +1874,7 @@ // Guaranteed available hence non-blocking. final WriteCache nxt = cleanList.take(); counters.get().nclean--; - nxt.resetWith(recordMap, fileExtent.get()); + nxt.resetWith(recordMap);//, fileExtent.get()); current.set(nxt); return true; } finally { @@ -1896,24 +1912,24 @@ if (fileExtent < 0L) throw new IllegalArgumentException(); - final WriteCache cache = acquireForWriter(); - - try { +// final WriteCache cache = acquireForWriter(); +// +// try { if (log.isDebugEnabled()) log.debug("Set fileExtent: " + fileExtent); // make a note of the current file extent. this.fileExtent.set(fileExtent); - // set the current file extent on the WriteCache. - cache.setFileExtent(fileExtent); +// // set the current file extent on the WriteCache. +// cache.setFileExtent(fileExtent); +// +// } finally { +// +// release(); +// +// } - } finally { - - release(); - - } - } public boolean write(final long offset, final ByteBuffer data, final int chk) @@ -2152,7 +2168,7 @@ counters.get().nclean--; // Clear the state on the new buffer and remove from // cacheService map - newBuffer.resetWith(recordMap, fileExtent.get()); + newBuffer.resetWith(recordMap);//, fileExtent.get()); // Set it as the new buffer. current.set(cache = newBuffer); @@ -2450,7 +2466,7 @@ counters.get().nclean--; // Clear state on new buffer and remove from cacheService map - newBuffer.resetWith(recordMap, fileExtent.get()); + newBuffer.resetWith(recordMap);//, fileExtent.get()); // Set it as the new buffer. current.set(newBuffer); Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2012-11-29 01:27:11 UTC (rev 6740) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/java/com/bigdata/journal/jini/ha/HAJournalServer.java 2012-11-29 16:49:35 UTC (rev 6741) @@ -954,6 +954,9 @@ @Override public Void call() throws Exception { + if (true) + throw new UnsupportedOperationException(); + // final long readLock = leader.newTx(ITx.READ_COMMITTED); try { @@ -1765,6 +1768,11 @@ throw new RuntimeException(e); + } catch (RuntimeException t) { + + // Wrap with the HA message. + throw new RuntimeException("msg=" + msg + ": " + t, t); + } } @@ -1968,7 +1976,7 @@ } setExtent(msg); - writeWriteCacheBlock(msg,data); + writeWriteCacheBlock(msg, data); } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2012-11-29 01:27:11 UTC (rev 6740) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-jini/src/test/com/bigdata/journal/jini/ha/TestHA3JournalServer.java 2012-11-29 16:49:35 UTC (rev 6741) @@ -380,9 +380,9 @@ */ for (RemoteRepository r : repos) { - // Should be empty. - assertEquals(10L, - countResults(r.prepareTupleQuery("SELECT * {?a ?b ?c} LIMIT 10") + // Should have data. + assertEquals(100L, + countResults(r.prepareTupleQuery("SELECT * {?a ?b ?c} LIMIT 100") .evaluate())); } @@ -401,7 +401,54 @@ // Verify no HALog files since fully met quorum @ commit. assertHALogNotFound(0L/* firstCommitCounter */, lastCommitCounter2, new HAGlue[] { serverA, serverB, serverC }); + + /* + * Do a "DROP ALL" and reverify that no solutions are found on each + * service. + */ + { + + // Verify quorum is still valid. + quorum.assertQuorum(token); + + repos[0].prepareUpdate("DROP ALL").evaluate(); + + } + /* + * Verify that query on all nodes is allowed and now provides an empty + * result. + */ + for (RemoteRepository r : repos) { + + // Should be empty. + assertEquals( + 0L, + countResults(r.prepareTupleQuery( + "SELECT * {?a ?b ?c} LIMIT 100").evaluate())); + + } + + // Current commit point. + final long lastCommitCounter3 = serverA + .getRootBlock(new HARootBlockRequest(null/* storeUUID */)) + .getRootBlock().getCommitCounter(); + + // There are now THREE (3) commit points. + assertEquals(3L, lastCommitCounter3); + + // Verify binary equality. + assertDigestsEquals(new HAGlue[] { serverA, serverB, serverC }); + + // Verify no HALog files since fully met quorum @ commit. + assertHALogNotFound(0L/* firstCommitCounter */, lastCommitCounter2, + new HAGlue[] { serverA, serverB, serverC }); + + /* + * TODO Continue test and verify restart? Or verify restart before we do + * the DROP ALL? + */ + } } Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/spo/FastRDFValueCoder2.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/spo/FastRDFValueCoder2.java 2012-11-29 01:27:11 UTC (rev 6740) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/spo/FastRDFValueCoder2.java 2012-11-29 16:49:35 UTC (rev 6741) @@ -174,7 +174,6 @@ * Decoder. * * @author <a href="mailto:tho...@us...">Bryan Thompson</a> - * @version $Id$ */ private static class CodedRabaImpl extends AbstractCodedRaba { @@ -331,7 +330,8 @@ } else { - return new byte[] { bits }; +// return new byte[] { bits }; + return RDFValueFactory.getValue(bits); } Added: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/spo/RDFValueFactory.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/spo/RDFValueFactory.java (rev 0) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/spo/RDFValueFactory.java 2012-11-29 16:49:35 UTC (rev 6741) @@ -0,0 +1,64 @@ +/* + +Copyright (C) SYSTAP, LLC 2006-2008. All rights reserved. + +Contact: + SYSTAP, LLC + 4501 Tower Road + Greensboro, NC 27410 + lic...@bi... + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +*/ +package com.bigdata.rdf.spo; + +/** + * Factory for the single element <code>byte[]</code> used for the value of an + * RDF Statement in one of the statement indices. + * + * @author <a href="mailto:tho...@us...">Bryan Thompson</a> + */ +public class RDFValueFactory { + + private final static byte[][] table = createStaticByteArrayTable(); + + private static byte[][] createStaticByteArrayTable() { + final byte[][] table = new byte[256][]; + + for (int i = 0; i < 256; i++) { + + table[i] = new byte[] { (byte) i }; + + } + + return table; + + } + + /** + * Return the B+Tree value for an RDF Statement given its byte value. + * + * @param i + * The byte value of the Statement. + * + * @return A byte[] whose sole element is that byte value. + */ + static public byte[] getValue(final byte i) { + + return table[i]; + + } + +} Modified: branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOTupleSerializer.java =================================================================== --- branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOTupleSerializer.java 2012-11-29 01:27:11 UTC (rev 6740) +++ branches/BIGDATA_RELEASE_1_2_0/bigdata-rdf/src/java/com/bigdata/rdf/spo/SPOTupleSerializer.java 2012-11-29 16:49:35 UTC (rev 6741) @@ -117,12 +117,12 @@ * @param keyOrder * The access path. * @param sids - * If true, attach sids to decoded SPOs where appropriate. + * If true, attach sids to decoded SPOs where appropriate. * @param leafKeySer * @param leafValSer */ public SPOTupleSerializer(final SPOKeyOrder keyOrder, - final boolean sids, + final boolean sids, final IRabaCoder leafKeySer, final IRabaCoder leafValSer) { super(new ASCIIKeyBuilderFactory(), leafKeySer, leafValSer); @@ -163,18 +163,18 @@ } - /** - * Variant duplicates the behavior of {@link #serializeVal(SPO)} to provide - * support for non-{@link SPO} {@link ISPO}s. - */ + /** + * Variant duplicates the behavior of {@link #serializeVal(SPO)} to provide + * support for non-{@link SPO} {@link ISPO}s. + */ public byte[] serializeVal(final ISPO spo) { - if (spo == null) + if (spo == null) throw new IllegalArgumentException(); - return serializeVal(//buf, - spo.isOverride(), spo.getUserFlag(), spo.getStatementType()); - + return serializeVal(//buf, + spo.isOverride(), spo.getUserFlag(), spo.getStatementType()); + } /** @@ -186,60 +186,62 @@ if (spo == null) throw new IllegalArgumentException(); - return serializeVal(//buf, - spo.isOverride(), spo.getUserFlag(), spo.getStatementType()); + return serializeVal(//buf, + spo.isOverride(), spo.getUserFlag(), spo.getStatementType()); - } + } - /** - * Return the byte[] that would be written into a statement index for this - * {@link SPO}, including the optional {@link StatementEnum#MASK_OVERRIDE} - * bit. If the statement identifier is non-null then it will be included in - * the returned byte[]. - * - * @param override - * <code>true</code> iff you want the - * {@link StatementEnum#MASK_OVERRIDE} bit set (this is only set - * when serializing values for a remote procedure that will write - * on the index, it is never set in the index itself). - * @param userFlag - * <code>true</code> iff you want the - * {@link StatementEnum#MASK_USER_FLAG} bit set. - * @param type - * The {@link StatementEnum}. - * - * @return The value that would be written into a statement index for this - * {@link SPO}. - */ + /** + * Return the byte[] that would be written into a statement index for this + * {@link SPO}, including the optional {@link StatementEnum#MASK_OVERRIDE} + * bit. If the statement identifier is non-null then it will be included in + * the returned byte[]. + * + * @param override + * <code>true</code> iff you want the + * {@link StatementEnum#MASK_OVERRIDE} bit set (this is only set + * when serializing values for a remote procedure that will write + * on the index, it is never set in the index itself). + * @param userFlag + * <code>true</code> iff you want the + * {@link StatementEnum#MASK_USER_FLAG} bit set. + * @param type + * The {@link StatementEnum}. + * + * @return The value that would be written into a statement index for this + * {@link SPO}. + */ // * @param buf // * A buffer supplied by the caller. The buffer will be reset // * before the value is written on the buffer. - public byte[] serializeVal(//final ByteArrayBuffer buf, - final boolean override, final boolean userFlag, - final StatementEnum type) { - -// buf.reset(); + public byte[] serializeVal(//final ByteArrayBuffer buf, + final boolean override, final boolean userFlag, + final StatementEnum type) { + +// buf.reset(); - // optionally set the override and user flag bits on the value. - final byte b = (byte) - (type.code() - | (override ? StatementEnum.MASK_OVERRIDE : 0x0) - | (userFlag ? StatementEnum.MASK_USER_FLAG : 0x0) - ); + // optionally set the override and user flag bits on the value. + final byte b = (byte) + (type.code() + | (override ? StatementEnum.MASK_OVERRIDE : 0x0) + | (userFlag ? StatementEnum.MASK_USER_FLAG : 0x0) + ); -// buf.putByte(b); +// buf.putByte(b); // -// final byte[] a = buf.toByteArray(); +// final byte[] a = buf.toByteArray(); // // assert a.length == 1 : "Expecting one byte, but have " // + BytesUtil.toString(a); - - return new byte[]{b}; + + return RDFValueFactory.getValue(b); - } + } + - public SPO deserialize(final ITuple tuple) { + public SPO deserialize(final ITuple tuple) { + if (tuple == null) throw new IllegalArgumentException(); @@ -271,12 +273,12 @@ } - /** - * Set the statement type, bit flags, and optional sid based on the tuple - * value. - */ + /** + * Set the statement type, bit flags, and optional sid based on the tuple + * value. + */ public ISPO decodeValue(final ISPO spo, final byte[] val) { - + final byte code = val[0]; final StatementEnum type = StatementEnum.decode(code); @@ -288,16 +290,16 @@ spo.setUserFlag(StatementEnum.isUserFlag(code)); if (sids) { - + // SIDs only valid for triples. assert keyOrder.getKeyArity() == 3; if (spo.isExplicit()) { - - spo.setStatementIdentifier(true); - + + spo.setStatementIdentifier(true); + } - + } return spo; @@ -329,10 +331,10 @@ switch (version) { case VERSION0: keyOrder = SPOKeyOrder.valueOf(in.readByte()); - /* - * New version is not backwards compatible with old journals that - * used sids. - */ + /* + * New version is not backwards compatible with old journals that + * used sids. + */ sids = false; break; case VERSION1: This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |