From: <zk...@us...> - 2009-04-06 22:22:45
|
Revision: 635 http://pyphant.svn.sourceforge.net/pyphant/?rev=635&view=rev Author: zklaus Date: 2009-04-06 22:22:39 +0000 (Mon, 06 Apr 2009) Log Message: ----------- Merge branch 'master' into svn-trunk * master: Bugfix: Typo in TestParam.py Enh: Refactoring to improve handling of recipe parts. Modified Paths: -------------- trunk/src/pyphant/pyphant/cli/createExecutionOrder.py trunk/src/pyphant/pyphant/cli/pyphant_exec.py trunk/src/pyphant/pyphant/core/CompositeWorker.py trunk/src/pyphant/pyphant/core/PyTablesPersister.py trunk/src/pyphant/pyphant/core/Worker.py trunk/src/pyphant/pyphant/tests/TestParam.py Modified: trunk/src/pyphant/pyphant/cli/createExecutionOrder.py =================================================================== --- trunk/src/pyphant/pyphant/cli/createExecutionOrder.py 2009-04-06 22:06:52 UTC (rev 634) +++ trunk/src/pyphant/pyphant/cli/createExecutionOrder.py 2009-04-06 22:22:39 UTC (rev 635) @@ -14,17 +14,14 @@ parser = optparse.OptionParser() recipeFile = argv[0] recipe = PyTablesPersister.loadRecipe(h5) - plugs = sum([w.getPlugs() for w in recipe.getWorkers()], []) + plugs = recipe.getAllPlugs() for p in plugs: parser.add_option("","--%s"%p.id, help="Request result of plug %s"%p.id, action="append_const", const=p, dest="requestedResults") (options, args) = parser.parse_args(filter(lambda o: 'plug' in o, argv)) if hasattr(options, "requestedResults") and options.requestedResults and len(options.requestedResults)>0: - walker = recipe.createCompositeWorkerWalker() - openSockets = sum([sum(walker.visit(lambda w: [s for s in w.getSockets() if not s.isFull()], - [p.worker]), []) - for p in options.requestedResults], []) + openSockets = sum([recipe.getOpenSocketsForPlug(p) for p in options.requestedResults], []) parser.values.socketMap = {} def optCallback(opt, opt_str, value, parser, socket): try: @@ -39,9 +36,10 @@ callback_args=(s,)) if help: args.append('-h') + parser.add_option("-n", "--max-orders-per-file", help="Maximal number of orders per file", type="int", dest="maxOrders") (options, args) = parser.parse_args(argv) # Order ::= (socketMap, resultPlug) - return [(options.socketMap, [p.id for p in options.requestedResults])] + return (options, args, recipe, [(options.socketMap, [p.id for p in options.requestedResults])]) from ImageProcessing.ImageLoaderWorker import ImageLoaderWorker def f2dc(f): @@ -52,7 +50,9 @@ return res import glob -def globOrder(h5, order): +from pyphant.core.KnowledgeManager import KnowledgeManager +km = KnowledgeManager.getInstance() +def globOrder(order, h5): sockMap = {} for sSpec in order[0].iteritems(): if sSpec[1].startswith('emd5://'): @@ -61,28 +61,59 @@ ids = [] for f in glob.iglob(sSpec[1]): dc = f2dc(f) - PyTablesPersister.saveResult(dc, h5) + if h5 != None: + PyTablesPersister.saveResult(dc, h5) + km.registerDataContainer(dc) ids.append(dc.id) sockMap[sSpec[0]] = ids return (sockMap, order[1]) def main(): import tables, copy - h5 = tables.openFile(sys.argv[1], 'r+') - orders = processArgs(h5) + sourcefile = sys.argv[1] + h5 = tables.openFile(sourcefile, 'r+') + options, args, recipe, orders = processArgs(h5) + h5.close() + import os.path + km.registerURL("file://"+os.path.realpath(sourcefile)) + orderLists = [] for order in orders: - order = globOrder(h5, order) + order = globOrder(order, None) singles = dict([(socket, id[0]) for socket, id in order[0].iteritems() if len(id)==1]) lists = dict([(socket, id) for socket, id in order[0].iteritems() if len(id)>1]) lens = [len(l) for l in lists.itervalues()] assert max(lens) == min(lens), "Illegal lengths." count = lens[0] + orderList = [] for i in xrange(count): sockMap = copy.deepcopy(singles) for s, l in lists.iteritems(): sockMap[s] = l[i] - PyTablesPersister.saveExecutionOrder(h5, (sockMap, order[1])) - h5.close() + orderList.append((sockMap, order[1])) + orderLists.append(orderList) + orders = sum(orderLists,[]) + if options.maxOrders == None: + count = len(orders) + else: + import math + count = options.maxOrders + i=0 + orderLists = [] + while i+count<len(orders): + orderLists.append(orders[i:i+count]) + i+=count + orderLists.append(orders[i:]) + for i, orderList in enumerate(orderLists): + filename = os.path.basename(sourcefile)[:-3]+'_%i.h5'%i + PyTablesPersister.saveRecipeToHDF5File(recipe, filename) + h5 = tables.openFile(filename, 'r+') + for o in orderList: + PyTablesPersister.saveExecutionOrder(h5, o) + for data in o[0].values(): + dc = km.getDataContainer(data) + PyTablesPersister.saveResult(dc, h5) + h5.close() + #PyTablesPersister.saveExecutionOrder(h5, o) if __name__=='__main__': Modified: trunk/src/pyphant/pyphant/cli/pyphant_exec.py =================================================================== --- trunk/src/pyphant/pyphant/cli/pyphant_exec.py 2009-04-06 22:06:52 UTC (rev 634) +++ trunk/src/pyphant/pyphant/cli/pyphant_exec.py 2009-04-06 22:22:39 UTC (rev 635) @@ -29,9 +29,15 @@ # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import pkg_resources -pkg_resources.require("pyphant") +#import pkg_resources +#pkg_resources.require("pyphant") +import pyphant +import ImageProcessing +import ImageProcessing.ImageLoaderWorker +import ImageProcessing.InvertWorker +import PIL.Image +import tools def main(): import sys @@ -45,6 +51,7 @@ from pyphant.core import PyTablesPersister recipe = PyTablesPersister.loadRecipe(h5) executionOrders = PyTablesPersister.loadExecutionOrders(h5) + h5.close() from tools import Emd5Src for order in executionOrders: for socket, emd5 in order[0].iteritems(): @@ -61,8 +68,9 @@ plug = getattr(d, pSpec[1]) res = plug.getResult() res.seal() + h5 = tables.openFile(filename, 'r+') PyTablesPersister.saveResult(res, h5) - h5.close() + h5.close() if __name__ == '__main__': main() Modified: trunk/src/pyphant/pyphant/core/CompositeWorker.py =================================================================== --- trunk/src/pyphant/pyphant/core/CompositeWorker.py 2009-04-06 22:06:52 UTC (rev 634) +++ trunk/src/pyphant/pyphant/core/CompositeWorker.py 2009-04-06 22:22:39 UTC (rev 635) @@ -137,6 +137,21 @@ raise ValueError, "Recipe does not contain Worker %s" % desiredWorker return result + def findConnectorForId(self, id): + splittedId = id.split('.',1) + if len(splittedId)==1: + return super(CompositeWorker, self).findConnectorForId(splittedId[0]) + else: + w = self.getWorker(splittedId[0]) + return w.findConnectorForId(splittedId[1]) + + def getAllPlugs(self): + return sum([w.getPlugs() for w in self.getWorkers()], []) + + def getOpenSocketsForPlug(self, plug): + walker = self.createCompositeWorkerWalker() + return sum(walker.visit(lambda w: [s for s in w.getSockets() if not s.isFull()], [plug.worker]), []) + def getSources(self): return self._sources Modified: trunk/src/pyphant/pyphant/core/PyTablesPersister.py =================================================================== --- trunk/src/pyphant/pyphant/core/PyTablesPersister.py 2009-04-06 22:06:52 UTC (rev 634) +++ trunk/src/pyphant/pyphant/core/PyTablesPersister.py 2009-04-06 22:22:39 UTC (rev 635) @@ -350,3 +350,7 @@ resultPlug = orderGroup._v_attrs.resultPlug orders.append((socketMap, resultPlug)) return orders + +def pruneResults(h5): + h5.removeNode("/results", recursive=True) + h5.createGroup("/", "results") Modified: trunk/src/pyphant/pyphant/core/Worker.py =================================================================== --- trunk/src/pyphant/pyphant/core/Worker.py 2009-04-06 22:06:52 UTC (rev 634) +++ trunk/src/pyphant/pyphant/core/Worker.py 2009-04-06 22:22:39 UTC (rev 635) @@ -196,6 +196,12 @@ def getPlugs(self): return self._plugs.values() + def findConnectorForId(self, id): + splittedId = id.split('.',1) + if len(splittedId)==1: + return getattr(self, splittedId[0]) + raise ValueError, "Illegal connector id <%s>"%id + def connectorsExternalizationStateChanged(self, connector): if self.parent: self.parent.workersConnectorStateChanged(self,connector) Modified: trunk/src/pyphant/pyphant/tests/TestParam.py =================================================================== --- trunk/src/pyphant/pyphant/tests/TestParam.py 2009-04-06 22:06:52 UTC (rev 634) +++ trunk/src/pyphant/pyphant/tests/TestParam.py 2009-04-06 22:22:39 UTC (rev 635) @@ -50,7 +50,7 @@ def setUp(self): self.worker = ImageProcessing.ThresholdingWorker.ThresholdingWorker() - self.worker.registerParamListener(self.vetoer, 'name', ParamChangeRequested) + self.worker.registerParamListener(self.vetoer, 'name', Param.ParamChangeRequested) def vetoer(self, event): if event.newValue == 'bad': This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |