Limit buffer size of infinite input sequence

W.Hummer
2011-04-22
2013-06-05
  • W.Hummer
    2011-04-22

    Hi,

    We are using MXQuery to run window queries over continuous XML event streams. The following simple example returns the input events in groups of 3 items:

        Context ctx = new Context();
        CompilerOptions co = new CompilerOptions();
        co.setXquery11(true);
        XQCompiler compiler = new CompilerImpl();
        // windows of 3 items each: ($e - $s) ge 2 closes a window
        // two positions after its start
        String query = "declare variable $input external; " +
            "for sliding window $w in $input//item " +
            "start at $s when true() end at $e when ($e - $s) ge 2 " +
            "return <result>{$w}</result>";
        PreparedStatement statement = compiler.compile(ctx, query, co);
        final XDMIterator result = statement.evaluate();

        // bind the external variable $input to a FIFO stream store
        StreamStore store = ctx.getStores().createStreamStore(
                StoreFactory.SEQ_FIFO, "input-stream");
        XDMIterator wnd = store.getIterator(ctx);
        statement.addExternalResource(new QName("input"), wnd);
        StreamStoreInput input = new StreamStoreInput(store, true);

        // process window results in a separate thread
        new Thread() {
            public void run() {
                try {
                    Token token = null;
                    while ((token = result.next()) != null) {
                        // process token..
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();

        // continuously add events to the input stream
        while (true) {
            XMLSource xmlIt = XDMInputFactory.createXMLInput(ctx,
                new StringReader("<item>foo</item>"), true, Context.NO_VALIDATION,
                QueryLocation.OUTSIDE_QUERY_LOC);
            Token tok;
            while ((tok = xmlIt.next()) != Token.END_SEQUENCE_TOKEN) {
                input.bufferNext(tok);
            }
        }

    We have monitored the memory usage, and it seems that the input store (SEQ_FIFO) grows continuously. In particular, a heap dump shows that the Token array in store.content.tokenBuffer holds a huge number of tokens.

    However, in this example it would be sufficient to keep only a portion of the input items in memory and to discard the remaining items, avoiding unbounded memory growth.

    Is there a way to specify a limit for the length of the StreamStore token buffer and, when the limit is exceeded, to drop the oldest tokens from the buffer (i.e., shift the remaining tokens in the buffer array)?
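
    To make the desired semantics concrete, what we are after is essentially a bounded FIFO that evicts its oldest entries once a capacity is exceeded. A minimal plain-Java sketch (independent of the MXQuery API; the class and its names are made up purely for illustration):

        import java.util.ArrayDeque;
        import java.util.Deque;

        // Illustration only: desired drop-oldest behavior of a capped token buffer.
        class BoundedBuffer<T> {
            private final Deque<T> buffer = new ArrayDeque<T>();
            private final int capacity;

            BoundedBuffer(int capacity) {
                this.capacity = capacity;
            }

            void add(T token) {
                buffer.addLast(token);
                // once the limit is exceeded, silently discard the oldest entries
                while (buffer.size() > capacity) {
                    buffer.removeFirst();
                }
            }

            T poll() {
                return buffer.pollFirst();
            }
        }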

    Thanks in advance!

     
  • W.Hummer
    2011-04-23

    Hi Peter,

    thanks for your quick response. I was able to fix the problem with the SVN version and some additional tweaks. The items of the input sequence are properly garbage-collected with this version. However, it turned out that the result sequence (handled in the thread's run() method in the example above) buffered old window query results, so the TokenInterface buffer in one of the sub-iterators accumulated all the tokens.

    Here is what I did to fix this:

        // trim the window buffer underlying the LetIterator sub-iterator
        // (the reflective calls below throw checked exceptions, which the
        // surrounding code needs to handle or declare)
        for (XDMIterator iter : ((GFLWORIterator) result).getAllSubIters()) {
            if (iter instanceof LetIterator) {
                // read the non-public field CurrentBasedIterator.current
                Field f1 = CurrentBasedIterator.class.getDeclaredField("current");
                f1.setAccessible(true);
                WindowSequenceIterator i = (WindowSequenceIterator) f1.get(iter);
                WindowBuffer b = i.getMat();
                // read the buffer's write position (nodeI) and the number of
                // already deleted tokens (nodesDel)
                Field f2 = TokenBufferStore.class.getDeclaredField("nodeI");
                f2.setAccessible(true);
                int nodeI = (Integer) f2.get(b.getBuffer());
                Field f3 = TokenBufferStore.class.getDeclaredField("nodesDel");
                f3.setAccessible(true);
                int nodesDel = (Integer) f3.get(b.getBuffer());
                // if more than 10000 tokens are buffered, keep only
                // (roughly) the most recent 1000
                if ((nodeI - nodesDel) > 10000) {
                    System.out.println("Deleting " + (nodeI - nodesDel) + " items from buffer");
                    ((TokenBufferStore) b.getBuffer()).deleteItems(nodeI - 1000);
                }
            }
        }
    

    In other words, the trick is to manually call deleteItems(..) on the buffer underlying the LetIterator sub-iterator. To that end, we need to read some non-public fields via reflection (CurrentBasedIterator.current, TokenBufferStore.nodeI, TokenBufferStore.nodesDel). Maybe you could consider adding read-only access to these fields? ;)
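
    For illustration, if TokenBufferStore exposed such read-only accessors, the trimming logic above could collapse to something like the following (the getters getNodeI() and getNodesDel() are hypothetical, not part of the current API; deleteItems(..) already exists):

        // Hypothetical sketch: assumes TokenBufferStore gains read-only accessors.
        TokenBufferStore buf = (TokenBufferStore) b.getBuffer();
        int buffered = buf.getNodeI() - buf.getNodesDel(); // tokens currently held
        if (buffered > 10000) {
            // keep only (roughly) the most recent 1000 tokens
            buf.deleteItems(buf.getNodeI() - 1000);
        }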

    I tested the query with 1 million input events, and I can now safely say that there is no memory leak (at least for this particular query and configuration). As far as I can tell, the only limitation of this approach is the fixed numeric value for the maximum buffer size (10000 tokens); we might run into problems if a single window query result is extremely large.

    Many thanks for this free XQuery implementation and the great support!
    Regards, Waldemar

     
  • W.Hummer
    2011-04-23

    Hm, I forgot to mention that the workaround above was written for a slightly different query, which contained a "let $input1 := $input//item" clause at the beginning; this is why the code searches for a sub-iterator of type LetIterator. Without the "let" clause, the query seems to work out of the box (non-leaking, without the need for tweaks).
    Thanks