
multiple async commands, notify onend, get queued messages

2016-05-24
2016-06-01
  • Sander Pool

    Sander Pool - 2016-05-24

    Hi,

    I need to run multiple async commands, check for their completion and retrieve output. I use Python3 on linux.

    Basic structure:

    • Create my own handle
    • Start multiple async processes with returnstdout onend notify, record handles
    • For all handles:
      • check handle status with process query handle
      • if process is done (endTimestamp is not None)
        • get message using 'queue get handle <handle>'

    This does not work, I get an error 29 'This indicates that you tried to GET or PEEK a particular element in a queue, but no such element exists, or the queue is empty.'. If however I use a 'queue get' the message is there and I can retrieve the fileList etc from the resultObj.

    This suggests I am misunderstanding how 'queue get handle <handle>' works. The docs say "HANDLE specifies that you want to retrieve/remove message(s) originating from a process with the given handle number. ".

    I verified that the handle number is correct by polling handle status (works correctly) and freeing the process handle when I'm done. That also works fine.

    How do I retrieve messages sent by a known handle to my own handle? I think that is a better solution than retrieving all queued messages, recording them somewhere and associating results with the correct handles later.

    Thanks,

    Sander

    PS: partial log output showing commands and responses:

    This is what I think I should do but it fails:

    Submitting: process query handle 121
    Response: {'pid': '2951', 'staf-map-class-name': 'STAF/Service/Process/ProcessInfo', 'parms': None, 'focus': 'Background', 'workload': None, 'workdir': None, 'rc': '0', 'endTimestamp': '20160524-12:53:25', 'startTimestamp': '20160524-12:53:24', 'title': None, 'key': None, 'handleName': None, 'shell': '<Default Shell>', 'userName': None, 'command': 'ls', 'handle': '121', 'startMode': 'Async'}

    Submitting: queue get handle 121
    Response:
    E
    Submitting: process free handle 121
    Response:

    This works but won't scale to multiple handles without complications that seem unnecessary:

    Submitting: process query handle 123
    Response: {'staf-map-class-name': 'STAF/Service/Process/ProcessInfo', 'pid': '3023', 'startMode': 'Async', 'workload': None, 'endTimestamp': '20160524-13:08:09', 'startTimestamp': '20160524-13:08:08', 'title': None, 'focus': 'Background', 'handleName': None, 'shell': '<Default Shell>', 'rc': '0', 'userName': None, 'handle': '123', 'workdir': None, 'key': None, 'command': 'ls', 'parms': None}

    Submitting: queue get
    Response: {'timestamp': '20160524-13:08:09', 'handleName': 'STAF_Process', 'priority': '5', 'staf-map-class-name': 'STAF/Service/Queue/Entry', 'machine': 'local://local', 'handle': '1', 'type': 'STAF/Process/End', 'message': {'handle': '123', 'endTimestamp': '20160524-13:08:09', 'key': '', 'fileList': [{'rc': '0', 'data': 'anaconda-ks.cfg\ninstall.log\ninstall.log.syslog\nsoftware\n'}, {'rc': '0', 'data': ''}], 'rc': '0'}, 'user': 'none://anonymous'}
    ("Result handle:{'timestamp': '20160524-13:08:09', 'handleName': "
    "'STAF_Process', 'priority': '5', 'staf-map-class-name': "
    "'STAF/Service/Queue/Entry', 'machine': 'local://local', 'handle': '1', "
    "'type': 'STAF/Process/End', 'message': {'handle': '123', 'endTimestamp': "
    "'20160524-13:08:09', 'key': '', 'fileList': [{'rc': '0', 'data': "
    "'anaconda-ks.cfg\ninstall.log\ninstall.log.syslog\nsoftware\n'}, "
    "{'rc': '0', 'data': ''}]
    , 'rc': '0'}, 'user': 'none://anonymous'}")

    Submitting: process free handle 123
    Response:

     
  • Sharon Lucas

    Sharon Lucas - 2016-05-25

    In the example you showed, the QUEUE GET result only contained messages that originated from STAF handle 1 (STAFProc's handle), as it is STAFProc that originates STAF/Process/End messages. You were submitting a QUEUE GET HANDLE 121 request, and none of the messages in the queue that you showed originated from handle 121 (or 123). I think maybe you are confusing the handle (1) that sent the STAF/Process/End message with the handle (123) that happens to be part of the message field for a STAF/Process/End message type but is not a field that all message types have. Here is a formatted version of your QUEUE GET result that may help you see what I mean:

    Submitting: queue get
    Response:
    {
      'timestamp': '20160524-13:08:09',
      'handleName': 'STAF_Process',
      'priority': '5',
      'staf-map-class-name': 'STAF/Service/Queue/Entry',
      'machine': 'local://local',
      'handle': '1',    <== Used by HANDLE option.
      'type': 'STAF/Process/End',
      'message': {
         'handle': '123',  <== Part of message.  Not used by HANDLE option.
         'endTimestamp': '20160524-13:08:09',
         'key': '',
         'fileList': [
           {
             'rc': '0',
             'data': 'anaconda-ks.cfg\ninstall.log\ninstall.log.syslog\nsoftware\n'
           },
           {
             'rc': '0',
             'data': ''
           }
         ],
         'rc': '0'
       },
       'user': 'none://anonymous'
    }
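
    The distinction can be sketched in a few lines of Python. This is only an illustration that works on the already unmarshalled entry from the log above; the two helper names are made up for the example and are not part of any STAF API.

```python
# Queue entry copied from the log above, already unmarshalled into a dict.
entry = {
    'handle': '1',                 # sender: STAFProc's own handle
    'handleName': 'STAF_Process',
    'type': 'STAF/Process/End',
    'priority': '5',
    'message': {
        'handle': '123',           # the process that ended
        'key': '',
        'rc': '0',
        'fileList': [
            {'rc': '0',
             'data': 'anaconda-ks.cfg\ninstall.log\ninstall.log.syslog\nsoftware\n'},
            {'rc': '0', 'data': ''},
        ],
    },
}


def sender_handle(entry):
    """Handle that queued the message; the HANDLE option matches this one."""
    return entry['handle']


def ended_process_handle(entry):
    """Process handle carried inside a STAF/Process/End message body."""
    if entry['type'] != 'STAF/Process/End':
        raise ValueError('not a process end notification: %s' % entry['type'])
    return entry['message']['handle']


print(sender_handle(entry))
print(ended_process_handle(entry))
```

    Matching results to processes therefore means looking inside the message body, not at the entry's top-level handle.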
    
     
  • Sander Pool

    Sander Pool - 2016-05-25

    Ah, OK, thanks for explaining that.

    So the question remains of the proper technique for doing this. I realize the pseudocode wasn't quite correct. The situation is more like this: I'll start multiple async processes with arbitrary run times and I'll check for their completion in an unpredictable manner. Say I have process A and B. A finishes first but I don't check for its completion for a while. Then B finishes but I check for that right away. I detect that B has finished and now want to get its output. If I get the first message on the queue it'll be from A, not from B.

    I would like to retrieve the 'onend notify' from a particular process. Can STAF do this natively or do I need to store messages myself?

    I hope I explained it well enough, please ask if I did not. I appreciate the support you continue to provide for STAF, thanks!

     
  • Sharon Lucas

    Sharon Lucas - 2016-05-26

    When you submit a PROCESS START request asynchronously and use the NOTIFY ONEND option, you can also optionally specify the machine/handle#|handlename that the STAF/Process/End notification message should be sent to. Also, you can specify the PRIORITY of the notification message, as well as a KEY that can be included in the notification message. These options are described in sub-section "8.13.2 START", section "8.13 PROCESS Service" in the STAF User's Guide at http://staf.sourceforge.net/current/STAFUG.htm#HDRPROCSRV.

    This gives you several options to use to accomplish retrieving the STAF/Process/End notification messages for a particular process. For example:

    1) Use different handle queues for receiving ProcessA and ProcessB messages. Create one STAF handle (e.g. named ProcessA) and another STAF handle (e.g. named ProcessB). When submitting your PROCESS START request, specify the handle name (or number) to which the STAF/Process/End notification message will be sent. For example: NOTIFY ONEND HANDLE ProcessB. Use the handle named ProcessB to submit local QUEUE GET requests to get STAF/Process/End notification messages for ProcessB. Use the handle named ProcessA to submit local QUEUE GET requests to get STAF/Process/End notification messages for ProcessA.

    2) As talked about in section "2.6 Queues" in the STAF User's Guide, each handle in STAF has a priority queue associated with it. This queue is used to accept/retrieve messages from other processes/machines. Each message in the queue has a priority field associated with it:

    • Priority: An unsigned long value (0 - 4294967296) representing the importance of the message, with 0 representing the most important message.

    You can specify a higher PRIORITY when submitting a PROCESS START request for messages that you want to retrieve off a handle's queue sooner. By default, STAF assigns priority 5. You could specify PRIORITY 1 for notification messages for ProcessB. For example, on your PROCESS START request for ProcessB, specify NOTIFY ONEND PRIORITY 1. You could specify a lower PRIORITY (e.g. 10) for notification messages for ProcessA. When you submit a QUEUE GET request, by default the highest-priority messages (the ones with the lowest priority number) are retrieved before messages with lower priority. Or, you can specify the PRIORITY option when submitting a QUEUE GET request to only get messages off the queue that have the specified priorities (you can specify the PRIORITY option multiple times if desired). For example, to get ProcessB's STAF/Process/End notification messages (that you assigned PRIORITY 1 to), you could do: STAF local QUEUE GET TYPE STAF/Process/End PRIORITY 1.

    3) You can use the QUEUE GET request's CONTAINS/ICONTAINS option to retrieve message(s) off a queue that contain the given string(s) in conjunction. When submitting a PROCESS START request, you could use the KEY option to specify a string that the STAF/Process/End notification message will contain. Note that this KEY would have to be something unique that only process end notification messages for ProcessB contain. For this reason, I think it is probably better to use the priority option instead.

    You can decide which methods will work best for what you are trying to accomplish.
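
    To make the priority option concrete, here is a toy in-memory model in Python. It is not STAF code: the Message tuple and both functions are invented for illustration, and it assumes that messages with equal priority come back in order of arrival.

```python
from collections import namedtuple

# Toy stand-in for a queued message; the field names are illustrative only.
Message = namedtuple('Message', 'priority arrival key text')


def retrieval_order(messages):
    """Order a plain GET uses in this model: lowest priority number first,
    then order of arrival (the second part is an assumption)."""
    return sorted(messages, key=lambda m: (m.priority, m.arrival))


def with_priority(messages, *priorities):
    """Model of GET ... PRIORITY n [PRIORITY m]: keep only those priorities."""
    return [m for m in retrieval_order(messages) if m.priority in priorities]


queue = [
    Message(10, 1, 'ProcessA', 'A-1'),
    Message(1, 2, 'ProcessB', 'B-1'),
    Message(10, 3, 'ProcessA', 'A-2'),
    Message(1, 4, 'ProcessB', 'B-2'),
]

for m in retrieval_order(queue):
    print(m.priority, m.key, m.text)

print([m.text for m in with_priority(queue, 1)])
```

    In this model ProcessB's messages come out first even though ProcessA finished earlier, and filtering on priority 1 returns only ProcessB's messages.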

    Here are some simple examples (using the STAF command executable just for easy demonstration purposes) that show how you can use one handle (named ProcessEnd) to send process end notification messages from ProcessA and ProcessB and be able to retrieve messages just for ProcessA or ProcessB.

    Create a handle named "ProcessEnd":

    C:\>STAF local HANDLE CREATE HANDLE NAME ProcessEnd
    Response
    --------
    30
    

    Submit a PROCESS START request for ProcessA and send its process end notification message to the handle named ProcessEnd. Also, set priority 10 for the process end notification message and specify KEY "ProcessA":

    C:\>STAF local PROCESS START SHELL COMMAND "echo Message 1 from process A" RETURNSTDOUT STDERRTOSTDOUT NOTIFY ONEND NAME ProcessEnd KEY ProcessA PRIORITY 10
    Response
    --------
    42
    

    Submit a PROCESS START request for ProcessB and send its process end notification message to the handle named ProcessEnd. Also, set priority 1 for the process end notification messages and specify KEY "ProcessB":

    C:\>STAF local PROCESS START SHELL COMMAND "echo Message 1 from process B" RETURNSTDOUT STDERRTOSTDOUT NOTIFY ONEND NAME ProcessEnd KEY ProcessB PRIORITY 1
    Response
    --------
    43
    

    Submit a 2nd PROCESS START request for ProcessA and send its process end notification message to the handle named ProcessEnd. Also, set priority 10 for the process end notification message and specify KEY "ProcessA":

    C:\>STAF local PROCESS START SHELL COMMAND "echo Message 2 from process A" RETURNSTDOUT STDERRTOSTDOUT NOTIFY ONEND NAME ProcessEnd KEY ProcessA PRIORITY 10
    Response
    --------
    44
    

    Submit a 2nd PROCESS START request for ProcessB and send its process end notification message to the handle named ProcessEnd. Also, set priority 1 for the process end notification messages and specify KEY "ProcessB":

    C:\>STAF local PROCESS START SHELL COMMAND "echo Message 2 from process B" RETURNSTDOUT STDERRTOSTDOUT NOTIFY ONEND NAME ProcessEnd KEY ProcessB PRIORITY 1
    Response
    --------
    45
    

    Set the STAF_STATIC_HANDLE variable to the handle of the handle I created with name ProcessEnd that I sent the process end notification messages to. Note that this is needed since I'm using the STAF command executable for simplicity:

    C:\>set STAF_STATIC_HANDLE=30
    

    Submit a QUEUE PEEK ALL command to show the order of the messages in the queue. Note that the messages are in order by priority (priority 1 is higher priority than priority 10). This is the same order that the messages would be retrieved from the queue using a "QUEUE GET" request (without other options specified such as PRIORITY or CONTAINS).

    C:\>STAF local QUEUE PEEK ALL
    Response
    --------
    [
      {
        Priority   : 1
        Date-Time  : 20160526-10:24:24
        Machine    : local://local
        Handle Name: STAF_Process
        Handle     : 1
        User       : none://anonymous
        Type       : STAF/Process/End
        Message    : {
          endTimestamp: 20160526-10:24:24
          fileList    : [
            {
              data: Message 1 from process B
    
              rc  : 0
            }
          ]
          handle      : 43
          key         : ProcessB
          rc          : 0
        }
      }
      {
        Priority   : 1
        Date-Time  : 20160526-10:24:55
        Machine    : local://local
        Handle Name: STAF_Process
        Handle     : 1
        User       : none://anonymous
        Type       : STAF/Process/End
        Message    : {
          endTimestamp: 20160526-10:24:55
          fileList    : [
            {
              data: Message 2 from process B
    
              rc  : 0
            }
          ]
          handle      : 45
          key         : ProcessB
          rc          : 0
        }
      }
      {
        Priority   : 10
        Date-Time  : 20160526-10:24:06
        Machine    : local://local
        Handle Name: STAF_Process
        Handle     : 1
        User       : none://anonymous
        Type       : STAF/Process/End
        Message    : {
          endTimestamp: 20160526-10:24:06
          fileList    : [
            {
              data: Message 1 from process A
    
              rc  : 0
            }
          ]
          handle      : 42
          key         : ProcessA
          rc          : 0
        }
      }
      {
        Priority   : 10
        Date-Time  : 20160526-10:24:46
        Machine    : local://local
        Handle Name: STAF_Process
        Handle     : 1
        User       : none://anonymous
        Type       : STAF/Process/End
        Message    : {
          endTimestamp: 20160526-10:24:46
          fileList    : [
            {
              data: Message 2 from process A
    
              rc  : 0
            }
          ]
          handle      : 44
          key         : ProcessA
          rc          : 0
        }
      }
    ]
    

    To show the process end notification messages just for Process B, you could specify option PRIORITY 1. Note that I'm using PEEK instead of GET so that the messages aren't removed from the queue and I can submit more QUEUE commands for this demonstration. Don't specify the ALL option if you want to get just 1 message off the queue at a time.

    C:\>STAF local QUEUE PEEK ALL TYPE STAF/Process/End PRIORITY 1
    Response
    --------
    [
      {
        Priority   : 1
        Date-Time  : 20160526-10:24:24
        Machine    : local://local
        Handle Name: STAF_Process
        Handle     : 1
        User       : none://anonymous
        Type       : STAF/Process/End
        Message    : {
          endTimestamp: 20160526-10:24:24
          fileList    : [
            {
              data: Message 1 from process B
    
              rc  : 0
            }
          ]
          handle      : 43
          key         : ProcessB
          rc          : 0
        }
      }
      {
        Priority   : 1
        Date-Time  : 20160526-10:24:55
        Machine    : local://local
        Handle Name: STAF_Process
        Handle     : 1
        User       : none://anonymous
        Type       : STAF/Process/End
        Message    : {
          endTimestamp: 20160526-10:24:55
          fileList    : [
            {
              data: Message 2 from process B
    
              rc  : 0
            }
          ]
          handle      : 45
          key         : ProcessB
          rc          : 0
        }
      }
    ]
    

    Or, to show the process end notification messages just for Process B, you could specify CONTAINS "ProcessB" (since the message's key field contains "ProcessB" because I specified this KEY when submitting the PROCESS START requests for ProcessB):

    C:\>STAF local QUEUE PEEK ALL TYPE STAF/Process/End CONTAINS ProcessB
    Response
    --------
    [
      {
        Priority   : 1
        Date-Time  : 20160526-10:24:24
        Machine    : local://local
        Handle Name: STAF_Process
        Handle     : 1
        User       : none://anonymous
        Type       : STAF/Process/End
        Message    : {
          endTimestamp: 20160526-10:24:24
          fileList    : [
            {
              data: Message 1 from process B
    
              rc  : 0
            }
          ]
          handle      : 43
          key         : ProcessB
          rc          : 0
        }
      }
      {
        Priority   : 1
        Date-Time  : 20160526-10:24:55
        Machine    : local://local
        Handle Name: STAF_Process
        Handle     : 1
        User       : none://anonymous
        Type       : STAF/Process/End
        Message    : {
          endTimestamp: 20160526-10:24:55
          fileList    : [
            {
              data: Message 2 from process B
    
              rc  : 0
            }
          ]
          handle      : 45
          key         : ProcessB
          rc          : 0
        }
      }
    ]
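
    The same request strings can be assembled from a Python program. The sketch below only builds the strings and submits nothing to STAF. The helper names are hypothetical, and the :length:value form is copied from the request logs earlier in this thread (for example ':2:ls').

```python
def wrap(value):
    """Length-delimited form :<length>:<value>, as seen in the thread's logs,
    so values that contain spaces need no quoting."""
    value = str(value)
    return ':%d:%s' % (len(value), value)


def start_request(command, notify_name, key, priority):
    """Build a PROCESS START request whose end notification goes to the
    handle named notify_name, tagged with the given key and priority."""
    return ('START SHELL COMMAND %s RETURNSTDOUT STDERRTOSTDOUT '
            'NOTIFY ONEND NAME %s KEY %s PRIORITY %d'
            % (wrap(command), wrap(notify_name), wrap(key), priority))


def get_request(priority=None, contains=None):
    """Build a QUEUE GET request for process end messages, optionally
    narrowed by PRIORITY or CONTAINS."""
    parts = ['GET TYPE STAF/Process/End']
    if priority is not None:
        parts.append('PRIORITY %d' % priority)
    if contains is not None:
        parts.append('CONTAINS %s' % wrap(contains))
    return ' '.join(parts)


print(start_request('echo Message 1 from process B', 'ProcessEnd', 'ProcessB', 1))
print(get_request(priority=1))
print(get_request(contains='ProcessB'))
```

    The resulting strings would be passed as the request argument of a handle's submit call, with PROCESS or QUEUE as the service.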
    
     
  • Sander Pool

    Sander Pool - 2016-06-01

    Hi,

    Thanks for this extensive post. I tried to emulate what you did here but I'm still missing something:

    [pools1@pools1-sc-vm unittests]$ ./test_staf.py T.test_cmd
    
    Submitting: handle create handle name :26:STAFProcess140367129906760
    Response: '71'
    
    Submitting: process start shell command :2:ls async returnstdout  returnstderr notify onend name STAFProcess140367129906760 workdir /tmp
    Response: '72'
    
    Submitting: process query handle 72
    Response: {'command': 'ls',
     'endTimestamp': None,
     'focus': 'Background',
     'handle': '72',
     'handleName': None,
     'key': None,
     'parms': None,
     'pid': '22249',
     'rc': None,
     'shell': '<Default Shell>',
     'staf-map-class-name': 'STAF/Service/Process/ProcessInfo',
     'startMode': 'Async',
     'startTimestamp': '20160601-12:01:20',
     'title': None,
     'userName': None,
     'workdir': '/tmp',
     'workload': None}
    
    Submitting: process query handle 72
    Response: {'command': 'ls',
     'endTimestamp': None,
     'focus': 'Background',
     'handle': '72',
     'handleName': None,
     'key': None,
     'parms': None,
     'pid': '22249',
     'rc': None,
     'shell': '<Default Shell>',
     'staf-map-class-name': 'STAF/Service/Process/ProcessInfo',
     'startMode': 'Async',
     'startTimestamp': '20160601-12:01:20',
     'title': None,
     'userName': None,
     'workdir': '/tmp',
     'workload': None}
    
    Submitting: process query handle 72
    Response: {'command': 'ls',
     'endTimestamp': '20160601-12:01:21',
     'focus': 'Background',
     'handle': '72',
     'handleName': None,
     'key': None,
     'parms': None,
     'pid': '22249',
     'rc': '0',
     'shell': '<Default Shell>',
     'staf-map-class-name': 'STAF/Service/Process/ProcessInfo',
     'startMode': 'Async',
     'startTimestamp': '20160601-12:01:20',
     'title': None,
     'userName': None,
     'workdir': '/tmp',
     'workload': None}
    
    **Submitting: queue get handle 71
    Response: ''
    E**
    Submitting: process query handle 72
    Response: {'command': 'ls',
     'endTimestamp': '20160601-12:01:21',
     'focus': 'Background',
     'handle': '72',
     'handleName': None,
     'key': None,
     'parms': None,
     'pid': '22249',
     'rc': '0',
     'shell': '<Default Shell>',
     'staf-map-class-name': 'STAF/Service/Process/ProcessInfo',
     'startMode': 'Async',
     'startTimestamp': '20160601-12:01:20',
     'title': None,
     'userName': None,
     'workdir': '/tmp',
     'workload': None}
    
    Submitting: process free handle 72
    Response: ''
    
    Submitting: handle delete handle :2:71
    Response: ''
    
    ======================================================================
    ERROR: test_cmd (__main__.T)
    ----------------------------------------------------------------------
    <snip>
    
    PySTAFv3.STAFException: Error submitting queue get handle 71, RC: 29, Result:
    
    ----------------------------------------------------------------------
    Ran 1 test in 1.021s
    
    FAILED (errors=1)
    [pools1@pools1-sc-vm unittests]$
    

    The 'queue get handle 71' in bold fails because the handle is not found. I guess the problem is I don't understand where this handle is created and what I can use it for? Deleting the handle works fine and submitting the process start too so I hope my misunderstanding is not too great.

    For this test I use my local machine as the end point for all my STAF commands but this should also work with remote machines as that is the whole point of using STAF for me :-)

    Thank you,

    Sander

     
  • Sander Pool

    Sander Pool - 2016-06-01

    I guess the difference is you peek at all queued messages and pick out the ones you want. I'm trying to get only the queue for the handle I created. That does not seem to work. Sorry I don't see how we've gained anything by creating a handle in this case. All messages still go to the same queue and it depends on the KEY to differentiate the origin. Seems I could remove the static handle (one less thing to worry about cleaning up) and just use KEYs?

    Am I doing something that is not a standard use case for STAF? It would appear that starting a bunch of async processes and retrieving their output is rather basic. Am I not using STAF correctly? I think at a previous company I got around this by writing stdout/stderr to temporary files and retrieving that content after the process ended. I always felt this was a bit kludgy.

     
  • Sharon Lucas

    Sharon Lucas - 2016-06-01

    In your Python script, you created a STAF handle named "STAFProcess140367129906760":

    Submitting: handle create handle name :26:STAFProcess140367129906760
    Response: '71'

    Did you use the STAF Python "STAFHandle" API to create this handle? You should use it to create a new STAF handle and it returns an instance of a STAFHandle object referencing this handle. Then you can use this handle reference (e.g. processEndHandle in the below example) to submit the QUEUE GET requests to get the STAF/Process/End notification messages that are being sent to this handle's queue. The important point that I think you missed is: "You can only get messages off your own handle's queue (for security reasons) so you have to use this handle to submit GET requests to the QUEUE service." For example:

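    import sys
    from PySTAFv3 import STAFHandle, STAFException  # Python 3 module, as in your traceback

    # Register a second handle; its queue will receive the notifications
    try:
        processEndHandle = STAFHandle("STAFProcess140367129906760")
    except STAFException as e:
        print("Error registering with STAF, RC: %d" % e.rc)
        sys.exit(e.rc)
    
    ...
    # Get STAF/Process/End notification messages off processEndHandle's queue
    result = processEndHandle.submit("local", "QUEUE", "GET TYPE STAF/Process/End")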
    

    I assume you are already using the STAFHandle API to create a STAF handle that you use to submit PROCESS START requests. My example was using the STAF command line executable (for easy demonstration purposes only) so I had to use the HANDLE service's CREATE request. In a Python program, you should use the STAFHandle Python API to create another handle; it returns a reference to the STAF handle that you can then use to submit STAF service requests (such as a QUEUE GET request). The STAFHandle API is described in the STAF Python User's Guide at http://staf.sourceforge.net/current/STAFPython.htm#Header_STAFHandle

    Note: I recommend that you do not prefix the handle names that you create with STAF as the STAFProc daemon prefixes handles it creates with STAF. Just name the handle something like "MyApp/Process40367129906760" (where MyApp is whatever your application is named or whatever you want).
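
    Building on the point that GET requests must come from the handle that owns the queue, a collection loop might look like the following sketch. It assumes the object returned by submit exposes rc and an unmarshalled resultObj (the resultObj mentioned earlier in this thread). FakeHandle is a hypothetical stand-in so the example runs without STAF; a real STAFHandle instance would take its place.

```python
from types import SimpleNamespace

NO_QUEUE_ELEMENT = 29  # RC quoted in this thread for "no such element / queue empty"


def drain_process_end(handle, where='local'):
    """Collect STAF/Process/End messages from the handle's own queue, keyed
    by the handle of the process that ended. Stops when none are left."""
    results = {}
    while True:
        result = handle.submit(where, 'QUEUE', 'GET TYPE STAF/Process/End')
        if result.rc == NO_QUEUE_ELEMENT:
            return results
        if result.rc != 0:
            raise RuntimeError('QUEUE GET failed, RC: %d' % result.rc)
        message = result.resultObj['message']
        results[message['handle']] = message


class FakeHandle:
    """Hypothetical stand-in for a registered handle (assumption, not STAF)."""

    def __init__(self, entries):
        self.entries = list(entries)

    def submit(self, where, service, request):
        if not self.entries:
            return SimpleNamespace(rc=NO_QUEUE_ELEMENT, resultObj=None)
        return SimpleNamespace(rc=0, resultObj=self.entries.pop(0))


fake = FakeHandle([
    {'handle': '1', 'type': 'STAF/Process/End',
     'message': {'handle': '43', 'key': 'ProcessB', 'rc': '0'}},
    {'handle': '1', 'type': 'STAF/Process/End',
     'message': {'handle': '45', 'key': 'ProcessB', 'rc': '0'}},
])
finished = drain_process_end(fake)
print(sorted(finished))
```

    Keying the results by the process handle inside the message lets the caller look up the output of whichever process it has just seen finish, regardless of the order in which the notifications arrived.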

     
  • Sander Pool

    Sander Pool - 2016-06-01

    My mistake was using 'queue get' instead of 'queue list'. It's working now. Thanks again for your help!

     
  • Sharon Lucas

    Sharon Lucas - 2016-06-01

    A QUEUE LIST request can be submitted by another handle to look at messages on another handle's queue. But it does NOT take messages off the queue. Note that messages on a queue are stored in memory so you don't want to leave tons of messages on queues.

    A QUEUE GET/PEEK request can only be submitted by the handle of the queue you want to get messages off (for security reasons).

    This is talked about in the STAF User's Guide as follows:

    8.14.3 GET/PEEK

    GET allows you to retrieve and remove one or more elements from the queue of the handle submitting the GET request to the QUEUE service.

    PEEK allows you to retrieve one or more elements from the queue of the handle submitting the PEEK request without removing the element(s) from the queue.

    By default, only one element will be retrieved/removed from the queue if you don't specify the ALL or FIRST option.

    For security reasons, you are only allowed to retrieve messages from your own handle's queue, not from any other handle's queue. So, a GET/PEEK request only retrieves messages from the queue of the handle that submitted the GET/PEEK request to the QUEUE service. Note that you can use the LIST request to list messages that are in another handle's queue.
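
    The ownership rule quoted above can be pictured with a toy model in Python. This is not STAF code and ignores priorities: the class and method names are invented, and handle 70 is a hypothetical number standing in for the handle that submitted the failing request.

```python
class ToyQueueService:
    """Toy model of per-handle queues; not STAF code."""

    def __init__(self):
        self.queues = {}  # owner handle -> list of (sender handle, text)

    def queue(self, to_handle, from_handle, text):
        self.queues.setdefault(to_handle, []).append((from_handle, text))

    def get(self, requester, from_handle=None):
        """GET always reads the requester's own queue. from_handle models the
        HANDLE option: it filters by sender, it does not select a queue."""
        own = self.queues.get(requester, [])
        for i, (sender, _text) in enumerate(own):
            if from_handle is None or sender == from_handle:
                return own.pop(i)
        return None  # the real service reports RC 29 in this situation

    def list_queue(self, handle):
        """LIST may look at any handle's queue and removes nothing."""
        return list(self.queues.get(handle, []))


svc = ToyQueueService()
# STAFProc (handle 1) notifies handle 71 that a process ended.
svc.queue(to_handle=71, from_handle=1, text='process 72 ended')

missed = svc.get(requester=70, from_handle=71)  # wrong queue, wrong sender
listed = svc.list_queue(71)                     # visible, but not removed
got = svc.get(requester=71)                     # owner retrieves and removes it
print(missed, listed, got)
```

    In this model the first request finds nothing because it searches handle 70's queue for messages sent by handle 71, while the notification sits on handle 71's queue and was sent by handle 1.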

     
