dispy version 2.8 has been released. This version implements the Condition
primitive to match the Condition primitive in Python's threading module
(earlier versions differed slightly and were incomplete). In addition,
AsynCoroThreadPool and AsynCoroDBCursor have been added. AsynCoroThreadPool
can be used to create a pool of threads that runs synchronous tasks which
can't be made asynchronous. AsynCoroDBCursor uses AsynCoroThreadPool to turn
synchronous database cursor operations into asynchronous ones. This works with
MySQLdb under Python 2.x and pymysql under Python 3.x.
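asyncoro's own AsynCoroThreadPool and AsynCoroDBCursor are not reproduced here. As an analogy only, the sketch below expresses the same idea with the standard library's asyncio and concurrent.futures (swapped in for asyncoro), with sqlite3 standing in for MySQLdb/pymysql: each blocking cursor call is handed to a worker thread and awaited, so the event loop itself never blocks. The AsyncCursor class is hypothetical.

```python
import asyncio
import sqlite3
from concurrent.futures import ThreadPoolExecutor


class AsyncCursor:
    """Hypothetical wrapper (not asyncoro's API): runs blocking DB-API
    cursor calls in a thread pool and exposes them as coroutines."""

    def __init__(self, conn, pool):
        self._cursor = conn.cursor()
        self._pool = pool

    async def _run(self, func, *args):
        # Hand the blocking call to a worker thread; the event loop stays free.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._pool, func, *args)

    async def execute(self, sql, params=()):
        await self._run(self._cursor.execute, sql, params)

    async def fetchall(self):
        return await self._run(self._cursor.fetchall)


async def main():
    # One worker so that calls on the single shared sqlite3 connection are serialized.
    pool = ThreadPoolExecutor(max_workers=1)
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    cur = AsyncCursor(conn, pool)
    await cur.execute("CREATE TABLE words (word TEXT, n INTEGER)")
    await cur.execute("INSERT INTO words VALUES (?, ?)", ("dispy", 2))
    await cur.execute("SELECT word, n FROM words")
    rows = await cur.fetchall()
    conn.close()
    pool.shutdown()
    return rows


if __name__ == "__main__":
    print(asyncio.run(main()))
```

A single worker thread is used because one sqlite3 connection is shared; a larger pool would need one connection per thread or some other form of serialization.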
Dispy is a flexible tool for data parallelization, but I noticed some problems
with the MapReduce implementation in the word count example in the tutorial.
It looks like, if we want to distribute the data before processing, we have to
store all of the data on every compute node, which wastes a lot of storage and
bandwidth. So I suggest exposing an interface for assigning data jobs to the
right compute nodes. Then data distribution and data locality could be
exploited.
Thanks for your post.
I am not sure I understand your concern. JobCluster takes a 'nodes' argument
that specifies which nodes can execute the jobs. Thus, as alluded to in the
word count example, specific nodes ('node1' and 'node2' in that example) are
assumed to have the data locally, so the files are not transferred. If this
doesn't address your concern, please feel free to email me.
The documentation of dispy is a bit lacking! I am guessing that setting up a
wiki may improve it. I am trying SourceForge's new interface for the asyncoro
project; if that works well, I will set up a wiki so others can contribute
easily.
BTW, I just want to add that files are transferred only if necessary. I think
the documentation for this is far from easy to understand. It is possible to
set up a transfer directory on remote nodes to specify where the files should
be. The nodes and the client then compare the files (timestamp and size) and
transfer only those that are not up to date. Further, if 'cleanup=False' is
given to the cluster, dispynode won't remove transferred files, so if they are
needed in future computations (e.g., a different run of the same computation),
they don't need to be transferred at all. These features are implemented
specifically to address the data locality issue.
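A minimal sketch of the "transfer only if not up to date" rule described above, assuming a plain local copy stands in for dispy's network transfer between client and node. needs_transfer and sync_files are hypothetical names, not dispy functions; like the description, the check compares size and modification time.

```python
import os
import shutil


def needs_transfer(src, dst):
    """Hypothetical helper: True if dst is missing or differs from src
    in size or (whole-second) modification time."""
    if not os.path.exists(dst):
        return True
    s, d = os.stat(src), os.stat(dst)
    return s.st_size != d.st_size or int(s.st_mtime) != int(d.st_mtime)


def sync_files(paths, dest_dir):
    """Copy only stale files into dest_dir; return the names actually copied."""
    os.makedirs(dest_dir, exist_ok=True)
    copied = []
    for src in paths:
        dst = os.path.join(dest_dir, os.path.basename(src))
        if needs_transfer(src, dst):
            shutil.copy2(src, dst)  # copy2 also preserves the modification time
            copied.append(os.path.basename(src))
    return copied
```

Calling sync_files twice with unchanged sources copies nothing the second time, because the earlier copies are left in place, much as 'cleanup=False' keeps transferred files on a node.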
Thank you for your response.
Sorry that I did not explain my problem clearly. My scenario is partitioning
the data and distributing it over the compute nodes (in local storage or a
DFS) with a user program, and then processing the data with dispy. Taking
word count as an example, I want to store data 'doc1' on node1, 'doc2' on
node2, etc. Although JobCluster in dispy can specify the nodes, how each job
is scheduled over these nodes is controlled by dispy itself, and the dispy
scheduler currently seems to have no data-aware feature like the Hadoop
MapReduce framework. In fact, dispy can't know which node each data partition
is on, and users may have their own scheduling strategy. So in this case, how
can I specify the node that each job is submitted to?
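One way a user program can keep track of where each partition lives is to derive placement from a deterministic partition function, used both when distributing the data and later when submitting jobs. This is a minimal sketch under that assumption; node_for and partition are hypothetical helpers, not part of dispy.

```python
import zlib


def node_for(doc_name, nodes):
    """Hypothetical helper: deterministically map a document name to one node."""
    # crc32 is stable across runs and processes, unlike the built-in hash()
    # of a str, which is randomized per process by default in Python 3.
    return nodes[zlib.crc32(doc_name.encode("utf-8")) % len(nodes)]


def partition(doc_names, nodes):
    """Group documents by the node that should store (and later process) them."""
    placement = {node: [] for node in nodes}
    for doc in doc_names:
        placement[node_for(doc, nodes)].append(doc)
    return placement
```

Because the mapping is a pure function of the document name and the node list, the distribution step and the submission step agree on placement without sharing any state.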
The framework is generic enough that you can customize it to suit specific
needs. For example, you can create one cluster per node (the extreme case) and
distribute jobs so that each node processes only the jobs submitted to its
cluster. Thus, assuming that 'doc1' is on 'node1', 'doc2' is on 'node2', etc.,
a rough sketch is:
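from dispy import JobCluster

cluster1 = JobCluster(mapper, nodes=['node1'])
cluster2 = JobCluster(mapper, nodes=['node2'])
...
# now submit the 'doc1' job to cluster1, the 'doc2' job to cluster2, etc.
job1 = cluster1.submit('doc1')
job2 = cluster2.submit('doc2')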
Of course, you can partition nodes more coarsely if appropriate. Say 'node1'
and 'node5' both have 'doc1' and 'doc8'; in that case cluster1 above could use
nodes=['node1', 'node5'] and you would submit both 'doc1' and 'doc8' to
cluster1.
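The one-cluster-per-node-group approach above can be automated when the program knows which nodes hold each document. A minimal sketch, assuming a hypothetical doc_locations mapping and a caller-supplied cluster factory; plan_clusters and submit_by_locality are illustrative names, not dispy functions.

```python
from collections import defaultdict


def plan_clusters(doc_locations):
    """Group documents by the exact set of nodes that hold them.

    doc_locations (hypothetical) maps a document name to the nodes storing it
    locally, e.g. {'doc1': ['node1', 'node5']}. Each key of the result is one
    node group, i.e. the 'nodes' list for one cluster.
    """
    groups = defaultdict(list)
    for doc, nodes in doc_locations.items():
        groups[tuple(sorted(nodes))].append(doc)
    return dict(groups)


def submit_by_locality(doc_locations, make_cluster):
    """Create one cluster per node group and submit each document to it.

    make_cluster(nodes) must return an object with a submit(doc) method.
    Returns {node_group: [job, ...]}.
    """
    jobs = {}
    for group, docs in plan_clusters(doc_locations).items():
        cluster = make_cluster(list(group))
        jobs[group] = [cluster.submit(doc) for doc in docs]
    return jobs
```

With dispy installed, the factory could be something like `lambda nodes: JobCluster(mapper, nodes=nodes)`, assuming mapper is the word count computation; for testing, any object with a submit method works.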