From: <ta...@us...> - 2008-11-11 21:54:34
|
Revision: 6710 http://x10.svn.sourceforge.net/x10/?rev=6710&view=rev Author: tardieu Date: 2008-11-11 21:54:27 +0000 (Tue, 11 Nov 2008) Log Message: ----------- implemented and activated place checks. disable with -NO_PLACE_CHECKS option. Modified Paths: -------------- trunk/x10.compiler.p3/data/Async.xcd trunk/x10.compiler.p3/data/At.xcd trunk/x10.compiler.p3/data/Future.xcd trunk/x10.compiler.p3/data/Main.xcd trunk/x10.compiler.p3/data/Now.xcd trunk/x10.compiler.p3/data/ateach.xcd trunk/x10.compiler.p3/data/clock.xcd trunk/x10.compiler.p3/data/clocked-loop.xcd trunk/x10.compiler.p3/data/clocked.xcd trunk/x10.compiler.p3/data/foreach.xcd trunk/x10.compiler.p3/src/polyglot/ext/x10/visit/X10PrettyPrinterVisitor.java trunk/x10.dist/bin/x10.in trunk/x10.runtime.17/src-x10/x10/runtime/Activity.x10 trunk/x10.runtime.17/src-x10/x10/runtime/ClockState.x10 trunk/x10.runtime.17/src-x10/x10/runtime/Clock_c.x10 trunk/x10.runtime.17/src-x10/x10/runtime/FinishState.x10 trunk/x10.runtime.17/src-x10/x10/runtime/Future_c.x10 trunk/x10.runtime.17/src-x10/x10/runtime/Pool.x10 trunk/x10.runtime.17/src-x10/x10/runtime/Runtime.x10 Added Paths: ----------- trunk/x10.runtime.17/src-x10/x10/runtime/ClockPhases.x10 Removed Paths: ------------- trunk/x10.runtime.17/src-x10/x10/runtime/Clocks.x10 trunk/x10.runtime.17/src-x10/x10/runtime/Job.x10 trunk/x10.runtime.17/src-x10/x10/runtime/Worker.x10 Modified: trunk/x10.compiler.p3/data/Async.xcd =================================================================== --- trunk/x10.compiler.p3/data/Async.xcd 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.compiler.p3/data/Async.xcd 2008-11-11 21:54:27 UTC (rev 6710) @@ -1,7 +1,7 @@ // SYNOPSIS: async(#0) clocked(#1) #2 -x10.runtime.Runtime.runAsync( - new x10.runtime.Activity(#1) { - public void runX10Task() { +x10.runtime.Runtime.runAsync(#1, + new x10.core.fun.VoidFun_0_0() { + public void apply() { #2 } }, #0); Modified: trunk/x10.compiler.p3/data/At.xcd =================================================================== --- trunk/x10.compiler.p3/data/At.xcd 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.compiler.p3/data/At.xcd 2008-11-11 21:54:27 UTC (rev 6710) @@ -1,10 +1,13 @@ -// SYNOPSIS: at(#0) #1 #2=unique_id +// SYNOPSIS: at(#0) #1 #2=unique_id { x10.lang.Place tmp#2 = x10.runtime.Runtime.here(#0); - new java.lang.Runnable() { - public void run() { - #1 - } - }.run(); - x10.runtime.Runtime.here(tmp#2); + try { + new x10.core.fun.VoidFun_0_0() { + public void apply() { + #1 + } + }.apply(); + } finally { + x10.runtime.Runtime.here(tmp#2); + } } Modified: trunk/x10.compiler.p3/data/Future.xcd =================================================================== --- trunk/x10.compiler.p3/data/Future.xcd 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.compiler.p3/data/Future.xcd 2008-11-11 21:54:27 UTC (rev 6710) @@ -1,7 +1,10 @@ // SYNOPSIS: future[#1](#0) #2 #3=run-time type for #1 x10.runtime.Runtime.runFuture(#3, - new x10.runtime.Future_c<#1>(#3) { - public #1 eval() { + new x10.core.fun.Fun_0_0<#1>() { + public #1 apply() { #2 } + public x10.types.Type<#1> rtt_x10$lang$Fun_0_0_U() { + return #3; + } }, #0) Modified: trunk/x10.compiler.p3/data/Main.xcd =================================================================== --- trunk/x10.compiler.p3/data/Main.xcd 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.compiler.p3/data/Main.xcd 2008-11-11 21:54:27 UTC (rev 6710) @@ -7,9 +7,17 @@ public void main(final x10.core.Rail<java.lang.String> args) { x10.runtime.Runtime.start( - new x10.runtime.Activity() { - public void runX10Task() throws java.lang.Throwable { - #2.main(args); + new x10.core.fun.VoidFun_0_0() { + public void apply() { + try { + #2.main(args); + } catch (java.lang.RuntimeException e) { + throw e; + } catch (java.lang.Error e) { + throw e; + } catch (java.lang.Throwable t) { + throw new x10.lang.MultipleExceptions(t); + } } }); } Modified: trunk/x10.compiler.p3/data/Now.xcd =================================================================== --- trunk/x10.compiler.p3/data/Now.xcd 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.compiler.p3/data/Now.xcd 2008-11-11 21:54:27 UTC (rev 6710) @@ -1,7 +1,7 @@ // SYNOPSIS: now(#0) #1 -x10.runtime.Runtime.runNow( - new x10.runtime.Activity() { - public void runX10Task() { - #1 +x10.runtime.Runtime.runNow(#0, + new x10.core.fun.VoidFun_0_0() { + public void apply() { + #2 } - }, #0); + }); Modified: trunk/x10.compiler.p3/data/ateach.xcd =================================================================== --- trunk/x10.compiler.p3/data/ateach.xcd 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.compiler.p3/data/ateach.xcd 2008-11-11 21:54:27 UTC (rev 6710) @@ -4,9 +4,9 @@ for (x10.core.Iterator<#7> #2__ = #2__distCopy.iterator(); #2__.hasNext(); ) { #0 #1 #2 = #2__.next(); #6 - x10.runtime.Runtime.runAsync( - new x10.runtime.Activity(#5) { - public void runX10Task() { + x10.runtime.Runtime.runAsync(#5, + new x10.core.fun.VoidFun_0_0() { + public void apply() { #4 } }, #2__distCopy.apply(#2)); Modified: trunk/x10.compiler.p3/data/clock.xcd =================================================================== --- trunk/x10.compiler.p3/data/clock.xcd 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.compiler.p3/data/clock.xcd 2008-11-11 21:54:27 UTC (rev 6710) @@ -1,2 +1,2 @@ // SYNOPSIS: #0=clock -(x10.runtime.Clock_c) #0 \ No newline at end of file +// NOT USED!!! \ No newline at end of file Modified: trunk/x10.compiler.p3/data/clocked-loop.xcd =================================================================== --- trunk/x10.compiler.p3/data/clocked-loop.xcd 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.compiler.p3/data/clocked-loop.xcd 2008-11-11 21:54:27 UTC (rev 6710) @@ -1,2 +1,2 @@ // SYNOPSIS: #0=clock #1=unique_id -add((x10.runtime.Clock_c) #0); +(x10.runtime.Clock_c) #0, \ No newline at end of file Modified: trunk/x10.compiler.p3/data/clocked.xcd =================================================================== --- trunk/x10.compiler.p3/data/clocked.xcd 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.compiler.p3/data/clocked.xcd 2008-11-11 21:54:27 UTC (rev 6710) @@ -1,4 +1,3 @@ // SYNOPSIS: clocked(#0) #1=unique_id -x10.core.RailFactory.<x10.runtime.Clock_c>makeRailFromJavaArray(new java.util.LinkedList() {{ - #0 -}}.toArray()) +x10.core.RailFactory.<x10.runtime.Clock_c>makeValRailFromJavaArray( + new x10.runtime.Clock_c[] { #0 }) Modified: trunk/x10.compiler.p3/data/foreach.xcd =================================================================== --- trunk/x10.compiler.p3/data/foreach.xcd 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.compiler.p3/data/foreach.xcd 2008-11-11 21:54:27 UTC (rev 6710) @@ -2,9 +2,9 @@ for (x10.core.Iterator<#7> #2__ = (#3).iterator(); #2__.hasNext(); ) { #0 #1 #2 = #2__.next(); #6 - x10.runtime.Runtime.runAsync( - new x10.runtime.Activity(#5) { - public void runX10Task() { + x10.runtime.Runtime.runAsync(#5, + new x10.core.fun.VoidFun_0_0() { + public void apply() { #4 } }, x10.runtime.Runtime.here()); Modified: trunk/x10.compiler.p3/src/polyglot/ext/x10/visit/X10PrettyPrinterVisitor.java =================================================================== --- trunk/x10.compiler.p3/src/polyglot/ext/x10/visit/X10PrettyPrinterVisitor.java 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.compiler.p3/src/polyglot/ext/x10/visit/X10PrettyPrinterVisitor.java 2008-11-11 21:54:27 UTC (rev 6710) @@ -2395,16 +2395,16 @@ private Template processClocks(Clocked c) { assert (null != c.clocks()); Template clocks = null; - if (c.clocks().isEmpty()) - clocks = null; - else if (c.clocks().size() == 1) - clocks = new Template("clock", c.clocks().get(0)); - else { +// if (c.clocks().isEmpty()) +// clocks = null; +// else if (c.clocks().size() == 1) +// clocks = new Template("clock", c.clocks().get(0)); +// else { Integer id = getUniqueId_(); clocks = new Template("clocked", new Loop("clocked-loop", c.clocks(), new CircularList(id)), id); - } +// } return clocks; } Modified: trunk/x10.dist/bin/x10.in =================================================================== --- trunk/x10.dist/bin/x10.in 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.dist/bin/x10.in 2008-11-11 21:54:27 UTC (rev 6710) @@ -26,7 +26,8 @@ -config) shift; config="$1.cfg";; -dev) dev="true";; -J*) java_args="${java_args} '${1##-J}'";; - -*) java_args="${java_args} -Dx10.${1##-}";; + -*=*) java_args="${java_args} -Dx10.${1##-}";; + -*) java_args="${java_args} -Dx10.${1##-}=true";; *.x10) args="$args '${1%%.x10}\$Main'";; *) args="$args '$1\$Main'"; shift; args="$args $*"; break;; esac Modified: trunk/x10.runtime.17/src-x10/x10/runtime/Activity.x10 =================================================================== --- trunk/x10.runtime.17/src-x10/x10/runtime/Activity.x10 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.runtime.17/src-x10/x10/runtime/Activity.x10 2008-11-11 21:54:27 UTC (rev 6710) @@ -11,152 +11,37 @@ import x10.util.Stack; /** - * The representation of an X10 async activity. - * Note that an Activity object is created by a different thread than the one that executes it. - * @author Christian Grothoff, Christoph von Praun, vj - * @author Raj Barik, Vivek Sarkar * @author tardieu */ -public abstract class Activity(name: String) { - /** - * The FinishState of this activity. - */ - private var finishState: FinishState; - - /** - * The finishStack is lazily created. - */ - private var finishStack: Stack[FinishState]; - - /** - * The clock phases of this activity. Lazily created. - */ - private var clocks: Clocks; +class Activity(clockPhases:ClockPhases, finishStack:Stack[FinishState]) { + private val body:()=>Void; /** - * The clocks of the current activity. - */ - def clocks(): Clocks { - if (null == clocks) clocks = new Clocks(); - return clocks; - } - - /** - * Next statement = next on all clocks in parallel. - */ - def next(): void { - if (null != clocks) clocks.next(); - } - - // public constructors - - /** * Create an activity. */ - public def this(name: String) { - property(name); + def this(body:()=>Void) { + property(new ClockPhases(), new Stack[FinishState]()); + this.body = body; } - public def this() { - this(""); - } - /** - * Create an activity with the given rail of clocks. + * Create a clocked activity. */ - public def this(rail: Rail[Clock_c], name: String) { - this(name); - clocks = new Clocks(); - clocks.register(rail); + def this(body:()=>Void, clocks:ValRail[Clock_c], phases:ValRail[Int]) { + this(body); + clockPhases.register(clocks, phases); } - public def this(rail: Rail[Clock_c]) { - this(rail, ""); - } - /** - * Create an activity with the given clock. + * Run the activity. */ - public def this(clock: Clock_c, name: String) { - this(name); - clocks = new Clocks(); - clocks.register(clock); - } - - public def this(clock: Clock_c) { - this(clock, ""); - } - - // runnable - - /** - * Implemented by X10 activities, actual activity "user" code - * generated by xcd templates. - */ - public abstract def runX10Task(): void throws Throwable; - - /** - * Default implementation of the Runnable interface. - * An Activity executing in a place should always be invoked using it's runnable interface, - * and not directly by calling it's runX10Task method; which is an abstract method in this class. - * This run method allows performing actions before and after activity execution allowing - * to submit the activity safely as regard to runtime and pool thread. - */ - def run(): void { + def run():Void { try { - runX10Task(); - } catch (t: Throwable) { - pushException(t); + body(); + } catch (t:Throwable) { + finishStack.peek().pushException(t); } - if (null != clocks) clocks.drop(); - finishState.notifySubActivityTermination(); + clockPhases.drop(); + finishStack.peek().notifySubActivityTermination(); } - - // finish state - - /** - * Return the finish state of the current activity - */ - def finishState(): FinishState { - return finishState; - } - - /** - * Set the root finish state of the current activity - * Notify the finish state of a spawned activity - */ - def finishState(state: FinishState): void { - finishState = state; - state.notifySubActivitySpawn(); - } - - /** - * Start executing this activity synchronously - * (i.e. within a finish statement). - */ - def startFinish(): void { - if (null == finishStack) finishStack = new Stack[FinishState](); - finishStack.push(finishState); - finishState = new FinishState(); - } - - /** - * Suspend until all activities spawned during this finish - * operation have terminated. Throw an exception if any - * async terminated abruptly. Otherwise continue normally. - * Should only be called by the thread executing the current activity. - */ - def stopFinish(): void { - val state = finishState; - finishState = finishStack.pop(); - state.waitForFinish(); - } - - /** - * Push the exception thrown while executing s in a finish s, - * onto the finish state. - */ - def pushException(t: Throwable): void { - finishState.pushException(t); - } } Copied: trunk/x10.runtime.17/src-x10/x10/runtime/ClockPhases.x10 (from rev 6709, trunk/x10.runtime.17/src-x10/x10/runtime/Clocks.x10) =================================================================== --- trunk/x10.runtime.17/src-x10/x10/runtime/ClockPhases.x10 (rev 0) +++ trunk/x10.runtime.17/src-x10/x10/runtime/ClockPhases.x10 2008-11-11 21:54:27 UTC (rev 6710) @@ -0,0 +1,33 @@ +/* + * + * (C) Copyright IBM Corporation 2006-2008. + * + * This file is part of X10 Language. + * + */ + +package x10.runtime; + +import x10.util.HashMap; + +/** + * @author tardieu + */ +class ClockPhases extends HashMap[Clock_c,Int] { + def register(clock:Clock_c, phase:Int):Void { + clock.register_c(this, phase); + } + + def register(clocks:ValRail[Clock_c], phases:ValRail[Int]):Void { + for(var i:Int = 0; i < clocks.length; i++) register(clocks(i), phases(i)); + } + + def next():Void { + for(clock:Clock_c in keySet()) clock.resume_c(); + for(clock:Clock_c in keySet()) clock.next_c(); + } + + def drop():Void { + for(clock:Clock_c in keySet()) clock.drop_c(); + } +} Modified: trunk/x10.runtime.17/src-x10/x10/runtime/ClockState.x10 =================================================================== --- trunk/x10.runtime.17/src-x10/x10/runtime/ClockState.x10 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.runtime.17/src-x10/x10/runtime/ClockState.x10 2008-11-11 21:54:27 UTC (rev 6710) @@ -12,26 +12,26 @@ * @author tardieu */ class ClockState { - static val FIRST_PHASE = 1; + const FIRST_PHASE = 1; - private var count: int = 1; - private var alive: int = 1; - private var phase: int = FIRST_PHASE; + private var count:Int = 1; + private var alive:Int = 1; + private var phase:Int = FIRST_PHASE; - atomic def register(ph: int): void { + atomic def register(ph:Int):Void { ++count; if (-ph != phase) ++alive; } - atomic def resume(): void { + atomic def resume():Void { if (--alive == 0) { alive = count; ++phase; } } - def next(ph: int): void { - val abs: int; + def next(ph:Int):Void { + val abs:Int; if (ph < 0) { abs = -ph; } else { @@ -41,7 +41,7 @@ await (abs < phase); } - atomic def drop(ph: int): void { + atomic def drop(ph:Int):Void { --count; if (-ph != phase) resume(); } Modified: trunk/x10.runtime.17/src-x10/x10/runtime/Clock_c.x10 =================================================================== --- trunk/x10.runtime.17/src-x10/x10/runtime/Clock_c.x10 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.runtime.17/src-x10/x10/runtime/Clock_c.x10 2008-11-11 21:54:27 UTC (rev 6710) @@ -14,77 +14,71 @@ public value Clock_c extends Clock { private val state = new ClockState(); - private static def abs(z: int): int { - return (z<0) ? -z : z; - } + private static def abs(z:Int):Int = z < 0 ? -z : z; - public def this(name: String) { + public def this(name:String) { super(name); - Runtime.clocks().put(this, ClockState.FIRST_PHASE); + Runtime.clockPhases().put(this, ClockState.FIRST_PHASE); } - public def registered(): boolean { - return Runtime.clocks().containsKey(this); - } + public def registered():boolean = Runtime.clockPhases().containsKey(this); - public def dropped(): boolean { - return !registered(); - } + public def dropped():boolean = !registered(); - public def resume(): void { + public def resume():Void { if (dropped()) throw new ClockUseException(); val ph = ph_c(); if (ph < 0) throw new ClockUseException(); finish async (state) state.resume(); - Runtime.clocks().put(this, -ph); + Runtime.clockPhases().put(this, -ph); } - public def next(): void { + public def next():Void { if (dropped()) throw new ClockUseException(); next_c(); } - public def phase(): int { - if (dropped()) throw new ClockUseException(); - return abs(ph_c()); - } + public def phase():Int = abs(phase_c()); - public def drop(): void { + public def drop():Void { if (dropped()) throw new ClockUseException(); - val ph = Runtime.clocks().remove(this) to Int; + val ph = Runtime.clockPhases().remove(this) to Int; async (state) state.drop(ph); } - public def hashCode(): int { + public def hashCode():Int { return state.hashCode(); } - def register_c(clocks: Clocks): void { - if (dropped()) throw new ClockUseException(); - val ph = ph_c(); + def register_c(clockPhases:ClockPhases, ph:Int):Void { finish async (state) state.register(ph); - clocks.put(this, ph); + clockPhases.put(this, ph); } - def resume_c(): void { + def resume_c():Void { val ph = ph_c(); if (ph < 0) return; finish async (state) state.resume(); - Runtime.clocks().put(this, -ph); + Runtime.clockPhases().put(this, -ph); } - def next_c(): void { + def next_c():Void { val ph = ph_c(); finish async (state) state.next(ph); - Runtime.clocks().put(this, abs(ph) + 1); + Runtime.clockPhases().put(this, abs(ph) + 1); } - def drop_c(): void { + def phase_c():Int { + if (dropped()) throw new ClockUseException(); + return ph_c(); + } + + def drop_c():Void { val ph = ph_c(); async (state) state.drop(ph); } - def ph_c(): int { - return Runtime.clocks()(this) to Int; + def ph_c():Int { + return Runtime.clockPhases()(this) to Int; } } Deleted: trunk/x10.runtime.17/src-x10/x10/runtime/Clocks.x10 =================================================================== --- trunk/x10.runtime.17/src-x10/x10/runtime/Clocks.x10 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.runtime.17/src-x10/x10/runtime/Clocks.x10 2008-11-11 21:54:27 UTC (rev 6710) @@ -1,33 +0,0 @@ -/* - * - * (C) Copyright IBM Corporation 2006-2008. - * - * This file is part of X10 Language. - * - */ - -package x10.runtime; - -import x10.util.HashMap; - -/** - * @author tardieu - */ -class Clocks extends HashMap[Clock_c,Int] { - def register(clock: Clock_c): void { - clock.register_c(this); - } - - def register(rail: Rail[Clock_c]): void { - for(clock: Clock_c in rail) register(clock); - } - - def next(): void { - for(clock: Clock_c in keySet()) clock.resume_c(); - for(clock: Clock_c in keySet()) clock.next_c(); - } - - def drop(): void { - for(clock: Clock_c in keySet()) clock.drop_c(); - } -} Modified: trunk/x10.runtime.17/src-x10/x10/runtime/FinishState.x10 =================================================================== --- trunk/x10.runtime.17/src-x10/x10/runtime/FinishState.x10 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.runtime.17/src-x10/x10/runtime/FinishState.x10 2008-11-11 21:54:27 UTC (rev 6710) @@ -19,16 +19,19 @@ * @author Raj Barik, Vivek Sarkar * @author tardieu */ -class FinishState { - +value FinishState { /** * The Exception Stack is used to collect exceptions * issued when activities associated with this finish state terminate abruptly. - * This Object is lazily created */ - private var exceptions: Stack[Throwable]; + private val exceptions = new Stack[Throwable](); /** + * The monitor is used to serialize insertions into the Exception Stack. + */ + private val monitor = new Monitor(); + + /** * Keep track of current number of activities associated with this finish state */ private val latch = new ModCountDownLatch(0); @@ -37,9 +40,9 @@ * This method returns only when all spawned activity registered with this * FinishState have terminated either normally or abruptly. */ - def waitForFinish(): void { + def waitForFinish():Void { latch.await(); - if ((null != exceptions) && !exceptions.isEmpty()) { + if (!exceptions.isEmpty()) { if (exceptions.size() == 1) { val t = exceptions.pop(); if (t instanceof Error) { @@ -58,23 +61,26 @@ * An activity created under this finish has been created. Increment the count * associated with the finish. */ - def notifySubActivitySpawn(): void { - latch.updateCount(); + def notifySubActivitySpawn():Void { + at (latch) latch.updateCount(); } /** * An activity created under this finish has terminated. Decrement the count * associated with the finish and notify the parent activity if it is waiting. */ - def notifySubActivityTermination(): void { - latch.countDown(); + def notifySubActivityTermination():Void { + at (latch) latch.countDown(); } /** * Push an exception onto the stack. */ - atomic def pushException(t: Throwable): void { - if (null == exceptions) exceptions = new Stack[Throwable](); - exceptions.push(t); + def pushException(t:Throwable):Void { + at (exceptions) { + monitor.lock(); + exceptions.push(t); + monitor.unlock(); + } } } Modified: trunk/x10.runtime.17/src-x10/x10/runtime/Future_c.x10 =================================================================== --- trunk/x10.runtime.17/src-x10/x10/runtime/Future_c.x10 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.runtime.17/src-x10/x10/runtime/Future_c.x10 2008-11-11 21:54:27 UTC (rev 6710) @@ -8,68 +8,68 @@ package x10.runtime; +import x10.util.Stack; + /** - * This class encapsulates the return value of a local async - * call and allows the client to wait for the completion of the - * async call (force future). - * @author Christian Grothoff - * @author Christoph von Praun - * @author vj - * @author Raj Barik, Vivek Sarkar + * The representation of an X10 future expression. * @author tardieu */ -public abstract class Future_c[T] extends Activity implements Future[T] { +public value Future_c[T] implements Future[T] { /** + * CountDownLatch for signaling and wait -- can be replaced by a boolean latch + */ + private val cdl = new ModCountDownLatch(1); + + /** * Set if the activity terminated with an exception. * Can only be of type Error or RuntimeException - * (since X10 only has unchecked exceptions). */ - private var exception: Box[Throwable]; + private val exception = new Stack[Throwable](); + + private val result:Rail[T]; + + private val eval:()=>T; + + public def this(eval:()=>T) { + this.eval = eval; + result = Rail.makeVar[T](1); + } + + public def forced():boolean = Runtime.remote[boolean](cdl, ()=>forced_c()); + + private def forced_c():boolean = cdl.getCount() == 0; - private var result: T; + public def apply():T = force(); - /** - * CountDownLatch for signaling and wait -- can be replaced by a boolean latch - */ - private val cdl = new ModCountDownLatch(1); + public def force():T = Runtime.remote[T](cdl, ()=>force_c()); - public def apply(): T { - return force(); - } - - public def force(): T { + private def force_c():T { cdl.await(); - if (exception !=null) { - val e = exception to Throwable; + if (!exception.isEmpty()) { + val e = exception.peek(); if (e instanceof Error) throw e as Error; if (e instanceof RuntimeException) throw e as RuntimeException; assert false as boolean; } - return result; + return result(0); } - public def forced(): boolean { - return cdl.getCount() == 0; - } - - public abstract def eval(): T; - - public def runX10Task(): void { + def run():Void { try { - startFinish(); + Runtime.startFinish(); try { - result = eval(); - } catch (t: Throwable) { - pushException(t); + result(0) = eval(); + } catch (t:Throwable) { + Runtime.pushException(t); } - stopFinish(); - } catch (t: Throwable) { + Runtime.stopFinish(); + } catch (t:Throwable) { // Now nested asyncs have terminated. - exception = t to Box[Throwable]; + exception.push(t); } finally { cdl.countDown(); - } - } + } + } } Deleted: trunk/x10.runtime.17/src-x10/x10/runtime/Job.x10 =================================================================== --- trunk/x10.runtime.17/src-x10/x10/runtime/Job.x10 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.runtime.17/src-x10/x10/runtime/Job.x10 2008-11-11 21:54:27 UTC (rev 6710) @@ -1,17 +0,0 @@ -/* - * - * (C) Copyright IBM Corporation 2006-2008. - * - * This file is part of X10 Language. - * - */ - -package x10.runtime; - -/** - * Job submitted to thread pool - * @author tardieu - */ -value Job(activity: Activity, place: Place) { - def this(activity: Activity, place: Place) = property(activity, place); -} Modified: trunk/x10.runtime.17/src-x10/x10/runtime/Pool.x10 =================================================================== --- trunk/x10.runtime.17/src-x10/x10/runtime/Pool.x10 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.runtime.17/src-x10/x10/runtime/Pool.x10 2008-11-11 21:54:27 UTC (rev 6710) @@ -8,82 +8,66 @@ package x10.runtime; +import x10.runtime.kernel.Runnable; import x10.runtime.kernel.Thread; import x10.util.Stack; /** - * Thread pool * @author tardieu */ -class Pool { - // TODO convert to value type - +class Pool implements Runnable { /** * Instance lock */ private val monitor = new Monitor(); /** - * Jobs not yet assigned to a worker + * Activities not yet assigned to a worker */ - private val jobs = new Stack[Job](); + private val activities = new Stack[Activity](); /** * Pool size */ - private var count: int; + private var count:Int; /** * Number of blocked activities in the pool */ - private var busy: int = 0; + private var busy:Int = 0; /** * Start count threads */ - def this(count: nat) { + def this(count:Int) { this.count = count; - for(var i: int = 0; i<count; i++) allocate(i); + for(var i:Int = 0; i < count; i++) allocate(i); } /** - * Submit a new job to the pool + * Submit a new activity to the pool */ - def execute(job: Job): void { + def execute(activity:Activity):Void { monitor.lock(); - jobs.push(job); + activities.push(activity); - // wake up available worker thread if any + // wake up available worker if any monitor.unpark(); monitor.unlock(); } - + /** - * Assign a job to calling worker - */ - def job(thread: Thread): Job { - monitor.lock(); - - // park worker thread - while (jobs.isEmpty()) monitor.park(); - - val job = jobs.pop(); - monitor.unlock(); - return job; - } - - /** * Start a new thread */ - def allocate(id: int): void { - new Thread(Place.FIRST_PLACE, new Worker(this), "thread-" + id.toString()).start(); + def allocate(id:Int):Void { + new Thread(location, this, "thread-" + id.toString()).start(); } /** * Increment number of blocked activities */ - def increase(): void { + def increase():Void { monitor.lock(); if (++busy >= count) allocate(count++); monitor.unlock(); @@ -92,9 +76,35 @@ /** * Decrement number of blocked activities */ - def decrease(): void { + def decrease():Void { monitor.lock(); --busy; monitor.unlock(); } + + /** + * Worker body + */ + public def run():Void { + val thread = Thread.currentThread(); + + // no need for termination condition + // termination of main activity governs program termination + while (true) { + monitor.lock(); + + // park worker until one activity can be executed + while (activities.isEmpty()) monitor.park(); + + // pop activity + val activity = activities.pop(); + monitor.unlock(); + + // attach thread to activity + thread.activity(activity); + + // run activity + at (activity) activity.run(); + } + } } Modified: trunk/x10.runtime.17/src-x10/x10/runtime/Runtime.x10 =================================================================== --- trunk/x10.runtime.17/src-x10/x10/runtime/Runtime.x10 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.runtime.17/src-x10/x10/runtime/Runtime.x10 2008-11-11 21:54:27 UTC (rev 6710) @@ -9,113 +9,118 @@ package x10.runtime; import x10.runtime.kernel.InterruptedException; -import x10.runtime.kernel.Lock; -import x10.runtime.kernel.Runnable; import x10.runtime.kernel.Thread; /** * @author tardieu */ public value Runtime { - // TODO place check/place-cast of null? + // TODO place-cast of null? // TODO runNow // TODO configurable pools /** * One monitor per place */ - private const monitors = Rail.makeVal[Monitor](Place.MAX_PLACES, (id:nat)=>new Monitor()); - + private const monitors = Rail.makeVal[Monitor](Place.MAX_PLACES, + (id:Nat)=>Runtime.remote[Monitor](Place.place(id), ()=>new Monitor())); + /** * The common thread pool */ private const pool = new Pool(Int.getInteger("x10.INIT_THREADS_PER_PLACE", 3)); +// private const pools = Rail.makeVal[Pool](Place.MAX_PLACES, +// (id:Nat)=>Runtime.remote[Pool](Place.place(id), ()=>new Pool(Int.getInteger("x10.INIT_THREADS_PER_PLACE", 3)))); + /** * Notify the thread pool that one activity is about to block */ - static def threadBlockedNotification(): void { - pool.increase(); + static def threadBlockedNotification():Void { + at (Place.FIRST_PLACE) pool.increase(); } /** * Notify the thread pool that one activity has unblocked */ - static def threadUnblockedNotification(): void { - pool.decrease(); + static def threadUnblockedNotification():Void { + at (Place.FIRST_PLACE) pool.decrease(); } /** * Return the current activity */ - private static def current(): Activity { - return Thread.currentThread().activity() as Activity; - } + static def current():Activity = Thread.currentThread().activity() as Activity; + + /** + * Return the clock phases of the current activity + */ + static def clockPhases():ClockPhases = current().clockPhases(); /** - * Return the place of the current activity + * Return the current place */ - public static def here(): Place { - return Thread.currentThread().place() as Place; - } + public static def here():Place = Thread.currentThread().place() as Place; /** - * Set the place of the current activity - * Return the old place + * Set the current place + * Return the former place */ - public static def here(place: Place): Place { + public static def here(place:Place):Place { val thread = Thread.currentThread(); - val h = thread.place() as Place; + val p = thread.place() as Place; thread.place(place); - return h; + return p; } /** - * Return the clock phases of the current activity - */ - static def clocks(): Clocks { - return current().clocks(); - } - - /** * Run the main activity in a finish */ - public static def start(activity: Activity): void { - Thread.currentThread().activity(activity); - val state = new FinishState(); - activity.finishState(state); - activity.run(); - state.waitForFinish(); + public static def start(body:()=>Void):Void { + try { + val activity = new Activity(body); + Thread.currentThread().activity(activity); + val state = new FinishState(); + state.notifySubActivitySpawn(); + activity.finishStack.push(state); + activity.run(); + state.waitForFinish(); + } catch (t:Throwable) { + t.printStackTrace(); + } } /** * Run an async */ - public static def runAsync(activity: Activity, o: Object): void { - activity.finishState(current().finishState()); - val place = o instanceof Place ? o as Place : location(o); - pool.execute(new Job(activity, place)); + public static def runAsync(clocks:ValRail[Clock_c], body:()=>Void, place:Place):Void { + at (current()) { + val state = current().finishStack.peek(); + val phases = Rail.makeVal[Int](clocks.length, (i:Nat)=>clocks(i).phase_c()); + state.notifySubActivitySpawn(); + at (place) { + val activity = new Activity(body, clocks, phases); + activity.finishStack.push(state); + at (Place.FIRST_PLACE) pool.execute(activity); + } + } } /** * Run a future */ - public static def runFuture[T](future_c: Future_c[T], o: Object): Future[T] { - runAsync(future_c, o); - return future_c; + public static def runFuture[T](eval:()=>T, place:Place):Future[T] { + val futur = Runtime.remote[Future_c[T]](place, ()=>new Future_c[T](eval)); + runAsync(Rail.makeVal[Clock_c](0), ()=>futur.run(), place); + return futur; } - /** - * Now - */ - public static def runNow(activity: Activity, o: Object): void { - throw new RuntimeException("now not implemented"); - } + const PLACE_CHECKS = !Boolean.getBoolean("x10.NO_PLACE_CHECKS"); /** * Compute location */ - private static def location(o: Object): Place { + private static def location(o:Object):Place { if (o instanceof Ref) return (o as Ref).location; return here; } @@ -123,17 +128,30 @@ /** * Place check */ - public static def placeCheck(p: Place, o: Object): Object { -// if (null != o && location(o) != p) { -// throw new BadPlaceException("object=" + o + " access at place=" + p); -// } + public static def placeCheck(p:Place, o:Object):Object { + if (PLACE_CHECKS && null != o && location(o) != p) { + throw new BadPlaceException("object=" + o + " access at place=" + p); + } return o; } /** + * Remote computation + */ + static def remote[T](location:Object, eval:()=>T):T { + val ret = here; + val box = Rail.makeVar[T](1); + at (location instanceof Place ? location as Place :(location as Ref).location) { + val result = eval(); + at (ret) box(0) = result; + } + return box(0); + } + + /** * Lock current place */ - public static def lock(): void { + public static def lock():Void { monitors(here().id).lock(); } @@ -141,7 +159,7 @@ * Wait on current place lock * Must be called while holding the place lock */ - public static def await(): void { + public static def await():Void { monitors(here().id).await(); } @@ -149,9 +167,9 @@ * Unlock current place * Notify all */ - public static def release(): void { - monitors(here().id).unparkAll(); - monitors(here().id).unlock(); + public static def release():Void { + monitors(here().id).unparkAll(); + monitors(here().id).unlock(); } /** @@ -160,12 +178,12 @@ * @param millis the number of milliseconds to sleep * @return true if completed normally, false if interrupted */ - public static def sleep(millis: long): boolean { + public static def sleep(millis:long):Boolean { try { threadBlockedNotification(); Thread.sleep(millis); return true; - } catch (e: InterruptedException) { + } catch (e:InterruptedException) { return false; } finally { threadUnblockedNotification(); @@ -175,16 +193,16 @@ /** * Next statement = next on all clocks in parallel. */ - public static def next(): void { - current().next(); + public static def next():Void { + at (current()) current().clockPhases.next(); } /** * Start executing current activity synchronously * (i.e. within a finish statement). */ - public static def startFinish(): void { - current().startFinish(); + public static def startFinish():Void { + at (current()) current().finishStack.push(new FinishState()); } /** @@ -193,15 +211,15 @@ * async terminated abruptly. Otherwise continue normally. * Should only be called by the thread executing the current activity. */ - public static def stopFinish(): void { - current().stopFinish(); + public static def stopFinish():Void { + at (current()) current().finishStack.pop().waitForFinish(); } /** * Push the exception thrown while executing s in a finish s, * onto the finish state. */ - public static def pushException(t: Throwable): void { - current().pushException(t); + public static def pushException(t:Throwable):Void { + at (current()) current().finishStack.peek().pushException(t); } } Deleted: trunk/x10.runtime.17/src-x10/x10/runtime/Worker.x10 =================================================================== --- trunk/x10.runtime.17/src-x10/x10/runtime/Worker.x10 2008-11-11 15:47:09 UTC (rev 6709) +++ trunk/x10.runtime.17/src-x10/x10/runtime/Worker.x10 2008-11-11 21:54:27 UTC (rev 6710) @@ -1,42 +0,0 @@ -/* - * - * (C) Copyright IBM Corporation 2006-2008. - * - * This file is part of X10 Language. - * - */ - -package x10.runtime; - -import x10.runtime.kernel.Runnable; -import x10.runtime.kernel.Thread; - -/** - * Worker thread in thread pool - * @author tardieu - */ -value Worker(pool: Pool) implements Runnable { - def this(pool: Pool) = property(pool); - - /** - * Main loop - */ - public def run(): void { - val thread = Thread.currentThread(); - - // no need for termination condition - // termination of main activity governs program termination - while (true) { - - // request new job from pool - val job = pool.job(thread); - - // attach thread to place and activity - thread.place(job.place); - thread.activity(job.activity); - - // run activity - job.activity.run(); - } - } -} This was sent by the SourceForge.net collaborative development platform, the world's largest Open Source development site. |