Over the past few weeks I've been working on a service in Python that I'm calling, in the tradition of naming projects after characters in Flash Gordon, Zarkov. So what exactly is Zarkov? Well, Zarkov is many things (and may grow to more):
In my previous post, I discussed Zarkov as an event logger. While this may be useful (say for logging to a central location from several different servers), there's a bit more to Zarkov. Today I'll focus on the map-reduce framework provided by Zarkov. If you want instructions on setting up Zarkov or getting events into it, please see my previous post.
Good question. There are currently two big frameworks that I'm aware of: Hadoop and Disco. Hadoop is a framework written in Java that includes all sorts of goodness to make sure that your larger-than-life job gets executed relatively efficiently on a large cluster. Disco has the same kinds of things. Both of them require a good bit of setup to get running. Both of them provide a distributed filesystem that they really like to get jobs from. Neither has tight MongoDB integration.
I wanted something smaller. Something easy to configure, with tight MongoDB integration. "But wait," I hear you say, "MongoDB has built-in map-reduce!" In fact, Zarkov began life as a front-end for MongoDB's own map-reduce. Unfortunately, there are a few problems with MongoDB's built-in mapReduce command:
So I set out to write my own...
Getting the map-reduce framework running is actually pretty simple:
(zarkov) $ # Create the default directory in which zmr stores its temp files
(zarkov) $ mkdir /tmp/zmr
(zarkov) $ zcmd -y development.yaml serve
Yes, that's the same command you use to run the event logger. The development.yaml file specifies how many "local" worker processes to keep running, and these start automatically along with the router. For more complex setups, or to run the zmr framework without the Zarkov event logger, you can start the pieces separately:
(zarkov) $ # Start "router" -- this will also start local worker procs
(zarkov) $ zcmd -y development.yaml zmr-router &
Then on your worker nodes:
(zarkov) $ zcmd zmr-worker tcp://router-ip:5555 &
Note that you don't need to specify a config file for the worker process; workers connect to the router to retrieve their configuration. Do make sure you start all your workers before you start a job, so that the work is load-balanced correctly across them.
Running a job is straightforward: connect to the zmr request port on the router (5555 by default) using a ZeroMQ REQ socket and submit a BSON message of the following format:
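As a rough sketch of what submitting a job from Python might look like, assuming pyzmq and the `bson` module that ships with PyMongo (3.9 or later for `bson.encode`), here is a REQ client. The request field names (`database`, `input`, `output`, `source`) and the idea of sending the map/reduce source as a string are hypothetical placeholders, not zmr's documented message format; substitute whatever the router actually expects. The embedded source is the 'foo'-counting map and reduce described below.

```python
import textwrap

# The 'foo'-counting map/reduce from this post, as source text.
MAP_REDUCE_SOURCE = textwrap.dedent('''\
    def map(objects):
        for obj in objects:
            yield obj['foo'], 1

    def reduce(key, values):
        return sum(values)
''')


def build_job_request(database, input_collection, output_collection, source):
    # HYPOTHETICAL field names: replace them with the ones zmr really uses.
    return {
        'database': database,
        'input': input_collection,
        'output': output_collection,
        'source': source,
    }


def submit_job(request, router_uri='tcp://localhost:5555'):
    # Third-party dependencies, imported lazily: pyzmq and PyMongo's bson.
    import bson
    import zmq

    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.REQ)
    try:
        sock.connect(router_uri)
        # A REQ socket must strictly alternate send() and recv().
        sock.send(bson.encode(request))
        # The reply format isn't described here, so return the raw bytes.
        return sock.recv()
    finally:
        sock.close()
```

Usage would be something like `submit_job(build_job_request('mydb', 'events', 'counts', MAP_REDUCE_SOURCE))`, with all names again hypothetical.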
The map function has the following format:
def map(objects):
    for obj in objects:
        # do_something_to stands in for your own per-object logic
        new_key, value = do_something_to(obj)
        yield new_key, value
Basically, map is a generator that takes a sequence of dicts (the input documents) and yields zero or more (key, value) pairs. The reduce function has the following format:
def reduce(key, values):
    return one_value_from_many(values)
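To make "zero or more pairs" concrete, here is a small standalone sketch that can be run without Zarkov. The `tags` field is a hypothetical example field; map yields nothing for an object without tags and one pair per tag otherwise, while reduce collapses the list of values emitted for a single key.

```python
# Same function names as the formats above; note that map shadows
# Python's built-in map() within this module.
def map(objects):
    for obj in objects:
        # No 'tags' means no pairs; N tags means N pairs.
        for tag in obj.get('tags', []):
            yield tag, 1


def reduce(key, values):
    # Collapse every value emitted for this key into one value.
    return sum(values)


docs = [{'tags': ['a', 'b']}, {}, {'tags': ['a']}]
print(list(map(docs)))      # [('a', 1), ('b', 1), ('a', 1)]
print(reduce('a', [1, 1]))  # 2
```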
For instance, to count the number of objects grouped by the field 'foo', you could do the following:
def map(objects):
    for obj in objects:
        yield obj['foo'], 1

def reduce(key, values):
    return sum(values)
The result would be a collection whose objects have _id fields equal to the 'foo' values from the input collection and 'value' fields equal to the number of input objects with that 'foo' value.
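To see the shape of that result without a running cluster, here is a single-process sketch that applies the counting map and reduce above, groups the mapped values by key in between, and emits one `{'_id': key, 'value': ...}` document per key. `run_locally` is a hypothetical helper for illustration only; it is not part of Zarkov and ignores distribution, load balancing, and writing to MongoDB.

```python
from collections import defaultdict


def map(objects):
    for obj in objects:
        yield obj['foo'], 1


def reduce(key, values):
    return sum(values)


def run_locally(map_fn, reduce_fn, objects):
    """Hypothetical single-process driver: map, group by key, reduce."""
    groups = defaultdict(list)
    for key, value in map_fn(objects):
        groups[key].append(value)
    # One output document per distinct key, shaped like the result collection.
    return [{'_id': key, 'value': reduce_fn(key, values)}
            for key, values in groups.items()]


docs = [{'foo': 'x'}, {'foo': 'y'}, {'foo': 'x'}]
print(run_locally(map, reduce, docs))
# [{'_id': 'x', 'value': 2}, {'_id': 'y', 'value': 1}]
```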
Anonymous
I always get an error from the zmr-router when I try to send a request:
2012-02-24 10:34:05,447 INFO [zarkov.command.zmr-23422] zmr-router startup
2012-02-24 10:34:09,838 ERROR [zarkov.zmr.router-23422] Error in request handler
Traceback (most recent call last):
File "/home/hgmoll/src/zarkov/lib/python2.7/site-packages/zarkov/zmr/router.py", line 73, in request_handler
obj = util.recv_bson(sock)
File "/home/hgmoll/src/zarkov/lib/python2.7/site-packages/zarkov/util.py", line 38, in recv_bson
msg = sock.recv()
File "/home/hgmoll/src/zarkov/lib/python2.7/site-packages/gevent_zeromq/core.py", line 122, in recv
return super(_Socket, self).recv(flags, copy, track)
File "socket.pyx", line 616, in zmq.core.socket.Socket.recv (zmq/core/socket.c:5961)
File "socket.pyx", line 650, in zmq.core.socket.Socket.recv (zmq/core/socket.c:5832)
File "socket.pyx", line 119, in zmq.core.socket._recv_copy (zmq/core/socket.c:1669)
ZMQError: Operation cannot be accomplished in current state
This is the C code I use to send the job request. I assume something is still wrong, but I don't understand why even 'sock.recv' fails.
Currently I find it rather hard to set up your system; the documentation seems to be a bit outdated. Starting the workers by
fails with the message
Currently I start the workers using the same config file as for the router:
It would be extremely helpful if you could outline the map/reduce setup in more detail:
Well, it isn't that easy, at least for me ;-) What about a small Python script that demonstrates job creation and job handling?