Work at SourceForge, help us to make it a better place! We have an immediate need for a Support Technician in our San Francisco or Denver office.

Close

Diff of /sandbox/jlf/samples/concurrency/coactivity.cls [r7646] .. [r7647] Maximize Restore

  Switch to side-by-side view

--- a/sandbox/jlf/samples/concurrency/coactivity.cls
+++ b/sandbox/jlf/samples/concurrency/coactivity.cls
@@ -1,10 +1,8 @@
 /*
 Coactivity is an emulation of coroutines in ooRexx.
-
-This script works with a standard ooRexx, but there is a dependency on doers.
-It's up to the package which requires the current script to requires also :
-either extension.cls (if supporting ::extension)
-or extension-std.cls (if standard ooRexx)
+This script needs a modified ooRexx interpreter, because depends on :
+.context~args = <array>
+.threadLocal
 
 This is not a "real" coroutine implementation, because it's based on ooRexx activities
 and synchronization. But at least you have all the functionalities of stackful
@@ -20,7 +18,6 @@
 
 --::options trace i
 --::options NOMACROSPACE
-::requires "concurrency/activity.cls"
 
 
 --------------------------------------------------------------------------------
@@ -28,6 +25,11 @@
 ::class "yield" public
 ::method "[]" class unguarded
     forward message ("yield") to (.Coactivity)
+
+
+-- Another way to yield : call yield [value]
+::routine yield public
+    .Coactivity~sendWith("yield", arg(1, "a"))
 
 
 --------------------------------------------------------------------------------
@@ -44,21 +46,19 @@
 
 -- Class attributes
 --::attribute globalCache class private -- contains all the active coactivities
-::attribute makeArrayLimit class -- Coactivities can generate an infinite number of values, so must put a limit...
 
 
 ::method init class
     expose globalCache
     -- Using a Directory instead of an IdentityTable because of the experimentation with WeakReferences.
     globalCache = .Directory~new
-    self~makeArrayLimit = 10000 -- not a constant, not a private attribute, I think it's useful to let the end user change this value
 
 
 ::method register class
     expose globalCache
-    use strict arg coactivityObj
-    -- Remember : A weak reference is useless here because only started-not-ended-killed coactivities
-    -- are referenced by this cache, and a started-not-ended-killed coactivity can't be GC'ed because
+    use strict arg coactivityObj -- Remember : don't register the proxy ! that would forbid GC
+    -- Remember : A weak reference is useless here because only started-not-(ended-killed) coactivities
+    -- are referenced by this cache, and a started-not-(ended-killed) coactivity can't be GC'ed because
     -- (by definition) its start method is running and the self variable references the coactivity.
     globalCache[coactivityObj~identityHash] = coactivityObj
 
@@ -97,7 +97,7 @@
     return count
 
 
-::method init unguarded
+::method init
     /*
     The coactivity entry can be a routine, a couple (message, object) or a couple (method, object).
     The default entry is the couple ("main", self).
@@ -115,17 +115,17 @@
 
 
 --::method unknown
-    -- I don't define the unknown method because (for the moment) .Coactivity is a
-    -- mixin class. I don't think it's a good idea to inherit an unknown method from a
-    -- mixin class, knowing that it's not unusual to inherit from several mixin classes.
-
-
-::method executable
-    expose coactivityObj
-    forward to (coactivityObj)
-
-
-::method start unguarded
+    -- I don't define the unknown method because (for the moment) .Coactivity is a mixin class.
+    -- I don't think it's a good idea to inherit an unknown method from a mixin class, knowing
+    -- that it's not unusual to inherit from several mixin classes.
+
+
+::method executable unguarded
+    expose coactivityObj
+    forward to (coactivityObj)
+
+
+::method start -- was declared unguarded because of deadlock4, but finally better to keep it guarded and declare ~yield unguarded
     /*
     Create the activity that will control the coactivity and make it suspended.
     Use 'resume' to start the coactivity effectively.
@@ -141,7 +141,7 @@
     -- (The subclassing is optional. You can use any doer as entry point).
 
 
-::method yield class unguarded
+::method yield class unguarded -- unguarded because of deadlock4
     /*
     Helper method to let yield from any nested invocation
     (i.e. directly or indirectly called by the coactivity 'start' method).
@@ -152,25 +152,32 @@
                 ...                    |
                     invocation : .Coactivity~yield()
     */
-    coactivityObj = .Activity~local~coactivityObj
+    coactivityObj = .threadLocal["coactivity"]
     if coactivityObj == .nil then raise syntax 93.900 array ("yield can be used only from a coactivity")
     forward to (coactivityObj)
 
 
-::method yield private
+::method yield private unguarded -- unguarded because of deadlock4
     -- Can be called only from a coactivity.
     -- Returns an array which contains the arguments passed to 'resume' by the client of the coactivity.
     expose coactivityObj
     forward to (coactivityObj)
 
 
-::method resume unguarded -- must be unguarded because can start the coactivity (so must be lock free to let the coactivityObj re-enter when calling yield)
+::method resume -- MUST be guarded ! If unguarded then the same value could be returned to different concurrent clients
+                -- Remember : Was declared unguarded to fix deadlock4 but that was a bad decision. Now, yield is unguarded.
     -- You can pass arguments to this method.
     -- They will be passed to the coactivity, either as traditional 'use arg' if first call, or as an array returned by 'yield'.
     expose coactivityObj
     forward to (coactivityObj)
 
 
+::method resumeWithIndex -- MUST be guarded !
+    -- Like ~resume, but the result is an array of (item, index)
+    expose coactivityObj
+    forward to (coactivityObj)
+
+
 ::method end
     expose coactivityObj
     forward to (coactivityObj)
@@ -181,27 +188,27 @@
     forward to (coactivityObj)
 
 
-::method isStarted
-    expose coactivityObj
-    forward to (coactivityObj)
-
-
-::method isAlive
-    expose coactivityObj
-    forward to (coactivityObj)
-
-
-::method isEnded
-    expose coactivityObj
-    forward to (coactivityObj)
-
-
-::method isKilled
-    expose coactivityObj
-    forward to (coactivityObj)
-
-
-::method statusText
+::method isStarted unguarded
+    expose coactivityObj
+    forward to (coactivityObj)
+
+
+::method isAlive unguarded
+    expose coactivityObj
+    forward to (coactivityObj)
+
+
+::method isEnded unguarded
+    expose coactivityObj
+    forward to (coactivityObj)
+
+
+::method isKilled unguarded
+    expose coactivityObj
+    forward to (coactivityObj)
+
+
+::method statusText unguarded
     expose coactivityObj
     forward to (coactivityObj)
 
@@ -211,10 +218,14 @@
     forward to (coactivityObj)
 
 
+::method iterator unguarded
+    expose coactivityObj
+    forward to (coactivityObj)
+
+
 ::method makeArray unguarded
-    -- This is really NOT adapted to coactivities which can generate an infinite set of values  !
-    -- But this is the only way to write : do over myCoactivity ...
-    -- A better approach would be to modify the interpreter to support : do over anySupplier ...
+    -- This is the only way to write : do item over myCoactivity ...
+    -- A better approach would be to modify the interpreter to support : do item over anySupplier ...
     expose coactivityObj
     forward to (coactivityObj)
 
@@ -242,14 +253,16 @@
 -- ::attribute proxy private
 -- ::attribute status private
 -- ::attribute arguments private
--- ::attribute yieldValue private
+-- ::attribute yieldItem private
+-- ::attribute yieldIndex private
 
 
 ::method init unguarded
-    expose proxy doer object status
+    expose doer object proxy status yieldIndex
     use strict arg action, start, object, proxy
     doer = action~doer
     status = .CoactivityObj~notStarted
+    yieldIndex = 0
     if start then self~start
 
 
@@ -259,7 +272,7 @@
     self~end
 
 
-::method executable
+::method executable unguarded
     expose doer
     return doer
 
@@ -270,8 +283,7 @@
     if status <> .CoactivityObj~notStarted then return
     status = .CoactivityObj~suspended
     reply self
-    .Activity~local~empty
-    .Activity~local~coactivityObj = self
+    .threadLocal["coactivity"] = self
     .Coactivity~register(self)
     signal on any name trapCondition -- catch all
     signal on syntax name trapCondition -- gives better messages
@@ -291,35 +303,48 @@
     self~kill -- maybe already killed or ended
     if self~hasMethod("onTerminate") then self~onTerminate
     .Coactivity~unregister(self)
-    .Activity~local~empty
     if self~isKilled & condition("o") <> .nil then raise propagate
 
 
 ::method yield --private
-    expose arguments status yieldValue
-    drop yieldValue
-    if status == .CoactivityObj~killed then raise syntax 93.900 array ("Coactivity is killed") -- this is to unwind any nested invocation and return to 'start'
-    if status == .CoactivityObj~ended then raise syntax 93.900 array ("Coactivity is ended") -- this is to unwind any nested invocation and return to 'start'
-    if arg() <> 0 then use strict arg yieldValue -- yieldValue will be returned to the Coactivity's client by 'resume'
+    expose arguments status yieldIndex yieldItem
+    drop yieldItem
+    if status == .CoactivityObj~killed then raise syntax 93.900 array ("Can't yield, the coactivity is killed") -- this is to unwind any nested invocation and return to 'start'
+    if status == .CoactivityObj~ended then raise syntax 93.900 array ("Can't yield, the coactivity is ended") -- this is to unwind any nested invocation and return to 'start'
+    if arg() <> 0 then do
+        use strict arg yieldItem -- yieldItem will be returned to the Coactivity's client by 'resume'
+        yieldIndex += 1
+    end
     status = .CoactivityObj~suspended
     guard off
     guard on when status <> .CoactivityObj~suspended
-    if status == .CoactivityObj~killed then raise syntax 93.900 array ("Coactivity is killed") -- this is to unwind any nested invocation and return to 'start'
-    if status == .CoactivityObj~ended then raise syntax 93.900 array ("Coactivity is ended") -- this is to unwind any nested invocation and return to 'start'
-    if arguments~items <> 0 then return arguments -- returns the arguments that the coactivity's client passed to 'resume'
+    if status == .CoactivityObj~killed then raise syntax 93.900 array ("The coactivity has been killed") -- this is to unwind any nested invocation and return to 'start'
+    if status == .CoactivityObj~ended then raise syntax 93.900 array ("The coactivity has been ended") -- this is to unwind any nested invocation and return to 'start'
+    -- Update the arguments of the caller's context
+    -- Must unwind until we reach a context whose package is not the current package.
+    context = .context
+    currentPackage = context~package
+    do while context <> .nil, context~package == currentPackage -- search for the first context outside this package
+        context = context~parentContext -- .nil if native or top-level activation.
+    end
+    if context == .nil then raise syntax 93.900 array ("Can't update the arguments, yield's context not found")
+    context~args = arguments -- assigns the arguments that the coactivity's client passed to 'resume'
 
 
 ::method yieldLast private
     /*
     Internal method called when the coactivity action has returned.
     */
-    expose yieldValue
-    drop yieldValue
-    if arg() <> 0 then use strict arg yieldValue -- yieldValue will be returned to the coactivity's client by 'resume'
+    expose yieldIndex yieldItem
+    drop yieldItem
+    if arg() <> 0 then do
+        use strict arg yieldItem -- yieldItem will be returned to the coactivity's client by 'resume'
+        yieldIndex += 1
+    end
 
 
 ::method resume
-    expose arguments status yieldValue
+    expose arguments status yieldItem
     if status == .CoactivityObj~notStarted then self~start
     if status == .CoactivityObj~killed then raise syntax 93.900 array ("Coactivity is killed")
     if status == .CoactivityObj~ended then return -- raise syntax 93.900 array ("Coactivity is ended")
@@ -328,7 +353,20 @@
     guard off
     guard on when status <> .CoactivityObj~running
     if status == .CoactivityObj~killed then raise syntax 93.900 array ("Coactivity is killed")
-    if var("yieldValue") then return yieldValue
+    if var("yieldItem") then return yieldItem
+
+
+::method resumeWithIndex
+    expose arguments status yieldIndex yieldItem
+    if status == .CoactivityObj~notStarted then self~start
+    if status == .CoactivityObj~killed then raise syntax 93.900 array ("Coactivity is killed")
+    if status == .CoactivityObj~ended then return -- raise syntax 93.900 array ("Coactivity is ended")
+    arguments = arg(1, "a")
+    status = .CoactivityObj~running
+    guard off
+    guard on when status <> .CoactivityObj~running
+    if status == .CoactivityObj~killed then raise syntax 93.900 array ("Coactivity is killed")
+    if var("yieldItem") then return .array~of(yieldItem, yieldIndex)
 
 
 ::method end
@@ -349,27 +387,27 @@
     return .true
 
 
-::method isStarted
+::method isStarted unguarded
     expose status
     return status <> .CoactivityObj~notStarted
 
 
-::method isAlive
+::method isAlive unguarded
     expose status
     return status == .CoactivityObj~suspended | status == .CoactivityObj~running
 
 
-::method isEnded
+::method isEnded unguarded
     expose status
     return status == .CoactivityObj~ended
 
 
-::method isKilled
+::method isKilled unguarded
     expose status
     return status == .CoactivityObj~killed
 
 
-::method statusText
+::method statusText unguarded
     expose status
     select
         when status == .CoactivityObj~notStarted then return "not started"
@@ -381,42 +419,39 @@
     end
 
 
-/*
-BAD IDEA ! The method defaultName is called by the interpreter when tracing and you enter in
-a recursive loop because defaultName itself is traced. Should use "objectName=" instead, but
-will have to call it each time the status is changed. Maybe later...
-
-::method defaultName
-    defaultName = self~statusText self~class~id
-    if defaultName~caselessMatchChar(1, "aeiou") then article = "An" ; else article = "A"
-    return article defaultName
-*/
-
-
 ::method supplier unguarded
     expose proxy
-    return .LazyCoactivitySupplier~new(proxy~value) -- must pass the wrapping coactivity, not self, otherwise the coactivity may be GC'ed even if the supplier is running and not GC'ed
+    return .CoactivitySupplierForGeneration~new(proxy~value) -- must pass the wrapping coactivity, not self, otherwise the coactivity may be GC'ed even if the supplier is running and not GC'ed
+
+
+::method iterator unguarded
+    expose proxy
+    return .CoactivitySupplierForIteration~new(proxy~value) -- must pass the wrapping coactivity, not self, otherwise the coactivity may be GC'ed even if the supplier is running and not GC'ed
 
 
 ::method makeArray unguarded
-    use strict arg limit=(.Coactivity~makeArrayLimit)
+    -- The count parameter gives the maximal number of items in the array.
+    -- This is not the number of resumes, which can be greater if no result returned sometimes.
+    use strict arg count=(-1)
     array = .Array~new
     do forever
-        if array~dimension(1) >= limit then do
+        if count >=0, array~dimension(1) >= count then do
             -- Better to not end the coactivity : makeArray is like clojure's take or like the pipestage .take
             -- self~end
             leave
         end
         self~resume
-        if \var("result") then leave
-        array~append(result)
+        if var("result") then array~append(result)
+        -- Remember : don't append .nil when no result.
+        -- I don't want to get an array of thousands .nil when no result returned by a coactivity which is resumed thousands times.
+        if \ self~isAlive then leave
     end
     return array
 
 
 --------------------------------------------------------------------------------
 /*
-Lazy Coactivity supplier :
+Coactivity supplier :
 This supplier does not take a snapshot of the items remaining to generate by the coactivity.
 Instead, it calculates the next item only when the 'next' method is called.
 
@@ -428,7 +463,7 @@
 From now, the pipeline can be GC'ed, because it's no longer referenced from the call stack.
 And once the pipeline is GC'ed, there is no more reference to the coactivity supplier, which can be GC'ed.
 */
-::class "LazyCoactivitySupplier" subclass Supplier
+::class "CoactivitySupplier" public subclass Supplier
 
 --::attribute coactivity private
 --::attribute currentIndex private
@@ -437,37 +472,53 @@
 
 
 ::method init
-    expose coactivity currentIndex
+    expose coactivity isAvailable
     use strict arg coactivity
     empty = .array~new(0) -- Lazy supplier
     self~init:super(empty, empty)
-    currentIndex = 0
-    self~next
+    --self~next -- Too early ! The first call to ~next must be done from ~available
+    isAvailable = -1 -- special value to indicate that ~next must be executed
 
 
 ::method available
     expose isAvailable
+    if isAvailable == -1 then self~next
     return isAvailable
 
 
 ::method index
     expose currentIndex isAvailable
+    if isAvailable == -1 then self~next
     if isAvailable then return currentIndex
 
 
 ::method item
     expose currentItem isAvailable
+    if isAvailable == -1 then self~next
     if isAvailable then return currentItem
 
 
+-- If no result returned by the coactivity, then item=.nil and index=.nil
+-- This is conform to the description of .Supplier~new : 
+-- The supplier iterates for the number of items contained in the values array,
+-- returning the Nil object for any nonexistent items in either array.
 ::method next
-    expose coactivity currentItem currentIndex isAvailable
-    coactivity~resume
-    drop currentItem
+    expose coactivity currentIndex currentItem isAvailable
+    isAvailable = .false
+    coactivity~resumeWithIndex
     if var("result") then do
-        currentItem = result
-        currentIndex += 1
+        currentItem = result[1]
+        currentIndex = result[2]
         isAvailable = .true
     end
-    else isAvailable = .false
-
+    else if coactivity~isAlive then do
+        currentItem = .nil
+        currentIndex = .nil -- By testing index==.nil, you know that the coactivity yielded no item
+        isAvailable = .true
+    end
+
+
+-- Two public subclasses : one for generation, one for iteration
+::class "CoactivitySupplierForGeneration" public subclass CoactivitySupplier
+::class "CoactivitySupplierForIteration" public subclass CoactivitySupplier
+