Ticket #475 (closed enhancement: fixed)

Opened 16 months ago

Last modified 15 months ago

Optimize serialization for query messages on cluster

Reported by: thompsonbry Owned by: thompsonbry
Priority: major Milestone: Query
Component: Bigdata Federation Version: BIGDATA_RELEASE_1_1_0
Keywords: Cc:

Description (last modified by thompsonbry)

Much of the query overhead on the cluster comes from RMI serialization of the query messages. Optimize the serialization of the StartOpMessage, HaltOpMessage, and IChunkMessage. For the messages which are not already pure interfaces (startOp(), haltOp()), turn the method signatures into interface-only signatures so we can forward version the API.

There is also significant overhead associated with IQueryPeer#getServiceUUID(). That overhead comes from calling getServiceUUID() on the proxy object, which is turned into an RMI. This could be fixed by (a) using a smart proxy pattern; (b) sending only the UUID of the service rather than its proxy and resolving the proxy against the local cache of known services; or (c) casting to the appropriate interface so we can obtain the ServiceID from the proxy and then converting that directly into the UUID of the service.
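A minimal sketch of option (b), assuming a hypothetical QueryPeer interface and PeerCache class (neither is part of the bigdata API): the message carries only the service UUID, and the receiver resolves it against a local map, so no remote call is needed to learn the UUID.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the remote query peer proxy. */
interface QueryPeer {
    void bufferReady(String chunkDescription);
}

/** Illustrative local cache of known services, keyed by service UUID. */
final class PeerCache {
    private final Map<UUID, QueryPeer> peers = new ConcurrentHashMap<>();

    /** Record a proxy once, e.g. when the service is discovered. */
    void register(UUID serviceId, QueryPeer proxy) {
        peers.put(serviceId, proxy);
    }

    /** Resolve a UUID carried in a message to a proxy without an RMI. */
    QueryPeer resolve(UUID serviceId) {
        final QueryPeer proxy = peers.get(serviceId);
        if (proxy == null)
            throw new IllegalStateException("Unknown service: " + serviceId);
        return proxy;
    }
}
```

The trade-off is that every service which may be addressed this way has to be discoverable locally, which is the constraint the change history runs into for the query controller.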

I have broken the IChunkMessage#getQueryController() aspect out into its own issue [1]. A related issue is vectoring the messages per host in order to reduce the #of messages which are being sent [2].

[1] https://sourceforge.net/apps/trac/bigdata/ticket/487 (The query controller should be discoverable)

[2] https://sourceforge.net/apps/trac/bigdata/ticket/488 (Vector query engine messages)

Change History

Changed 16 months ago by thompsonbry

  • status changed from new to accepted

A variety of changes designed to improve RMI message serialization, reduce RMI calls (especially to obtain the UUID of the query controller), and weed out the IAsynchronousIterator API from the query engine (it is a holdover from the older distributed pipeline join architecture which was refactored to create the current QueryEngine and the ChunkedRunningQuery).

Passing all BOP and SPARQL test suites.

- IChunkMessage#getSolutionCount() : new method.

- IChunkMessage#getQueryControllerId() : new method to avoid RMIs.

- Clean out QueryEngine#eval(). It is no longer possible to pass in an
  IAsynchronousIterator. This is one step towards simplification of
  the PipelineOp and processing of chunks.

- LocalChunkMessage : no longer owns an IAsynchronousIterator
  object. There is now some possibility that clients could double
  consume the chunk, however this is the same situation that the other
  IChunkMessage implementations already face. I have filled in the
  semantics for IChunkMessage#close() parallel to the implementation
  on the ThickChunkMessage.

- Use IChunkMessage#getQueryControllerId() rather than
  getQueryController().getQueryControllerId() whenever possible. This
  prevents RMIs in many use cases. However, we can not yet drop the
  getQueryController() method entirely.

  The IQueryController proxy is still being serialized with the
  IChunkMessage. This is because we can not discover the query
  controller using river because it is not registered as a service. We
  can find the query peers on the data service nodes easily enough
  because they are all registered with river. However, the QueryEngine
  serving as the query controller is not currently registered with
  river and hence it can not be discovered using the UUID of the query
  controller alone. Probably the right thing to do is to register the
  query controller with river so it can be discovered. We could then
  modify getQueryPeer() (or add getQueryClient(UUID)) which would hit
  the discovery cache.

- FederatedRunningQuery<init> now accepts the controllerId plus
  controller proxy. The controllerId should be resolved against a
  local cache, not provided by the client. This happens inside of
  MaterializeMessageTask#getDeclaredQuery(), which provides a factory
  for the FederatedRunningQuery.

- Replaced reliance on IAsynchronousIterator as returned by
  BOpContext#getSource(). The only class which used the
  IAsynchronousIterator API at all was PipelineJoin. PipelineJoin had
  some old code which did non-blocking reads from the source. However,
  we no longer have chunks which are being read from a remote process.
  All chunks are fully materialized before a PipelineOp begins to
  execute. Therefore there is no longer any reason to use the
  non-blocking API in PipelineJoin.

- Modified BOpUtility#toArray(itr) to close the iterator if it
  implements ICloseableIterator. (This was probably not causing
  problems because we close the source iterator in the
  ChunkedRunningQuery when the PipelineOp is done.)
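The toArray(itr) change in the last item can be pictured with the sketch below. It is not the BOpUtility code; CloseableIter and IterUtil are hypothetical stand-ins, and the result is a List rather than an array to keep the example short. The point is only that the drain closes the source in a finally block when the iterator is closeable.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for an iterator that holds resources. */
interface CloseableIter<E> extends Iterator<E> {
    void close();
}

final class IterUtil {
    /** Drain the iterator into a list, closing it afterwards if it is closeable. */
    static <E> List<E> toList(Iterator<E> itr) {
        try {
            final List<E> out = new ArrayList<>();
            while (itr.hasNext())
                out.add(itr.next());
            return out;
        } finally {
            // Runs on normal return and on failure alike.
            if (itr instanceof CloseableIter)
                ((CloseableIter<?>) itr).close();
        }
    }
}
```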

Committed revision r6010.

Changed 16 months ago by thompsonbry

Working on pure interfaces and tighter serialization for query engine messages [1].

- Remove IChunkMessage#getQueryController() and remove IQueryClient
  from the serialized representation for all IChunkMessage
  implementations. This will require registering the query controller
  with river so we can resolve the queryController UUID to the query
  controller service. (It is not registered as a service right
  now. Discovery of the peers is done by discovering the data service
  on which they are running, but the query controller is typically NOT
  running on a data service.)

- IChunkMessage (and IChunkAccessor) do not need to be generic. They
  are always parameterized as IBindingSets.

- Replace the IChunkAccessor with a plain Iterator<IBindingSet> or
  Iterator<IBindingSet[]>. That would make it possible to simplify
  many of the PipelineOp implementation classes. See my notes on
  IChunkAccessor.

- Pure interface and tight serialization for StartOpMessage and
  HaltOpMessage.

  • Declared interfaces, including an IOpMessage and an IOpLifeCycleMessage interface. The latter is extended by both IStartOpMessage and IHaltOpMessage. The IOpMessage is now a shared interface for IChunkMessage as well.
  • StartOp: dropped evaluation context and isLastPassRequested. These were available in RunState and did not need to be shipped around. In fact, RunState only exists on the query controller.
  • HaltOp: The sinkId and altSinkId fields are redundant with the BOp declaration and should be dropped from the message. However, there is some funky inheritance stuff going on (the sinkId is not always given explicitly).

See BOpUtility#getEffectiveDefaultSink(final BOp bop, final BOp p).

This method is only used inside of ChunkTask<init>(). It could be
made into a private method or the query could be made "complete"
by adding this information. Barring that, it could be moved onto
PipelineOp and exposed as getSinkId() or maybe getSinkId(boolean
resolveParent).

  • Mild optimization to BOpUtility#getParent(root,op). It now uses arity() and get(i) rather than argIterator(). This should be faster since we always have a backing array for the arguments (the operator's direct children).
  • Added reporting for the #of shards/services (depending on whether or not the operator is sharded or hash partitioned) on which an operator has started (startOn) and is running (runningOn). This corresponds more or less to the older fanIn metric which was not being reported against the new query engine.
  • Added reporting for the #of concurrent instances of the operator which are running.

Committed revision r6013.

Changed 16 months ago by thompsonbry

Tasks remaining under this issue include:

- IChunkMessage: Tight serialization for IChunkMessages. This is actually a pretty big work item since it involves efficient serialization of solution sets and there are a lot of things to explore in support of that.

- BOpStats: Do tight serialization for BOpStats? It is part of the HaltOp message, but we have a complex hierarchy for this. Also, we use direct field references for BOpStats as well as instanceof tests.

- Can operators which have a low expected latency simply send a "haltOp" with metadata to indicate that it should serve as both the startOp and haltOp for the operator task? That might let us cut the #of messages by close to 1/3 for some queries (we still have the bufferReady(IChunkMessage chunk) messages as well).

Changed 15 months ago by thompsonbry

  • type changed from defect to enhancement

The LUBM Q6 and Q14 queries are access path scans (plus RDF Value materialization) for LUBM. They cover quite a bit of the POS index. For example, (?x rdf:type Student). For U1000, it turns out that Q6 and Q14 span only 1 or 2 shards (depends on the query), but this would turn into more shards on U8000.

I've added a SHARDS request to the NSS. It seems that doing the locator scan and even taking the fast range count of (?x rdf:type Student) is pretty darn fast (less than 50 ms even when cold).

I looked into why Q6 and Q14 are so slow. These should be FusedView backed by a multi-block IO scan on the index segments. I was concerned that maybe we were not doing the multi-block IO (we are). However, it seems that the encoding of the solutions for RMI to the query controller is 67% of the time (!). Actually sending the data to the query controller is 28% of the time. Just sending chunks in parallel to the query controller is not going to help since we are spending so much time formatting the data for RMI.

Clearly the next performance hurdle is fast compact serialization of the solution sets for RMI. We did some work related to this for the Htree. For example, the Htree does not use POJO serialization (we are serializing solutions as objects for RMI right now). It encodes IV[]s. That is much faster and (potentially) we can even avoid decoding them to IBindingSets (but that would be more work on the IBindingSet API). But we also need to be able to send RDF Value objects.

I think that we now have fully inline IVs for all possible RDF Value objects. That might be one way to do this. The htree hash joins maintain a secondary index which maps the IV to the RDF Value (so a canonical mapping). We could generate and send that map, which would be another way to do it. Or combine the approaches.

If we could get the data onto the network as fast as we can read it from the disk that would be very, very nice.

The other place where we use multi-block IO is a hash join against a sharded access path. I have not tested this out yet. In theory, the solutions are materialized in htree instances (one per shard) on the nodes on the cluster on which we need to read. Once the data are fully materialized in those Htree instances, we then do a scan down each shard, joining against the Htree in memory. That should be our fastest "analytic" query join. See [1]

[1] https://sourceforge.net/apps/trac/bigdata/ticket/131

Changed 15 months ago by thompsonbry

I am looking at how to do a light weight and efficient encoding of binding sets for RMI messages on the cluster.

I have refactored the code that we use for this out of the Htree. However, the Htree builds TermId => Value and BlobIV => Value BTree indices. That works for the Htree because (a) we expect very large scale and we need to get the Values off the JVM heap; and (b) the BTree provides fast resolution if we need to look up the cached Value using the IV.

However, this does not work well for RMI. We may very well want to send a lot of data in RMI / NIO. But we can not readily send the BTree across the wire since it is composed of a bunch of nodes, leaves, a checkpoint record, etc.

I am thinking instead of sending the materialized RDF Value inline with the IV, but we need a way to mark this in the representation.

The current encoding takes an IBindingSet and formats it on an IKeyBuilder as an IV[], which of course is written onto a byte[] inside of the key builder. What I would like to do is mark somehow that the next IV decoded from the stream is the cached Value associated with the immediately previous IV decoded from the stream. That way the caller has the cached Value immediately on hand. We would only do this once - the first time we encounter any given IV having a cached Value. The receiver would build up a map from IV to Value which they could use to resolve Values as necessary.

I think that this sounds pretty good. My questions are:

1. How can I "mark" an IV as being an RDF Value which is the cached value of the previous IV?

2. Are there any fence posts for RDF Values which we can not represent using fully inline IVs?

3. Can we write a read-only IBindingSet implementation which wraps a byte[] slice and a reference to this (IV=>Value) map and provides transparent decode without inflating the solutions to Java objects?

I've got some thoughts on how to deal with (1) and (3). I think that we do have full coverage on (2), but I need to verify that.

Changed 15 months ago by thompsonbry

Based on a discussion with MikeP, it seems that we could use either fully inline IVs or the BigdataValueSerializer for the cached Value representation. My inclination is to go with the latter since that will avoid an IV => BigdataValue conversion and since we will need appropriate framing bytes anyway.

The record format might look like this:

nbound ncached bitmap+ IV[0] ... IV[nbound-1] Value[0] ... Value[ncached-1]

where nbound is the #of bindings in the binding set.

where ncached is the #of bindings in the binding set for which there is a cached RDF Value which has not already been written into a previous record. Even if the IV has a cached value, if the IV has been previously written into a record then the IV is NOT recorded in this record with a cached Value. Further, if the IV appears more than once in a given record, the cached value is only marked in the bitmap for the first such occurrence and the cached value is only written into the record once.

where bitmap is one or more bytes providing a bit map indicating which IVs are associated with cached values written into the record. Whether or not an IV has a cached value must be decided by the caller after processing the record and consulting an (IV,Value) cache which they maintain over the set of records processed to date. Cached values are written out (and the bit set) only the first time a given IV with a cached Value is observed.

where IV[n] is an IV as encoded by IVUtility.

where Value is an RDF Value serialized using the BigdataValueSerializer for the namespace of the lexicon.

The caller knows the namespace which must be used to obtain a BigdataValueFactory and BigdataValueSerializer to decode and materialize the cached values, but that information might not be accessible in the context in which we perform the decode. If not, then the namespace can be written out before the first record.

The decoder can materialize the cached values into a HashMap or HTree (as appropriate for the data scale) as the records are processed. Only one solution needs to be decoded at a time, but the decoder must maintain the (IV,Value) cache across all decoded records. There is no need to indicate the #of records, but IChunkMessage#getSolutionCount() in fact reports exactly that information.

Each solution can be turned into an IBindingSet at the time that it is decoded. If we use a standard ListBindingSet, then we need to resolve each IV against the IV cache, setting its RDF Value as a side effect before returning the IBindingSet to the caller. If we do a custom IBindingSet implementation, then the cached Value could be lazily materialized by hooking IVCache#getValue(). Either way, the life cycle of the materialized objects will be very short unless they are propagated into new solutions. Short life cycle objects entail very little heap burden.
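The record layout above (nbound, ncached, bitmap, IVs, then cached Values) can be sketched as follows. This is an illustration under simplifying assumptions, not the encoder that was committed: an IV is a plain long, a Value is a String written with writeUTF(), and the counts are fixed-width ints, whereas the real format uses IVUtility and the BigdataValueSerializer. SketchEncoder and SketchDecoder are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Illustrative encoder: IVs are longs and Values are Strings (assumptions). */
final class SketchEncoder {
    // IVs whose cached Value was already written into some record.
    private final Set<Long> written = new HashSet<>();

    /** values[i] is the cached Value for ivs[i], or null if there is none. */
    byte[] encode(long[] ivs, String[] values) {
        final int nbound = ivs.length;
        final byte[] bitmap = new byte[(nbound + 7) / 8];
        final List<String> cached = new ArrayList<>();
        for (int i = 0; i < nbound; i++) {
            // Set.add() is true only the first time an IV is seen, so repeats
            // within this record or across records are not marked again.
            if (values[i] != null && written.add(ivs[i])) {
                bitmap[i >> 3] |= (byte) (1 << (i & 7));
                cached.add(values[i]);
            }
        }
        try {
            final ByteArrayOutputStream bos = new ByteArrayOutputStream();
            final DataOutputStream out = new DataOutputStream(bos);
            out.writeInt(nbound);
            out.writeInt(cached.size()); // ncached
            out.write(bitmap);
            for (long iv : ivs) out.writeLong(iv);
            for (String v : cached) out.writeUTF(v);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

/** Illustrative decoder which keeps the (IV,Value) cache across records. */
final class SketchDecoder {
    private final Map<Long, String> cache = new HashMap<>();

    /** Returns the Value for each binding, or null where none is known. */
    String[] decode(byte[] record) {
        try {
            final DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(record));
            final int nbound = in.readInt();
            final int ncached = in.readInt();
            final byte[] bitmap = new byte[(nbound + 7) / 8];
            in.readFully(bitmap);
            final long[] ivs = new long[nbound];
            for (int i = 0; i < nbound; i++) ivs[i] = in.readLong();
            int nread = 0;
            for (int i = 0; i < nbound; i++) {
                if ((bitmap[i >> 3] & (1 << (i & 7))) != 0) {
                    cache.put(ivs[i], in.readUTF()); // Values follow in bit order
                    nread++;
                }
            }
            if (nread != ncached) throw new IOException("Corrupt record");
            final String[] out = new String[nbound];
            for (int i = 0; i < nbound; i++) out[i] = cache.get(ivs[i]);
            return out;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Records must be decoded in the order in which they were encoded, since a later record relies on Values which were only written into an earlier one.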

Changed 15 months ago by thompsonbry

- Added interfaces for encoding and decoding solution sets.

- Extended test coverage for encoding and decoding solution sets.

- Analytic hash joins were not correctly resolving the IVCache for a BlobIV (bug fix).

Committed revision r6033.

Changed 15 months ago by thompsonbry

- Performance optimization for DataOutputBuffer#writeUTF().

- Added variant of DataOutputBuffer#writeUTF() which wraps the IOException.

- Added DataInputBuffer#unpackInt()

- Removed the namespace parameter from
  BigdataValueFactory#remove(). That information should already be
  known to the BigdataValueFactory implementation class (it obtains
  the namespace from the constructor).

- Added variant on BigdataValueSerializer#serialize(...) which does
  not create the byte[] from the caller's buffer. This is more
  efficient if you are writing into the desired DataOutputBuffer
  already.

- Added code to remove() the BigdataValueFactoryImpl for the namespace
  used by the test harness in tearDown() of the IBindingSetEncoder
  test suites.

- Finished implementation and test suite for an encoder/decoder
  suitable for a fast wire format.

The next step is to integrate the new encoder/decoder into the
ThickChunkMessage.

Committed revision r6036.

Removed System.err statements from the encoder class.

Committed revision r6037.

Changed 15 months ago by thompsonbry

Modified the binding set encoder/decoder for the wire to write the
namespace of the lexicon relation into the encoded record when it
visits the first IVCache association. The namespace is used to obtain
the BigdataValueFactory and the BigdataValueSerializer. The namespace
will remain undiscovered (and uncommunicated) unless there is at least
one IVCache association in the encoded solutions.

Added a stream-oriented API to encoder/decoder.

Unit test for stream-oriented encoder/decoder.

Replaced the serialization in the ThickChunkMessage with the new wire
encoder/decoder.

Implemented Externalizable for ThickChunkMessage, which replaces its
POJO serialization. The new format is versioned.
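As a rough illustration of what a versioned Externalizable format looks like (this is not the ThickChunkMessage code; SketchChunkMessage, its fields, and the VERSION0 constant are assumptions made for the example), the writer emits a version byte first and the reader checks it before interpreting the rest of the record:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;

/** Hypothetical message with an explicit, versioned wire format. */
class SketchChunkMessage implements Externalizable {
    private static final byte VERSION0 = 0;

    private int solutionCount;
    private byte[] data;

    /** Public no-arg constructor, required for Externalizable classes. */
    public SketchChunkMessage() {
    }

    SketchChunkMessage(int solutionCount, byte[] data) {
        this.solutionCount = solutionCount;
        this.data = data;
    }

    int getSolutionCount() { return solutionCount; }

    byte[] getData() { return data; }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeByte(VERSION0); // version first so the format can evolve
        out.writeInt(solutionCount);
        out.writeInt(data.length);
        out.write(data);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        final byte version = in.readByte();
        if (version != VERSION0)
            throw new IOException("Unknown version: " + version);
        solutionCount = in.readInt();
        data = new byte[in.readInt()];
        in.readFully(data);
    }

    /** Serialize and deserialize in memory (for demonstration only). */
    static SketchChunkMessage roundTrip(SketchChunkMessage msg) {
        try {
            final ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(msg);
            }
            try (ObjectInputStream ois = new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray()))) {
                return (SketchChunkMessage) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A later format would add a new version constant and branch on it in readExternal(), so that older records remain readable.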

Adapted the FederatedQueryEngine test suite to run with
LocalChunkMessage in order to avoid an RDF dependency in the data
model. I added an annotation to the FederatedQueryEngine for this
purpose.

Committed revision r6038.

Changed 15 months ago by thompsonbry

  • description modified

The CPU time for solution set serialization on LUBM Q14 dropped from 67% to 7%. The bottleneck is now the materialization of the RDF values. That operation probably needs a bigger chunk size.

I will close this once I verify that this change has not had a negative impact on queries with small solution sets (BSBM). It should not. I expect performance to be better there as well.

Changed 15 months ago by thompsonbry

  • status changed from accepted to closed
  • resolution set to fixed

There is no negative impact on BSBM cluster performance. The initial result was equal to the best to date.

Changed 15 months ago by thompsonbry

  • status changed from closed to reopened
  • resolution fixed deleted

Reopened. I am seeing NotMaterializedExceptions in CI for the embedded federation on the TCK.

Changed 15 months ago by thompsonbry

Bug fix to IVSolutionSetEncoder and unit test for the bug. The IVCache associations were not being output correctly when some variables were bound to IVs which did not have IVCache associations while others were bound to IVs which did. This was causing much of the TCK to fail on the EmbeddedFederation in CI.

This fixes most of the TCK for the embedded federation. There is still another problem that I am looking at now.

Committed revision r6039.

Changed 15 months ago by thompsonbry

There are still test failures related to the IVSolutionSetEncoder and IVSolutionSetDecoder.

The IVSolutionSetEncoder and IVSolutionSetDecoder both maintain a HashMap whose keys are IVs and whose values are BigdataValues. The encoder writes each IV -> Value association which it has not seen into its internal cache (that is how it recognizes a new association) and it writes each new association into the encoded record format. The problem is that TermId.NULL always has a termId:=0L. This only becomes a problem with TermId#mockIV(), which is a factory for "NULL" IVs that get created and passed around during query.

Looking at TermId#equals(), I see that we have run into this before.

    public boolean equals(final Object o) {
        if (this == o)
            return true;
        if (o instanceof TermId<?>) {
            final TermId<?> t = (TermId<?>) o;
            if (this.termId == NULL || t.termId == NULL) {
                if (this.hasValue() && t.hasValue()) {
                    return this.getValue().equals(t.getValue());
                } else {
                    return false;
                }
            } else {
                return termId == t.termId;
            }
            //            return termId == ((TermId<?>) o).termId;
        }
        return false;
    }

It seems that we need to always send the IVCache's Value inline if TermId.isNull() reports true. That will prevent us from attempting to resolve "NULL" IVs against the cache in IVSolutionSetDecoder.

The code above actually looks Ok to me. It might be that the problem only arises during de-serialization since we do not have the cached Value reference on hand when the IVSolutionSetDecoder attempts to resolve it within its internal IV -> Value cache.
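The proposal above (always send the Value inline for a "NULL" IV and never resolve such an IV against the cache) can be sketched as follows. This is only an illustration of the idea, not the fix that was committed: MockIvSketch is a hypothetical name, the IV is reduced to its termId as a long, and a Value is a String.

```java
import java.util.HashMap;
import java.util.Map;

final class MockIvSketch {
    /** All mock ("NULL") IVs share this termId, so it cannot be a cache key. */
    static final long NULL = 0L;

    /**
     * Resolve the Value for an IV on the decoder side.
     *
     * @param termId      the term identifier carried by the IV
     * @param inlineValue the Value sent inline with this IV, or null if none
     * @param cache       the decoder's (IV,Value) cache for real IVs
     */
    static String resolve(long termId, String inlineValue, Map<Long, String> cache) {
        if (termId == NULL) {
            // Distinct mock IVs would collide on termId 0, so the Value must
            // travel with every occurrence and the cache is bypassed.
            if (inlineValue == null)
                throw new IllegalStateException("Mock IV without inline Value");
            return inlineValue;
        }
        if (inlineValue != null)
            cache.put(termId, inlineValue); // first sighting of a real IV
        return cache.get(termId);
    }

    public static void main(String[] args) {
        final Map<Long, String> cache = new HashMap<>();
        System.out.println(resolve(NULL, "x", cache));
        System.out.println(resolve(NULL, "y", cache));
        System.out.println(resolve(7L, "seven", cache));
        System.out.println(resolve(7L, null, cache));
    }
}
```

With this rule two different mock IVs can never overwrite each other in the cache, at the cost of resending the Value for every occurrence of a mock IV.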

Changed 15 months ago by thompsonbry

Bug fix to the IVSolutionSetEncoder. There were several edge cases with TermId.mockIV() which were not being handled. This also identified some (as yet unfixed) problems with the pre-existing IBindingSetEncoder/Decoder implementations. Those problems could have implications for HTree hash joins with computed values, since computed values are made manifest as "mock" IVs. See [1].

This also includes some changes designed to lift control over the chunkCapacity and chunkOfChunksCapacity for RDF Value materialization into the query plan annotations. See [2].

[1] https://sourceforge.net/apps/trac/bigdata/ticket/475 (Optimize serialization for query messages on cluster)
[2] https://sourceforge.net/apps/trac/bigdata/ticket/489 (Optimize RDF Value materialization performance on cluster)

Committed revision r6040.

Changed 15 months ago by thompsonbry

  • status changed from reopened to closed
  • resolution set to fixed