-- [r7647] sandbox/jlf/samples/concurrency/coactivity.cls

/*
Coactivity is an emulation of coroutines in ooRexx.

This script needs a modified ooRexx interpreter, because it depends on :
    .context~args = <array>
    .threadLocal

This is not a "real" coroutine implementation, because it is based on ooRexx activities
and synchronization. But at least you have all the functionality of stackful
asymmetric coroutines (resume + yield). Everything is in place to support symmetric
coroutines (yield only), but the scheduler remains to be implemented.

Coroutines are a programming language concept that allows for explicit, cooperative
and stateful switching between subroutines. The advantage of real coroutines over
threads is that they do not have to be synchronized, because they pass control to
each other explicitly and deterministically.
*/
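
/*
Usage sketch (an illustration only, not part of the original samples, and untested).
Assumptions : the modified interpreter is used, the doer support needed by 'init' is
loaded, and this file can be pulled in with ::requires. The class name Counter and
the variable names are hypothetical.

    counter = .Counter~new      -- entry defaults to the couple ("main", self), not started
    first = counter~resume      -- the first resume starts the coactivity and runs it up to the first yield
    second = counter~resume     -- continues after that yield, up to the next one
    say first second
    counter~end                 -- stop it : the pending yield unwinds back to 'start'

    ::requires "coactivity.cls"

    ::class Counter subclass Coactivity
    ::method main               -- default entry point, see the abstract method 'main'
        do i = 1 by 1
            .yield[i]           -- same as .Coactivity~yield(i)
        end
*/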
--::options trace i
--::options NOMACROSPACE
--------------------------------------------------------------------------------
-- .yield[value] is more compact than .Coactivity~yield(value)
::class "yield" public
::method "[]" class unguarded
    forward message ("yield") to (.Coactivity)

-- Another way to yield : call yield [value]
::routine yield public
    .Coactivity~sendWith("yield", arg(1, "a"))

--------------------------------------------------------------------------------
-- This is a WeakReference which forwards the messages to its value.
-- To move elsewhere, probably (not restricted to coactivities).
::class "WeakProxy" public subclass WeakReference
::method unknown
    use arg msg, args
    forward to (self~value) message (msg) arguments (args)

--------------------------------------------------------------------------------
::class "Coactivity" mixinclass Object public

-- Class attributes
--::attribute globalCache class private -- contains all the active coactivities

::method init class
    expose globalCache
    -- Using a Directory instead of an IdentityTable because of the experimentation with WeakReferences.
    globalCache = .Directory~new

::method register class
    expose globalCache
    use strict arg coactivityObj -- Remember : don't register the proxy ! that would forbid GC
    -- Remember : A weak reference is useless here because only started-not-(ended-killed) coactivities
    -- are referenced by this cache, and a started-not-(ended-killed) coactivity can't be GC'ed because
    -- (by definition) its start method is running and the self variable references the coactivity.
    globalCache[coactivityObj~identityHash] = coactivityObj

::method unregister class
    expose globalCache
    use strict arg coactivityObj
    globalCache~remove(coactivityObj~identityHash)

::method count class
    expose globalCache
    return globalCache~items

::method all class
    expose globalCache
    return globalCache~allItems

::method endAll class
    expose globalCache
    count = 0
    do coactivityObj over globalCache~allItems
        if coactivityObj~end then count += 1
    end
    return count

::method killAll class
    expose globalCache
    count = 0
    do coactivityObj over globalCache~allItems
        if coactivityObj~kill then count += 1
    end
    return count

::method init
    /*
    The coactivity entry can be a routine, a couple (message, object) or a couple (method, object).
    The default entry is the couple ("main", self).
    By default, the coactivity is not started (the 'start' method is not called)
    */
    expose coactivityObj
    proxy = .WeakProxy~new(self)
    use strict arg action="main", start=.false, object=(proxy) -- object must reference the proxy, not directly the coactivity, otherwise the coactivity will never be GC'ed
    coactivityObj = .CoactivityObj~new(action, start, object, proxy) -- pass itself as proxy, to be stored on the wrapped coactivityObj (needed for supplier)

::method uninit
    expose coactivityObj
    coactivityObj~end

--::method unknown
    -- I don't define the unknown method because (for the moment) .Coactivity is a mixin class.
    -- I don't think it's a good idea to inherit an unknown method from a mixin class, knowing
    -- that it's not unusual to inherit from several mixin classes.

::method executable unguarded
    expose coactivityObj
    forward to (coactivityObj)

::method start -- was declared unguarded because of deadlock4, but finally better to keep it guarded and declare ~yield unguarded
    /*
    Create the activity that will control the coactivity and make it suspended.
    Use 'resume' to start the coactivity effectively.
    The arguments passed to the first 'resume' will be made available to the coactivity through the traditional 'use arg'.
    The arguments passed to the next 'resume's will be made available as an array returned by 'yield'.
    */
    expose coactivityObj
    forward to (coactivityObj)

::method main abstract
    -- Default entry point of the coactivity, to be implemented in a subclass
    -- (The subclassing is optional. You can use any doer as entry point).

::method yield class unguarded -- unguarded because of deadlock4
    /*
    Helper method that lets you yield from any nested invocation
    (i.e. directly or indirectly called by the coactivity 'start' method).
    The goal is to retrieve the coactivity instance and send it the message yield.
        myCoactivity~start  <--------------+
            invocation                     |
                invocation                 |
                    ...                    |
                        invocation : .Coactivity~yield()
    */
    coactivityObj = .threadLocal["coactivity"]
    if coactivityObj == .nil then raise syntax 93.900 array ("yield can be used only from a coactivity")
    forward to (coactivityObj)

::method yield private unguarded -- unguarded because of deadlock4
    -- Can be called only from a coactivity.
    -- Returns an array which contains the arguments passed to 'resume' by the client of the coactivity.
    expose coactivityObj
    forward to (coactivityObj)

::method resume -- MUST be guarded ! If unguarded then the same value could be returned to different concurrent clients
    -- Remember : Was declared unguarded to fix deadlock4 but that was a bad decision. Now, yield is unguarded.
    -- You can pass arguments to this method.
    -- They will be passed to the coactivity, either as traditional 'use arg' if first call, or as an array returned by 'yield'.
    expose coactivityObj
    forward to (coactivityObj)

::method resumeWithIndex -- MUST be guarded !
    -- Like ~resume, but the result is an array of (item, index)
    expose coactivityObj
    forward to (coactivityObj)

::method end
    expose coactivityObj
    forward to (coactivityObj)

::method kill
    expose coactivityObj
    forward to (coactivityObj)

::method isStarted unguarded
    expose coactivityObj
    forward to (coactivityObj)

::method isAlive unguarded
    expose coactivityObj
    forward to (coactivityObj)

::method isEnded unguarded
    expose coactivityObj
    forward to (coactivityObj)

::method isKilled unguarded
    expose coactivityObj
    forward to (coactivityObj)

::method statusText unguarded
    expose coactivityObj
    forward to (coactivityObj)

::method supplier unguarded
    expose coactivityObj
    forward to (coactivityObj)

::method iterator unguarded
    expose coactivityObj
    forward to (coactivityObj)

::method makeArray unguarded
    -- This is the only way to write : do item over myCoactivity ...
    -- A better approach would be to modify the interpreter to support : do item over anySupplier ...
    expose coactivityObj
    forward to (coactivityObj)

--------------------------------------------------------------------------------
::class "CoactivityObj"
/*
Status transitions :
    notStarted --> running | ended | killed
    running    --> suspended | ended | killed
    suspended  --> running | ended | killed
*/
::constant notStarted 0
::constant suspended 1
::constant running 2
::constant ended 3
::constant killed 4

-- Instance attributes for entry point
-- ::attribute doer private
-- ::attribute object private

-- Instance attributes for execution
-- ::attribute proxy private
-- ::attribute status private
-- ::attribute arguments private
-- ::attribute yieldItem private
-- ::attribute yieldIndex private

::method init unguarded
    expose doer object proxy status yieldIndex
    use strict arg action, start, object, proxy
    doer = action~doer
    status = .CoactivityObj~notStarted
    yieldIndex = 0
    if start then self~start

::method uninit
    -- If no longer referenced, then can stop the coactivity
    -- (but will never happen, see the comment in ~register)
    self~end

::method executable unguarded
    expose doer
    return doer

::method start
    expose arguments doer object status
    use strict arg -- no arg
    if status <> .CoactivityObj~notStarted then return
    status = .CoactivityObj~suspended
    reply self
    .threadLocal["coactivity"] = self
    .Coactivity~register(self)
    signal on any name trapCondition -- catch all
    signal on syntax name trapCondition -- gives better messages
    guard off
    guard on when status <> .CoactivityObj~suspended
    if status == .CoactivityObj~running then do
        guard off
        -- arguments are coming from 'resume' (the first resume activates the coactivity)
        if doer~needsObject then doer~doWith(object, arguments) -- object needed (message, method)
        else doer~doWith(arguments) -- no object needed (routine)
        guard on
        if var("result") then self~yieldLast(result)
        else self~yieldLast
        status = .CoactivityObj~ended
    end
    trapCondition:
    self~kill -- maybe already killed or ended
    if self~hasMethod("onTerminate") then self~onTerminate
    .Coactivity~unregister(self)
    if self~isKilled & condition("o") <> .nil then raise propagate

::method yield --private
    expose arguments status yieldIndex yieldItem
    drop yieldItem
    if status == .CoactivityObj~killed then raise syntax 93.900 array ("Can't yield, the coactivity is killed") -- this is to unwind any nested invocation and return to 'start'
    if status == .CoactivityObj~ended then raise syntax 93.900 array ("Can't yield, the coactivity is ended") -- this is to unwind any nested invocation and return to 'start'
    if arg() <> 0 then do
        use strict arg yieldItem -- yieldItem will be returned to the Coactivity's client by 'resume'
        yieldIndex += 1
    end
    status = .CoactivityObj~suspended
    guard off
    guard on when status <> .CoactivityObj~suspended
    if status == .CoactivityObj~killed then raise syntax 93.900 array ("The coactivity has been killed") -- this is to unwind any nested invocation and return to 'start'
    if status == .CoactivityObj~ended then raise syntax 93.900 array ("The coactivity has been ended") -- this is to unwind any nested invocation and return to 'start'
    -- Update the arguments of the caller's context
    -- Must unwind until we reach a context whose package is not the current package.
    context = .context
    currentPackage = context~package
    do while context <> .nil, context~package == currentPackage -- search for the first context outside this package
        context = context~parentContext -- .nil if native or top-level activation.
    end
    if context == .nil then raise syntax 93.900 array ("Can't update the arguments, yield's context not found")
    context~args = arguments -- assigns the arguments that the coactivity's client passed to 'resume'

::method yieldLast private
    /*
    Internal method called when the coactivity action has returned.
    */
    expose yieldIndex yieldItem
    drop yieldItem
    if arg() <> 0 then do
        use strict arg yieldItem -- yieldItem will be returned to the coactivity's client by 'resume'
        yieldIndex += 1
    end

::method resume
    expose arguments status yieldItem
    if status == .CoactivityObj~notStarted then self~start
    if status == .CoactivityObj~killed then raise syntax 93.900 array ("Coactivity is killed")
    if status == .CoactivityObj~ended then return -- raise syntax 93.900 array ("Coactivity is ended")
    arguments = arg(1, "a")
    status = .CoactivityObj~running
    guard off
    guard on when status <> .CoactivityObj~running
    if status == .CoactivityObj~killed then raise syntax 93.900 array ("Coactivity is killed")
    if var("yieldItem") then return yieldItem

::method resumeWithIndex
    expose arguments status yieldIndex yieldItem
    if status == .CoactivityObj~notStarted then self~start
    if status == .CoactivityObj~killed then raise syntax 93.900 array ("Coactivity is killed")
    if status == .CoactivityObj~ended then return -- raise syntax 93.900 array ("Coactivity is ended")
    arguments = arg(1, "a")
    status = .CoactivityObj~running
    guard off
    guard on when status <> .CoactivityObj~running
    if status == .CoactivityObj~killed then raise syntax 93.900 array ("Coactivity is killed")
    if var("yieldItem") then return .array~of(yieldItem, yieldIndex)

::method end
    expose status
    if status == .CoactivityObj~ended then return .false
    if status == .CoactivityObj~killed then return .false
    -- A not started coactivity can be ended, to forbid its starting.
    status = .CoactivityObj~ended
    return .true

::method kill
    expose status
    if status == .CoactivityObj~ended then return .false
    if status == .CoactivityObj~killed then return .false
    -- A not started coactivity can be killed, to forbid its starting.
    status = .CoactivityObj~killed
    return .true

::method isStarted unguarded
    expose status
    return status <> .CoactivityObj~notStarted

::method isAlive unguarded
    expose status
    return status == .CoactivityObj~suspended | status == .CoactivityObj~running

::method isEnded unguarded
    expose status
    return status == .CoactivityObj~ended

::method isKilled unguarded
    expose status
    return status == .CoactivityObj~killed

::method statusText unguarded
    expose status
    select
        when status == .CoactivityObj~notStarted then return "not started"
        when status == .CoactivityObj~suspended then return "suspended"
        when status == .CoactivityObj~running then return "running"
        when status == .CoactivityObj~ended then return "ended"
        when status == .CoactivityObj~killed then return "killed"
        otherwise return "unknown"
    end

::method supplier unguarded
    expose proxy
    return .CoactivitySupplierForGeneration~new(proxy~value) -- must pass the wrapping coactivity, not self, otherwise the coactivity may be GC'ed even if the supplier is running and not GC'ed

::method iterator unguarded
    expose proxy
    return .CoactivitySupplierForIteration~new(proxy~value) -- must pass the wrapping coactivity, not self, otherwise the coactivity may be GC'ed even if the supplier is running and not GC'ed

::method makeArray unguarded
    -- The count parameter gives the maximal number of items in the array.
    -- This is not the number of resumes, which can be greater when some resumes return no result.
    use strict arg count=(-1)
    array = .Array~new
    do forever
        if count >=0, array~dimension(1) >= count then do
            -- Better to not end the coactivity : makeArray is like clojure's take or like the pipestage .take
            -- self~end
            leave
        end
        self~resume
        if var("result") then array~append(result)
        -- Remember : don't append .nil when no result.
        -- I don't want to get an array of thousands of .nil when no result is returned by a coactivity which is resumed thousands of times.
        if \ self~isAlive then leave
    end
    return array

--------------------------------------------------------------------------------
/*
Coactivity supplier :
This supplier does not take a snapshot of the items remaining to be generated by the coactivity.
Instead, it calculates the next item only when the 'next' method is called.

It no longer needs an uninit method to properly support this use case (the coactivity itself knows when to end automatically) :
    .coactivity~new{i=0; do forever; i+=1; .yield[i]; end}~pipe(.take 5 | .console)
While the pipeline is running, the coactivity supplier is referenced by the 'source' property of the first pipe stage, and can't be GC'ed.
The pipeline itself can't be GC'ed because the first pipe stage is referenced from the call stack, as an argument of 'pipe'.
When 5 values have been taken (.take 5), the pipeline is stopped and the method 'pipe' returns.
From then on, the pipeline can be GC'ed, because it's no longer referenced from the call stack.
And once the pipeline is GC'ed, there is no more reference to the coactivity supplier, which can be GC'ed.
*/
::class "CoactivitySupplier" public subclass Supplier
--::attribute coactivity private
--::attribute currentIndex private
--::attribute currentItem private
--::attribute isAvailable private

::method init
    expose coactivity isAvailable
    use strict arg coactivity
    empty = .array~new(0) -- Lazy supplier
    self~init:super(empty, empty)
    --self~next -- Too early ! The first call to ~next must be done from ~available
    isAvailable = -1 -- special value to indicate that ~next must be executed

::method available
    expose isAvailable
    if isAvailable == -1 then self~next
    return isAvailable

::method index
    expose currentIndex isAvailable
    if isAvailable == -1 then self~next
    if isAvailable then return currentIndex

::method item
    expose currentItem isAvailable
    if isAvailable == -1 then self~next
    if isAvailable then return currentItem
    -- If no result returned by the coactivity, then item=.nil and index=.nil
    -- This conforms to the description of .Supplier~new :
    -- The supplier iterates for the number of items contained in the values array,
    -- returning the Nil object for any nonexistent items in either array.

::method next
    expose coactivity currentIndex currentItem isAvailable
    isAvailable = .false
    coactivity~resumeWithIndex
    if var("result") then do
        currentItem = result[1]
        currentIndex = result[2]
        isAvailable = .true
    end
    else if coactivity~isAlive then do
        currentItem = .nil
        currentIndex = .nil -- By testing index==.nil, you know that the coactivity yielded no item
        isAvailable = .true
    end

-- Two public subclasses : one for generation, one for iteration
::class "CoactivitySupplierForGeneration" public subclass CoactivitySupplier
::class "CoactivitySupplierForIteration" public subclass CoactivitySupplier