
#21 Feature-request: add different resource-consumption per task

Milestone: v1.0_(example)
Status: open
Owner: nobody
Labels: feature (1)
Priority: 1
Updated: 2023-04-13
Created: 2023-03-17
Creator: Mikhail T.
Private: No

Different tasks can require different amounts of different resources. For example, some work requires substantially more RAM, whereas another task may utilize multiple processing cores (such as by invoking a multi-threaded executable).

Dispy already knows how much RAM and disk space each node has, and how many processors. I am requesting an argument to cluster.submit() that would allow the submitter to specify each task's expected resource consumption.

This would allow the cluster to pick the node for each task more intelligently -- providing for a healthy mix of CPU- and RAM-intensive jobs, for example, avoiding disk-space issues, etc. The only way to do this currently is by using cluster.submit_node(), which quickly complicates the client code...
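A rough sketch of the bookkeeping that the submit_node() workaround forces onto the client today. Everything here except the final submit_node() call is hypothetical client-side code, not dispy API; the node dicts and `pick_node` helper are assumptions for illustration:

```python
# Hypothetical per-node bookkeeping the client must maintain by hand;
# only cluster.submit_node() itself is real dispy API.
def pick_node(nodes, task_needs):
    """Return the first node whose tracked free resources cover the
    task's declared needs, or None if no node qualifies."""
    for node in nodes:
        if (node['free_cpus'] >= task_needs.get('cpus', 1) and
                node['free_ram'] >= task_needs.get('ram', 0)):
            return node
    return None

nodes = [
    {'name': 'n1', 'free_cpus': 2, 'free_ram': 4 << 30},
    {'name': 'n2', 'free_cpus': 16, 'free_ram': 64 << 30},
]
node = pick_node(nodes, {'cpus': 8, 'ram': 32 << 30})
# with a real cluster, one would then call: cluster.submit_node(node, ...)
# and also remember to update the dicts when each job finishes
```

The requested submit() argument would move exactly this bookkeeping into the scheduler, where the free-resource counts are already known.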

The above would be a nice improvement by itself, but it can go even further. In addition to the three standard resource types (RAM, disk, core count), it should be possible to describe custom resource types when creating the cluster. For example, the GPUs used for some calculations: a 32-processor machine may have only 2 GPUs. Another example of a custom resource may be the very availability of a software package -- the presence of which may be required for some tasks, but not all of them.

Finally, in addition to per-node resources, it should be possible to define cluster-wide limits for things like floating software licenses or database connections. Based on this information, tasks could be scheduled out of order when the declared resource is (about to become) exhausted.
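The out-of-order idea can be sketched in a few lines of plain Python (this is not dispy code; the queue-of-dicts representation is an assumption for illustration). When the cluster-wide pool, e.g. floating licenses, cannot cover the task at the head of the queue, a later task that needs fewer (or none) is dispatched first:

```python
def next_task(queue, free_licenses):
    """Pick the first queued task whose license demand fits the pool,
    skipping (but keeping) tasks that would exceed it."""
    for i, task in enumerate(queue):
        if task.get('licenses', 0) <= free_licenses:
            return queue.pop(i)
    return None  # every queued task needs more licenses than remain

queue = [{'id': 1, 'licenses': 2}, {'id': 2, 'licenses': 0}, {'id': 3}]
picked = next_task(queue, free_licenses=1)  # task 1 needs 2 -> skipped
# task 2 is dispatched; task 1 stays queued until licenses free up
```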

Discussion

  • Giridhar Pemmasani

    This is already possible with the NodeAllocate feature. For example, if you would like to use only nodes that have 32G RAM and 16 cores, you can filter nodes with:

    import dispy

    if __name__ == '__main__':
        class AllocateNode(dispy.NodeAllocate):
            def allocate(self, cluster, ip_addr, name, cpus, avail_info=None, platform='*'):
                if cpus < 16:
                    return 0  # don't use this node
                # avail_info is None when the node doesn't have psutil
                if avail_info is None or avail_info.memory < (32 * 1024 * 1024 * 1024):
                    return 0  # don't use this node
                return cpus - 2  # reserve 2 cores for other uses

        cluster = dispy.JobCluster(compute, nodes=[AllocateNode('*')])
    

    Then dispy will call the AllocateNode.allocate method when a node is discovered. It should return the number of CPUs dispy can manage; 0 indicates not to use that node, and any other number up to cpus can be returned. In the above, allocate returns cpus - 2 if the node has enough memory and cores. This feature can also be useful if your application itself can use all the cores (e.g., a parallel computing application): you can return 1 so dispy will distribute computations and data to all the nodes, and each computation will use all the cores on a node.

    Make sure nodes have the psutil module installed; otherwise, avail_info will be None. You can also monitor memory, disk and swap usage in the web interface (this may help in understanding performance problems / debugging).

    Let me know if this works for you.

     
  • Giridhar Pemmasani

    The second type of resource check is also possible with a setup function, which runs on each node before any jobs are submitted. For example, this function can check which GPUs / packages are available, etc. In fact, using setup to load all modules is better, as it not only checks that packages are available on a node but also avoids loading modules in computations (saving memory and time); e.g.,

    import glob
    import dispy

    def node_setup(data_file):
        global tensorflow, data  # global variables are available to jobs
        import os, tensorflow
        with open(data_file, 'rb') as fd:
            data = fd.read()
        os.remove(data_file)  # the file is not needed anymore
        return 0

    def compute(n):
        # use 'data' and 'tensorflow' (no need to load)
        ...

    if __name__ == '__main__':
        node_id = 0
        dep_files = glob.glob('data_file_*')

        class AllocateNode(dispy.NodeAllocate):
            def allocate(self, cluster, ip_addr, name, cpus, avail_info=None, platform='*'):
                global node_id
                if node_id == len(dep_files):
                    return 0  # no data file left for this node
                self.setup_args = (dep_files[node_id],)
                node_id += 1
                return cpus

        cluster = dispy.JobCluster(compute, setup=node_setup, nodes=[AllocateNode('*')])
    

    Here, data_file_i is sent to a node and node_setup is called with that file. If the function returns 0, the node is used for jobs submitted later; otherwise, that node is not used for computation.

     

    Last edit: Giridhar Pemmasani 2023-03-19
    • Mikhail T. - 2023-03-30

      Your examples presume the tasks are all homogeneous -- and a node either is or is not suitable for all of them.

      That's different from the problems I'm trying to solve, which both stem from the tasks being heterogeneous. Any node in my setup can process any task -- there is no need for any node to be excluded.

      But the tasks have different resource-requirements -- something a smart scheduler can help address by, for example, mixing heavy CPU-users together with heavy RAM-users.

      Currently, when determining where to send the next task, the Dispy client looks for a node where the number of pending tasks is below the number of processors. This approach makes the following assumptions:

      1. Each task will use only one processor.
      2. The processor is the only resource a task will use considerably.

      Both of these assumptions can be incorrect. In my use case, for example, some of the tasks -- not all, but many -- will use multiple processors on a given node, by spawning a compiled multi-threaded executable. At task-submission time the expected processor count is known -- but there is no way to communicate this knowledge via the cluster.submit() call, and thus Dispy is unlikely to find a good node to execute the task.
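      The effect of assumption 1 can be shown with a tiny simulation of the current heuristic (dispatch while pending tasks < processor count). This is plain Python, not dispy code, with made-up numbers; each task here secretly spawns a 4-thread executable:

```python
def would_dispatch(pending_tasks, node_cpus):
    # current heuristic: keep sending tasks while pending count < cpus
    return len(pending_tasks) < node_cpus

node_cpus = 4
pending = []
# each task actually spawns 4 threads, but the scheduler cannot know that
for task in range(3):
    if would_dispatch(pending, node_cpus):
        pending.append({'threads': 4})

threads_in_use = sum(t['threads'] for t in pending)
# all 3 tasks land on the 4-core node: 12 threads compete for 4 cores
```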

      Other resources required by each task -- both standard ones (processors, RAM, disk) and custom (GPUs, licenses, etc.) -- should be specifiable too. And, if specified, taken into account by the scheduler.

      These limits could be hard: a node without a GPU cannot process a task that requires one. And soft: a task that would use 37 processors can still be submitted to a node with only 16 cores, but that node should not be sent any more tasks until this one completes.
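      One possible way to distinguish the two kinds of limits, sketched in plain Python (hypothetical; none of this is dispy API): a hard requirement excludes a node outright, while an oversized soft request is allowed only on an otherwise idle node:

```python
def node_accepts(node, task):
    """Hard limit: the node must offer every custom resource the task
    names.  Soft limit: an oversized CPU request is allowed only if the
    node is currently idle."""
    for res in task.get('requires', []):          # hard, e.g. 'gpu'
        if res not in node['resources']:
            return False
    if node['busy_cpus'] > 0 and task.get('cpus', 1) > node['cpus']:
        return False                              # soft: node already busy
    return True

idle16 = {'cpus': 16, 'busy_cpus': 0, 'resources': {'gpu'}}
ok_big = node_accepts(idle16, {'cpus': 37})       # idle node: allowed
ok_gpu = node_accepts({'cpus': 16, 'busy_cpus': 0, 'resources': set()},
                      {'requires': ['gpu']})      # no GPU: rejected
```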

      Now, these "hard" requirements can already be implemented in the client code by instantiating multiple shared clusters over the same node population, and then:

        if usesGPU(task):
            myGPUCluster.submit(task)
        else:
            myRegularCluster.submit(task)
      

      but this can get messy quickly with multiple different resources to consider -- because a separate cluster is necessary for each combination of requirements.

      Am I making sense?

       
  • Giridhar Pemmasani

    I will look into this and find a generic way to add resource requirements. I will post a patch to try once it is ready.

     
  • Giridhar Pemmasani

    I have emailed you an implementation of this feature.

     
    • Mikhail T. - 2023-04-13

      Thank you for such a prompt reaction. It will take me a while to get to testing it though -- for now I'm still working with Dispy-4.10.5...

       

