
Extending Zarkov's Map/Reduce

Since I've been working on Zarkov, I've been writing a few map/reduce jobs. One of the things I noticed about map/reduce, at least as I've implemented it in Zarkov, is that it's pretty inefficient if you want to generate several aggregate views of the same data (say, an event stream). In order to meet Zarkov's performance requirements without making my operations team too angry, I decided to "extend" map/reduce as it's implemented in Zarkov. I'm not terribly creative, so I called this command xmapreduce. Here I'll briefly describe map/reduce, show the problem with it, and explain the solution implemented in Zarkov.

Map/Reduce

The map/reduce algorithm is nice because a) it's extremely scalable and b) it's pretty easy to grasp the basics. The idea is that you write two functions, map and reduce, which are applied to a large dataset to get yourself some interesting data. For our purposes here, I'll illustrate using Zarkov map/reduce.

Let's say you want to see how many timestamped objects in some collection exist for each date. Here's how you'd write your Zarkov map/reduce functions:

def map(objects):
    for object in objects:
        yield object['timestamp'].date(), 1

def reduce(key, values):
    return sum(values)

The map/reduce framework will take these functions and do basically the following, except efficiently distributed over several workers:

import operator, itertools

def map_reduce(input_collection, query, output_collection, map, reduce):
    objects = input_collection.find(query)
    map_results = list(map(objects))
    map_results.sort(key=operator.itemgetter(0))
    for key, kv_pairs in itertools.groupby(map_results, operator.itemgetter(0)):
        value = reduce(key, [ v for k,v in kv_pairs ])
        output_collection.save({"_id":key, "value":value})

And that's pretty much all there is to it (conceptually).
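To make this concrete, here's a minimal single-process sketch that runs the date-counting job over a plain Python list. The names run_map_reduce, map_dates, reduce_sum, and the sample events are made up for illustration; a dict stands in for the output collection, and nothing here is Zarkov's actual API.

```python
import datetime
import itertools
import operator

def map_dates(objects):
    # Same logic as the map function above: emit (date, 1) per object.
    for obj in objects:
        yield obj['timestamp'].date(), 1

def reduce_sum(key, values):
    return sum(values)

def run_map_reduce(objects, map_fn, reduce_fn):
    """Hypothetical stand-in for the framework: sort, group by key, reduce."""
    map_results = sorted(map_fn(objects), key=operator.itemgetter(0))
    output = {}  # plays the role of the output collection
    for key, kv_pairs in itertools.groupby(map_results, operator.itemgetter(0)):
        output[key] = reduce_fn(key, [v for _, v in kv_pairs])
    return output

# Made-up sample data: two events on one day, one on the next.
events = [
    {'timestamp': datetime.datetime(2011, 7, 25, 9, 30)},
    {'timestamp': datetime.datetime(2011, 7, 25, 17, 5)},
    {'timestamp': datetime.datetime(2011, 7, 26, 8, 0)},
]

by_date = run_map_reduce(events, map_dates, reduce_sum)
for day, count in sorted(by_date.items()):
    print(day, count)
```

The sort before itertools.groupby matters: groupby only merges adjacent items with equal keys, so unsorted map output would produce several partial groups for the same date.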

The Problem with Map/Reduce

The problem arises when you have several output collections that all depend on a single input collection (or even depend on the same documents in the input collection). Map/reduce can get you there, but it'll execute the query once for each output collection, and it needs to distribute the data to the map() functions once for every output collection. This isn't too efficient.

Let's extend our example above by tracking the date, month, and year of each object. In classic map/reduce, we'd need three map functions:

def map_date(objects):
    for object in objects:
        yield object['timestamp'].date(), 1

def map_month(objects):
    for object in objects:
        yield object['timestamp'].date().replace(day=1), 1

def map_year(objects):
    for object in objects:
        yield object['timestamp'].date().replace(month=1,day=1), 1

def reduce(key, values):
    return sum(values)

Now, if we treat these three jobs separately, we get a lot of duplicated effort. Particularly, Zarkov must query the input collection three times and transfer the data to map workers three times. (There's also duplicate serialization/deserialization of the input objects, though the Python bson library is quite fast.)
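The duplicated work is easy to see in a toy setting. The sketch below uses a hypothetical CountingCollection (an in-memory stand-in, not a real MongoDB or Zarkov class) that records how many times it is queried and how many documents it hands out, then runs the three jobs one after another. The count_by_key helper is also an assumption; it just collapses the sort/group/reduce step for a sum.

```python
import datetime

class CountingCollection:
    """Hypothetical in-memory collection that records how often it is scanned."""
    def __init__(self, docs):
        self.docs = list(docs)
        self.find_calls = 0
        self.docs_read = 0

    def find(self, query=None):
        # The query is ignored in this sketch; every call scans everything.
        self.find_calls += 1
        self.docs_read += len(self.docs)
        return iter(self.docs)

def map_date(objects):
    for obj in objects:
        yield obj['timestamp'].date(), 1

def map_month(objects):
    for obj in objects:
        yield obj['timestamp'].date().replace(day=1), 1

def map_year(objects):
    for obj in objects:
        yield obj['timestamp'].date().replace(month=1, day=1), 1

def count_by_key(pairs):
    # Shortcut for "group by key, then sum the values".
    counts = {}
    for key, value in pairs:
        counts[key] = counts.get(key, 0) + value
    return counts

events = CountingCollection([
    {'timestamp': datetime.datetime(2011, 7, 25, 9, 30)},
    {'timestamp': datetime.datetime(2011, 7, 26, 8, 0)},
    {'timestamp': datetime.datetime(2011, 8, 1, 12, 0)},
])

# Three separate jobs: each one issues its own query against the input.
results = {}
for name, map_fn in [('by_date', map_date),
                     ('by_month', map_month),
                     ('by_year', map_year)]:
    results[name] = count_by_key(map_fn(events.find()))

print(events.find_calls, events.docs_read)
```

With three documents and three jobs, the collection is queried three times and nine documents are read, even though every job looks at exactly the same input.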

XMapReduce to the Rescue

The solution Zarkov uses for this case is to allow the map function to return a target collection along with the key and value. With xmapreduce, we combine the three map functions above into one xmap function and tweak our reduce function just a bit to take the collection as an input:

def xmap(objects):
    for object in objects:
        yield { 
            'c':'by_date', 
            'k':object['timestamp'].date(),
            'v':1 }
        yield { 
            'c':'by_month', 
            'k':object['timestamp'].date().replace(day=1),
            'v':1 }
        yield { 
            'c':'by_year', 
            'k':object['timestamp'].date().replace(day=1, month=1),
            'v':1 }

def xreduce(collection, key, values):
    return sum(values)

The xmapreduce algorithm is just a slight bit more complex than the map/reduce algorithm:

import operator, itertools

def xmap_reduce(input_collection, query, output_db, map, reduce):
    objects = input_collection.find(query)
    map_results = list(map(objects))
    keyfunc = lambda doc:(doc['c'], doc['k'])
    map_results.sort(key=keyfunc)
    for (c,k), docs in itertools.groupby(map_results, keyfunc):
        value = reduce(c,k, [ doc['v'] for doc in docs ])
        output_db[c].save({"_id":k, "value":value})
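To try the algorithm end to end without a database, here's a minimal single-process sketch. The name run_xmap_reduce and the sample events are made up for illustration, and a dict of dicts stands in for the output database, so this is an approximation of the idea rather than Zarkov's implementation.

```python
import collections
import datetime
import itertools

def xmap(objects):
    for obj in objects:
        day = obj['timestamp'].date()
        yield {'c': 'by_date', 'k': day, 'v': 1}
        yield {'c': 'by_month', 'k': day.replace(day=1), 'v': 1}
        yield {'c': 'by_year', 'k': day.replace(month=1, day=1), 'v': 1}

def xreduce(collection, key, values):
    return sum(values)

def run_xmap_reduce(objects, map_fn, reduce_fn):
    """Hypothetical stand-in for the framework: one pass over the input."""
    keyfunc = lambda doc: (doc['c'], doc['k'])
    map_results = sorted(map_fn(objects), key=keyfunc)
    # Maps collection name -> {key: reduced value}.
    output_db = collections.defaultdict(dict)
    for (c, k), docs in itertools.groupby(map_results, keyfunc):
        output_db[c][k] = reduce_fn(c, k, [doc['v'] for doc in docs])
    return output_db

# Made-up sample data spanning two days in July and one in August.
events = [
    {'timestamp': datetime.datetime(2011, 7, 25, 9, 30)},
    {'timestamp': datetime.datetime(2011, 7, 25, 17, 5)},
    {'timestamp': datetime.datetime(2011, 7, 26, 8, 0)},
    {'timestamp': datetime.datetime(2011, 8, 1, 12, 0)},
]

output_db = run_xmap_reduce(events, xmap, xreduce)
for name in sorted(output_db):
    print(name, sorted(output_db[name].items()))
```

Grouping on the (collection, key) pair is what keeps the three aggregates apart: the month bucket for August 1 and the date bucket for August 1 share the same key value but land in different collections.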

Now, we can invoke a single job to calculate all three aggregates, with close to a 3x reduction in data transfer and a significant speedup. Assuming that a) your jobs can be combined into a single xmapreduce job and b) you find it acceptable to code the target collection in your map jobs, xmapreduce should give you a significant speedup over classic map/reduce. In our case, we started with 12 map/reduce jobs and ended up with 4 xmapreduce jobs, with a tremendous speedup, but of course your mileage may vary. Happy hacking!

Posted by Rick Copeland ☕ 2011-07-26
