|
From: Wesley T. <we...@ml...> - 2006-12-18 18:52:58
|
my collection of SML libs, only half finished mostly ---------------------------------------------------------------------- A mltonlib/trunk/ca/terpstra/st/ A mltonlib/trunk/ca/terpstra/st/Makefile A mltonlib/trunk/ca/terpstra/st/README A mltonlib/trunk/ca/terpstra/st/data.sig A mltonlib/trunk/ca/terpstra/st/data.sml A mltonlib/trunk/ca/terpstra/st/edge.fun A mltonlib/trunk/ca/terpstra/st/epoll.h A mltonlib/trunk/ca/terpstra/st/epoll.sig A mltonlib/trunk/ca/terpstra/st/epoll.sml A mltonlib/trunk/ca/terpstra/st/ioevent.sig A mltonlib/trunk/ca/terpstra/st/ioevent.sml A mltonlib/trunk/ca/terpstra/st/kevent.h A mltonlib/trunk/ca/terpstra/st/kqueue.sml A mltonlib/trunk/ca/terpstra/st/level.fun A mltonlib/trunk/ca/terpstra/st/lpoll.sig A mltonlib/trunk/ca/terpstra/st/open.sml A mltonlib/trunk/ca/terpstra/st/scheduler.sig A mltonlib/trunk/ca/terpstra/st/socket.sml A mltonlib/trunk/ca/terpstra/st/st.mlb A mltonlib/trunk/ca/terpstra/st/state.sig A mltonlib/trunk/ca/terpstra/st/state.sml A mltonlib/trunk/ca/terpstra/st/test.mlb A mltonlib/trunk/ca/terpstra/st/test.sml A mltonlib/trunk/ca/terpstra/st/thread.sig A mltonlib/trunk/ca/terpstra/st/thread.sml A mltonlib/trunk/ca/terpstra/st/timeout.sig A mltonlib/trunk/ca/terpstra/st/timeout.sml ---------------------------------------------------------------------- Added: mltonlib/trunk/ca/terpstra/st/Makefile =================================================================== --- mltonlib/trunk/ca/terpstra/st/Makefile 2006-12-15 14:53:49 UTC (rev 4981) +++ mltonlib/trunk/ca/terpstra/st/Makefile 2006-12-19 02:52:52 UTC (rev 4982) @@ -0,0 +1,9 @@ +all: st + +epoll/epoll.mlb: epoll.h /usr/include/x86_64-linux/i386-linux/sys/epoll.h +kevent/kevent.mlb: kevent.h /usr/include/sys/event.h + +%.mlb: + mlnlffigen -allSU true -linkage static -dir $(@D) -mlbfile $(@F) $^ + +-include $(patsubst %.mlb,%.dep,$(wildcard *.mlb)) Added: mltonlib/trunk/ca/terpstra/st/README =================================================================== --- 
mltonlib/trunk/ca/terpstra/st/README 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/README 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,16 @@
+This is a simple work-alike of state-threads.sf.net for Standard ML.
+It helps in building event driven state machines with non-concurrent threads.
+
+For an example, see test.sml
+
+To use on osx:
+    make kevent/kevent.mlb
+    mlton test.mlb
+
+To use on linux:
+    edit st.mlb to use epoll.mlb instead of kevent.mlb
+    make epoll/epoll.mlb
+    mlton test.mlb
+
+The test program downloads two webpages from google concurrently, while
+answering TCP connections on port 12467 and printing a heart beat.
Added: mltonlib/trunk/ca/terpstra/st/data.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/data.sig 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/data.sig 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,46 @@
+signature SPARSE_ARRAY = (* integer-indexed optional slots; implemented by SparseArray in data.sml *)
+  sig
+    type 'a sparse_array
+
+    val new: unit -> 'a sparse_array
+    (* sub gives NONE for a slot that was never set or was erased *)
+    val sub: 'a sparse_array * int -> 'a option
+    val update: 'a sparse_array * int * 'a -> unit
+    val erase: 'a sparse_array * int -> unit
+  end
+
+signature DYNAMIC_ARRAY = (* growable array with stack-style push/pop; implemented by DynamicArray *)
+  sig
+    type 'a dynamic_array
+
+    val new: unit -> 'a dynamic_array
+    val size: 'a dynamic_array -> int
+    (* indexed access; swap exchanges the contents of two slots *)
+    val sub: 'a dynamic_array * int -> 'a
+    val update: 'a dynamic_array * int * 'a -> unit
+    val swap: 'a dynamic_array * int * int -> unit
+    (* push appends at index size; pop removes the last element *)
+    val push: 'a dynamic_array * 'a -> unit
+    val pop: 'a dynamic_array -> unit
+  end
+
+signature HEAP = (* priority heap ordered by the predicate given to new; implemented by Heap *)
+  sig
+    type 'a heap
+    val new: ('a * 'a -> bool) -> 'a heap
+    val push: 'a heap * 'a -> unit
+    val pop: 'a heap -> unit (* discards the top element *)
+    val peek: 'a heap -> 'a option (* NONE when the heap is empty *)
+  end
+
+signature QUEUE = (* FIFO queue; implemented by Queue *)
+  sig
+    type 'a queue
+    val new: unit -> 'a queue
+
+    val empty: 'a queue -> bool
+    val enque: 'a queue * 'a -> unit
+    val deque: 'a queue -> 'a option (* NONE when the queue is empty *)
+
+(*  val enqueList: 'a queue * 'a list -> unit *)
+  end
Added:
mltonlib/trunk/ca/terpstra/st/data.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/data.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/data.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,136 @@
+structure SparseArray :> SPARSE_ARRAY =
+  struct
+    type 'a sparse_array = 'a option array ref
+    (* a fresh array has 8 slots, all NONE *)
+    fun new () = ref (Array.array (8, NONE))
+    (* an index at or past the current length reads as NONE *)
+    fun sub (ref array, i) =
+      if i >= (Array.length array) then NONE else
+      Array.sub (array, i)
+    (* when i does not fit, grow to i*2 + 1 slots and copy the old contents *)
+    fun update (array, i, x) = (
+      if i < Array.length (!array) then () else
+      let val a = Array.array (i*2 + 1, NONE)
+      in
+        Array.copy { src = !array, dst = a, di = 0 };
+        array := a
+      end;
+      Array.update (!array, i, SOME x))
+    (* erasing at or past the current length is a no-op *)
+    fun erase (ref array, i) =
+      if i >= (Array.length array) then () else
+      Array.update (array, i, NONE)
+  end
+
+structure DynamicArray :> DYNAMIC_ARRAY =
+  struct
+    type 'a dynamic_array = 'a option array ref * int ref
+    (* backing store of 8 empty slots plus a separate element count *)
+    fun new () = (ref (Array.array (8, NONE)), ref 0)
+    fun size (_, ref length) = length
+    (* sub applies valOf, so reading a slot that holds NONE raises Option *)
+    fun sub ((ref array, _), i) = valOf (Array.sub (array, i))
+    fun update ((ref array, _), i, x) = Array.update (array, i, SOME x)
+
+    fun swap ((ref array, _), i, j) =
+      let
+        val iv = Array.sub (array, i)
+        val jv = Array.sub (array, j)
+      in
+        Array.update (array, i, jv);
+        Array.update (array, j, iv)
+      end
+    (* append x, doubling the backing store when it is full *)
+    fun push ((array, length), x) = (
+      if Array.length (!array) > !length then () else
+      let val a = Array.array (!length * 2, NONE)
+      in
+        Array.copy { src = !array, dst = a, di = 0 };
+        array := a
+      end;
+      update ((array, length), !length, x);
+      length := !length + 1)
+    (* drop the last element; on an empty array raise Subscript BEFORE touching length, so the count never becomes ~1 *)
+    fun pop (ref array, length) = if !length <= 0 then raise Subscript else (
+      length := !length - 1;
+      Array.update (array, !length, NONE))
+  end
+
+structure Heap :> HEAP = (* binary heap kept in a DynamicArray; cmp (parent, child) = true means the pair is in order *)
+  struct
+    open DynamicArray
+    type 'a heap = 'a dynamic_array * ('a * 'a -> bool)
+    (* index arithmetic for the implicit binary tree *)
+    fun left i = 2*i + 1
+    fun right i = 2*i + 2
+    fun parent i = (i - 1) div 2
+
+    fun new cmp = (DynamicArray.new (), cmp)
+    (* append x, then swap it upwards until cmp accepts (parent, child) *)
+    fun push ((a, cmp), x) =
+      let
+        fun fixtail 0 = () | fixtail i =
+          let
+            val parent = parent i
+          in
+            if cmp (sub (a, parent), sub (a, i)) then () else
+            (swap (a, parent, i); fixtail parent)
+          end
+      in
+        DynamicArray.push (a, x);
+        fixtail (size a - 1)
+      end
+    (* move the last element to the root, shrink by one, then sift the root down *)
+    fun pop (a, cmp) =
+      let
+        val newsize = size a - 1
+
+        fun fixhead i =
+          let
+            val left = left i
+            val right = right i
+          in
+            if left >= newsize then () else
+            if right >= newsize then
+              if cmp (sub (a, i), sub (a, left)) then () else
+              swap (a, i, left)
+            else
+            if cmp (sub (a, left), sub (a, right)) then
+              if cmp (sub (a, i), sub (a, left)) then () else
+              (swap (a, i, left); fixhead left)
+            else
+              if cmp (sub (a, i), sub (a, right)) then () else
+              (swap (a, i, right); fixhead right)
+          end
+      in
+        update (a, 0, sub (a, newsize));
+        DynamicArray.pop a;
+        fixhead 0
+      end
+
+    fun peek (a, cmp) =
+      if size a = 0 then NONE else SOME (sub (a, 0))
+  end
+
+structure Queue :> QUEUE = (* FIFO built from two lists: enque conses onto back, deque takes from front *)
+  struct
+    datatype 'a queue = T of {front: 'a list ref, back: 'a list ref}
+
+    fun new() = T{front = ref [], back = ref []}
+
+    fun empty (T {front=ref [], back=ref []}) = true
+      | empty _ = false
+
+    fun enque(T{back, ...}, x) = back := x :: !back
+    (* when front is exhausted, the reversed back list becomes the new front *)
+    fun deque(T{front, back}) =
+      case !front of
+         [] => (case !back of
+                   [] => NONE
+                 | l => let val l = rev l
+                        in case l of
+                              [] => raise Fail "deque"
+                            | x :: l => (back := []; front := l; SOME x)
+                        end)
+       | x :: l => (front := l; SOME x)
+  end
Added: mltonlib/trunk/ca/terpstra/st/edge.fun
===================================================================
--- mltonlib/trunk/ca/terpstra/st/edge.fun 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/edge.fun 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,58 @@
+functor Edge(Poll : EPOLL) :> SCHEDULER_EXTRA = (* scheduler built on a poll that satisfies EPOLL *)
+  struct
+    open State
+    open Thread_Extra
+    open Timeout_Extra
+    open Poll
+
+    val poll = create 1000 (* ready for 1000 file descriptors *)
+    (* IoEvent with monitor/unmonitor extended to add/remove the handle on the poll *)
+    structure IoEvent : IOEVENT =
+      struct
+        open IoEvent
+        fun monitor fd status = (
+          add (poll, fd);
+          IoEvent.monitor fd status)
+        fun unmonitor fd = (
+          remove (poll, fd);
+          IoEvent.unmonitor fd)
+      end
+    open IoEvent
+
+    fun sigPulse thread = thread before stop ()
+    (* wait on the poll, trigger with the current time, run again; repeats forever *)
+    fun loop block =
+      let
+        fun relativeTime time =
+          let
+            val delta = Time.- (time, Time.now ())
+          in
+            if Time.< (delta, Time.zeroTime)
+            then Time.zeroTime
+            else delta
+          end
+        (* PENDING: poll without delay; COMPLETE: delay until getNext (), or NONE if it has none *)
+        val delay =
+          case block of
+             PENDING => SOME Time.zeroTime
+           | COMPLETE => Option.map relativeTime (getNext ())
+      in
+        wait (poll, delay);
+        trigger (Time.now ());
+        loop (run ())
+      end
+
+    fun main () =
+      let
+        open MLton
+        open Signal
+        val real = Itimer.signal Itimer.Real
+        val freq = Time.fromMilliseconds 50 (* only referenced by the commented-out Itimer.set below *)
+      in
+        (* prevent high throughput connections from causing starvation *)
+        Mask.unblock (Mask.some [real]);
+        setHandler (real, Handler.handler sigPulse);
+        (* Itimer.set (Itimer.Real, { interval = freq, value = freq }); *)
+        loop (run ())
+      end
+  end
Added: mltonlib/trunk/ca/terpstra/st/epoll.h
===================================================================
--- mltonlib/trunk/ca/terpstra/st/epoll.h 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/epoll.h 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,9 @@
+#include <sys/epoll.h>
+
+enum EPOLL_CTL {
+    CTL_ADD = EPOLL_CTL_ADD,
+    CTL_DEL = EPOLL_CTL_DEL,
+    CTL_MOD = EPOLL_CTL_MOD
+};
+
+int close(int);
Added: mltonlib/trunk/ca/terpstra/st/epoll.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/epoll.sig 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/epoll.sig 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,15 @@
+signature EPOLL =
+  sig
+    type poll
+    type ioh = IoEvent.ioh
+
+    val create: int -> poll
+    val close: poll -> unit
+
+    (* Track changes to state of the io handle *)
+    val add: poll * ioh -> unit
+    val remove: poll * ioh -> unit
+
+    (* will automatically change IoEvent's status *)
+    val wait: poll * Time.time option -> unit
+  end
Added:
mltonlib/trunk/ca/terpstra/st/epoll.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/epoll.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/epoll.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,71 @@
+(* Edge-triggered *)
+structure EPoll :> EPOLL =
+  struct
+    type poll = MLRep.Int.Signed.int
+    type ioh = IoEvent.ioh
+
+    fun create events = F_epoll_create.f (MLRep.Int.Signed.fromInt events)
+    fun close epoll = ignore (F_close.f epoll)
+    (* issue epoll_ctl cmd for fd with IN, OUT, ERR, HUP and ET requested *)
+    fun ctl cmd (epoll, fd) =
+      let
+        open E_EPOLL_EVENTS
+        val makeUnsigned = MLRep.Int.Unsigned.fromInt o MLRep.Int.Signed.toInt
+        val flags = makeUnsigned (e_EPOLLIN + e_EPOLLOUT + e_EPOLLERR +
+                                  e_EPOLLHUP + e_EPOLLET)
+        val epoll_event = C.new S_epoll_event.typ
+      in
+        C.Set.uint (S_epoll_event.f_events epoll_event, flags);
+        C.Set.sint (U_epoll_data.f_fd (S_epoll_event.f_data epoll_event),
+                    MLRep.Int.Signed.fromInt fd);
+        F_epoll_ctl.f (epoll, cmd, MLRep.Int.Signed.fromInt fd,
+                       C.Ptr.|&| epoll_event);
+        C.discard epoll_event
+      end
+
+    val add = ctl E_EPOLL_CTL.e_CTL_ADD
+    val remove = ctl E_EPOLL_CTL.e_CTL_DEL
+    (* one event buffer, allocated once and reused by every wait *)
+    val nevents = 500
+    val events = C.alloc S_epoll_event.typ (Word.fromInt nevents)
+    (* wait for events (NONE = no timeout) and forward each one to IoEvent *)
+    fun wait (epoll, time) =
+      let
+        val roundup = Time.fromMicroseconds 999
+        val delay = case time of
+           NONE => ~1
+         | SOME x => LargeInt.toInt (Time.toMilliseconds (Time.+ (x, roundup)))
+        (* shadows the buffer capacity with the count epoll_wait returned *)
+        val nevents = F_epoll_wait.f (epoll, events, nevents, delay)
+
+        fun event ees =
+          let
+            open E_EPOLL_EVENTS
+            val makeUnsigned = MLRep.Int.Unsigned.fromInt o MLRep.Int.Signed.toInt
+            val EPOLLIN = makeUnsigned e_EPOLLIN
+            val EPOLLOUT = makeUnsigned e_EPOLLOUT
+            val EPOLLERR = makeUnsigned e_EPOLLERR
+            val EPOLLHUP = makeUnsigned e_EPOLLHUP
+
+            val fdf = U_epoll_data.f_fd (S_epoll_event.f_data ees)
+            val fd = MLRep.Int.Signed.toInt (C.Get.sint fdf)
+            val flags = C.Get.uint (S_epoll_event.f_events ees)
+
+            fun value bit = MLRep.Int.Unsigned.andb (flags, bit) = bit
+            val broken = value EPOLLERR orelse value EPOLLHUP
+          in
+            IoEvent.notifyHASINPUT fd (value EPOLLIN orelse broken);
+            IoEvent.notifyCANOUTPUT fd (value EPOLLOUT orelse broken)
+          end
+        (* epoll_wait yields ~1 on failure (e.g. EINTR); >= stops at once instead of walking off the buffer *)
+        fun process i =
+          if i >= nevents then () else
+          (event (C.Ptr.sub (events, i)); process (i + 1))
+      in
+        process 0
+      end
+  end
+
+structure Scheduler = Edge(EPoll)
+structure IoEvent = Scheduler.IoEvent
+structure Scheduler :> SCHEDULER = Scheduler
Added: mltonlib/trunk/ca/terpstra/st/ioevent.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/ioevent.sig 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/ioevent.sig 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,20 @@
+signature IOEVENT =
+  sig
+    exception Unmonitored
+
+    type status = { hasinput: bool, canoutput: bool}
+    type ioh
+
+    val socket: ('af, 'sock_type) Socket.sock -> (ioh -> 'a) -> 'a
+    val sockdes: Socket.sock_desc -> (ioh -> 'a) -> 'a
+    val file: Posix.IO.file_desc -> (ioh -> 'a) -> 'a
+
+    val HASINPUT: ioh -> (bool, bool) State.state
+    val CANOUTPUT: ioh -> (bool, bool) State.state
+
+    val notifyHASINPUT: ioh -> bool State.signal
+    val notifyCANOUTPUT: ioh -> bool State.signal
+
+    val monitor: ioh -> status -> unit
+    val unmonitor: ioh -> unit
+  end
Added: mltonlib/trunk/ca/terpstra/st/ioevent.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/ioevent.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/ioevent.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,53 @@
+structure IoEvent : IOEVENT =
+  struct
+    open State
+    open SparseArray
+
+    type ioh = int
+    exception Unmonitored
+
+    type status = {
+      hasinput: bool,
+      canoutput: bool }
+    type filedes = {
+      fhasinput: (bool, bool) state * bool signal,
+      fcanoutput: (bool, bool) state * bool signal }
+    val filedes : filedes sparse_array = new ()
+    (* four differently-typed views of the one symbol "side_channel_hack": a value is stored at its own type, read back as int *)
+    type 'a t = (unit -> 'a) * ('a -> unit)
+    val (geti, _) = _symbol "side_channel_hack" alloc: int t;
+    val (_, sets) = _symbol "side_channel_hack": ('a, 'b) Socket.sock t;
+    val (_, setd) = _symbol "side_channel_hack": Socket.sock_desc t;
+    val (_, setf) = _symbol "side_channel_hack": Posix.IO.file_desc t;
+
+    fun socket sock f = f (sets sock; geti ())
+    fun sockdes des f = f (setd des; geti ())
+    fun file file f = f (setf file; geti ())
+    (* look up the state half of a monitored handle's entry; raises Unmonitored when there is none *)
+    fun test select fd = case sub (filedes, fd) of
+        NONE => raise Unmonitored
+      | SOME x => case select x of (state, _) => state
+
+    val HASINPUT = test #fhasinput
+    val CANOUTPUT = test #fcanoutput
+    (* as test, but yields the signal half *)
+    fun notify select fd = case sub (filedes, fd) of
+        NONE => raise Unmonitored
+      | SOME x => case select x of (_, signal) => signal
+
+    val notifyHASINPUT = notify #fhasinput
+    val notifyCANOUTPUT = notify #fcanoutput
+    (* create fresh states from the initial status and store them under fd *)
+    fun monitor fd (status:status) =
+      let
+        val entry = {
+          fhasinput = state (#hasinput status),
+          fcanoutput = state (#canoutput status) }
+      in
+        update (filedes, fd, entry)
+      end
+
+    fun unmonitor fd = case sub (filedes, fd) of
+        NONE => raise Unmonitored
+      | SOME _ => erase (filedes, fd)
+  end
Added: mltonlib/trunk/ca/terpstra/st/kevent.h
===================================================================
--- mltonlib/trunk/ca/terpstra/st/kevent.h 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/kevent.h 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,57 @@
+#include <sys/event.h>
+#include <sys/time.h>
+
+enum filter {
+    read = EVFILT_READ,
+    write = EVFILT_WRITE,
+    aio = EVFILT_AIO,
+    vnode = EVFILT_VNODE,
+    proc = EVFILT_PROC,
+    signal = EVFILT_SIGNAL,
+    timer = EVFILT_TIMER,
+    machport = EVFILT_MACHPORT,
+    fs = EVFILT_FS
+};
+
+enum action {
+    add = EV_ADD,
+    delete = EV_DELETE,
+    enable = EV_ENABLE,
+    disable = EV_DISABLE,
+    oneshot = EV_ONESHOT,
+    clear = EV_CLEAR,
+    sysflags = EV_SYSFLAGS,
+    flag0 = EV_FLAG0,
+    flag1 = EV_FLAG1,
+    eof = EV_EOF,
+    error = EV_ERROR,
+    poll = EV_POLL,
+    ooband = EV_OOBAND
+};
+
+/*
+enum note {
+    lowat = NOTE_LOWAT,
+    delete = NOTE_DELETE,
+    write = NOTE_WRITE,
+    extend = NOTE_EXTEND,
+    attrib = NOTE_ATTRIB,
+    link = NOTE_LINK,
+    rename = NOTE_RENAME,
+    revoke = NOTE_REVOKE,
+    exit = NOTE_EXIT,
+    fork = NOTE_FORK,
+    exec = NOTE_EXEC,
+    pctrlmask = NOTE_PCTRLMASK,
+    pdatamask = NOTE_PDATAMASK,
+    seconds = NOTE_SECONDS,
+    useconds = NOTE_USECONDS,
+    nseconds = NOTE_NSECONDS,
+    absolute = NOTE_ABSOLUTE,
+    track = NOTE_TRACK,
+    trackerr = NOTE_TRACKERR,
+    child = NOTE_CHILD
+};
+*/
+
+int close(int fd);
Added: mltonlib/trunk/ca/terpstra/st/kqueue.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/kqueue.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/kqueue.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,102 @@
+structure KQueue :> EPOLL =
+  struct
+    type poll = MLRep.Int.Signed.int
+    type ioh = IoEvent.ioh
+
+    fun create _ = F_kqueue.f ()
+    fun close epoll = ignore (F_close.f epoll)
+
+(*
+    val () = print ("change: " ^ Int.toString fd ^ ": ")
+    val () = print (Int.toString filter ^ " ")
+    val () = print (Int.toString flags)
+    val () = print "\n"
+*)
+    fun kevent (ke, fd, filter, flags) = (* fills ident, filter and flags only; other fields of ke are not written here *)
+      (C.Set.ulong (S_kevent.f_ident ke,
+                    MLRep.Long.Unsigned.fromInt fd);
+       C.Set.sshort (S_kevent.f_filter ke,
+                     MLRep.Short.Signed.fromInt
+                       (MLRep.Int.Signed.toInt filter));
+       C.Set.ushort (S_kevent.f_flags ke,
+                     MLRep.Short.Unsigned.fromInt
+                       (MLRep.Int.Signed.toInt flags)))
+    (* submit a read and a write change for fd in one kevent call with a zero timeout *)
+    fun control flags (epoll, fd) =
+      let
+        val changes = C.alloc S_kevent.typ (Word.fromInt 2)
+        val zero = C.new S_timespec.typ
+      in
+        kevent (C.Ptr.sub (changes, 0), fd, E_filter.e_read, flags);
+        kevent (C.Ptr.sub (changes, 1), fd, E_filter.e_write, flags);
+        C.Set.slong (S_timespec.f_tv_sec zero, 0);
+        C.Set.slong (S_timespec.f_tv_nsec zero, 0);
+        F_kevent.f (epoll,
+                    C.Ptr.ro changes, 2,
+                    C.Ptr.null (C.T.pointer S_kevent.typ), 0,
+                    C.Ptr.ro (C.Ptr.|&| zero));
+        C.discard zero;
+        C.free changes
+      end
+
+    val add = control (E_action.e_add + E_action.e_clear)
+    val remove = control E_action.e_delete
+    (* one event buffer, allocated once and reused by every wait *)
+    val nevents = 500
+    val events = C.alloc S_kevent.typ (Word.fromInt nevents)
+    (* translate one returned kevent into the matching IoEvent notification *)
+    fun event ke =
+      let
+        val fd = C.Get.ulong (S_kevent.f_ident ke)
+        val io = C.Get.sshort (S_kevent.f_filter ke)
+
+        val fd = MLRep.Long.Unsigned.toInt fd
+
+        val cvt = MLRep.Short.Signed.fromInt o MLRep.Int.Signed.toInt
+        val read = cvt E_filter.e_read
+        val write = cvt E_filter.e_write
+(*
+        val () = print ("event: " ^ Int.toString fd ^ ":")
+        val () = if io = read then print " read" else ()
+        val () = if io = write then print " write" else ()
+        val () = print "\n"
+*)
+      in
+        if io = read then IoEvent.notifyHASINPUT fd true else ();
+        if io = write then IoEvent.notifyCANOUTPUT fd true else ()
+      end
+    (* wait for events (NONE = null timespec, i.e. no timeout) and dispatch each through event *)
+    fun wait (epoll, time) =
+      let
+        fun timespec NONE = C.Ptr.null (C.T.pointer S_timespec.typ)
+          | timespec (SOME t) =
+          let
+            val ts = C.alloc S_timespec.typ (Word.fromInt 1)
+            val (seconds, nano) =
+              IntInf.quotRem (Time.toNanoseconds t, 1000000000)
+          in
+            C.Set.slong (S_timespec.f_tv_sec (C.Ptr.|*| ts),
+                         MLRep.Long.Signed.fromLarge seconds);
+            C.Set.slong (S_timespec.f_tv_nsec (C.Ptr.|*| ts),
+                         MLRep.Long.Signed.fromLarge nano);
+            ts
+          end
+        val ts = timespec time
+        (* no changes are submitted here; nevents is rebound to the count kevent returned *)
+        val changes = C.Ptr.ro (C.Ptr.null (C.T.pointer S_kevent.typ))
+        val nevents = F_kevent.f (MLRep.Int.Signed.fromInt epoll,
+                                  changes, 0,
+                                  events, nevents,
+                                  C.Ptr.ro ts)
+        fun process i = (* kevent yields ~1 on failure (e.g. EINTR); >= stops at once instead of walking off the buffer *)
+          if i >= nevents then () else
+          (event (C.Ptr.sub (events, i)); process (i + 1))
+      in
+        process 0;
+        if C.Ptr.isNull ts then () else C.free ts
+      end
+  end
+
+structure Scheduler = Edge(KQueue)
+structure IoEvent :> IOEVENT = Scheduler.IoEvent
+structure Scheduler :> SCHEDULER = Scheduler
Added: mltonlib/trunk/ca/terpstra/st/level.fun
===================================================================
--- mltonlib/trunk/ca/terpstra/st/level.fun 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/level.fun 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,73 @@
+functor Level(Poll : LPOLL) :> SCHEDULER_EXTRA = (* scheduler built on a poll that satisfies LPOLL *)
+  struct
+    open State
+    open Thread_Extra
+    open Timeout_Extra
+    open Poll
+
+    val poll = create 1000 (* ready for 1000 file descriptors *)
+    (* IoEvent extended so that every "not ready" report also places a watch on the poll *)
+    structure IoEvent : IOEVENT =
+      struct
+        open IoEvent
+        (* watch each direction that is not reported ready, then record the initial status *)
+        fun monitor fd { hasinput, canoutput } = (
+          if hasinput then () else watch (poll, fd, Poll.HASINPUT);
+          if canoutput then () else watch (poll, fd, Poll.CANOUTPUT);
+          IoEvent.monitor fd {hasinput = hasinput, canoutput = canoutput})
+
+        fun unmonitor fd = (
+          unwatchall (poll, fd);
+          IoEvent.unmonitor fd)
+        (* a false report places the input watch before forwarding; a true report is forwarded as is *)
+        fun notifyHASINPUT fd true = (
+              IoEvent.notifyHASINPUT fd true)
+          | notifyHASINPUT fd false = (
+              Poll.watch (poll, fd, Poll.HASINPUT);
+              IoEvent.notifyHASINPUT fd false)
+        (* same scheme for the output direction *)
+        fun notifyCANOUTPUT fd true = (
+              IoEvent.notifyCANOUTPUT fd true)
+          | notifyCANOUTPUT fd false = (
+              Poll.watch (poll, fd, Poll.CANOUTPUT);
+              IoEvent.notifyCANOUTPUT fd false)
+      end
+    open IoEvent
+
+    fun sigPulse thread = thread before stop ()
+    (* wait on the poll, trigger with the current time, run again; repeats forever *)
+    fun loop block =
+      let
+        fun relativeTime time =
+          let
+            val delta = Time.- (time, Time.now ())
+          in
+            if Time.< (delta, Time.zeroTime)
+            then Time.zeroTime
+            else delta
+          end
+        (* PENDING: poll without delay; COMPLETE: delay until getNext (), or NONE if it has none *)
+        val delay =
+          case block of
+             PENDING => SOME Time.zeroTime
+           | COMPLETE => Option.map relativeTime (getNext ())
+      in
+        wait (poll, delay);
+        trigger (Time.now ());
+        loop (run ())
+      end
+
+    fun main () =
+      let
+        open MLton
+        open Signal
+        val real = Itimer.signal Itimer.Real
+        val freq = Time.fromMilliseconds 50 (* only referenced by the commented-out Itimer.set below *)
+      in
+        (* prevent high throughput connections from causing starvation *)
+        Mask.unblock (Mask.some [real]);
+        setHandler (real, Handler.handler sigPulse);
+        (* Itimer.set (Itimer.Real, { interval = freq, value = freq }); *)
+        loop (run ())
+      end
+  end
Added: mltonlib/trunk/ca/terpstra/st/lpoll.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/lpoll.sig 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/lpoll.sig 2006-12-19 02:52:52 UTC
(rev 4982)
@@ -0,0 +1,21 @@
+(* Signature for level-triggered poll *)
+signature LPOLL =
+  sig
+    type poll
+    type ioh = IoEvent.ioh
+    datatype level = HASINPUT | CANOUTPUT
+
+    val create: int -> poll
+    val close: poll -> unit
+
+    (* add a watch to the list *)
+    val watch: poll * ioh * level -> unit
+
+    (* called prior to closing the io handle *)
+    val unwatchall: poll * ioh -> unit
+
+    (* automatically change IoEvent's status
+     * triggered watches are automatically removed from the poll (ie: oneshot)
+     *)
+    val wait: poll * Time.time option -> unit
+  end
Added: mltonlib/trunk/ca/terpstra/st/open.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/open.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/open.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,5 @@
+open State
+open Thread
+open Timeout
+open IoEvent
+open Scheduler
Added: mltonlib/trunk/ca/terpstra/st/scheduler.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/scheduler.sig 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/scheduler.sig 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,10 @@
+signature SCHEDULER =
+  sig
+    val main: unit -> unit
+  end
+
+signature SCHEDULER_EXTRA =
+  sig
+    include SCHEDULER
+    structure IoEvent: IOEVENT
+  end
Added: mltonlib/trunk/ca/terpstra/st/socket.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/socket.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/socket.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,202 @@
+structure Socket : SOCKET = (* the basis Socket re-exported with IoEvent bookkeeping around every call *)
+  struct
+    open Socket
+    open State
+    open IoEvent
+    open Timeout
+    open Thread
+    (* non-blocking receive: a NONE result also signals HASINPUT = false for the socket picked out by s *)
+    fun wrapInNB f s x =
+      case f x of
+         NONE => NONE before socket (s x) notifyHASINPUT false
+       | SOME v => SOME v
+
+    val recvVecNB = fn x => wrapInNB recvVecNB #1 x
+    val recvVecNB' = fn x => wrapInNB recvVecNB' #1 x
+    val recvArrNB = fn x => wrapInNB recvArrNB #1 x
+    val recvArrNB' = fn x => wrapInNB recvArrNB' #1 x
+
+    val recvVecFromNB = fn x => wrapInNB recvVecFromNB #1 x
+    val recvVecFromNB' = fn x => wrapInNB recvVecFromNB' #1 x
+    val recvArrFromNB = fn x => wrapInNB recvArrFromNB #1 x
+    val recvArrFromNB' = fn x => wrapInNB recvArrFromNB' #1 x
+    (* blocking receive: stopTill on HASINPUT, then retry the non-blocking call until it yields SOME *)
+    fun wrapIn f s x = (
+      stopTill (socket (s x) HASINPUT);
+      case f x of
+         NONE => wrapIn f s x
+       | SOME x => x)
+
+    fun recvVec x = wrapIn recvVecNB #1 x
+    fun recvVec' x = wrapIn recvVecNB' #1 x
+    fun recvArr x = wrapIn recvArrNB #1 x
+    fun recvArr' x = wrapIn recvArrNB' #1 x
+
+    fun recvVecFrom x = wrapIn recvVecFromNB #1 x
+    fun recvVecFrom' x = wrapIn recvVecFromNB' #1 x
+    fun recvArrFrom x = wrapIn recvArrFromNB #1 x
+    fun recvArrFrom' x = wrapIn recvArrFromNB' #1 x
+    (* non-blocking send: a NONE result also signals CANOUTPUT = false *)
+    fun wrapOutNB f s x =
+      case f x of
+         NONE => NONE before socket (s x) notifyCANOUTPUT false
+       | SOME v => SOME v
+
+    val sendVecNB = fn x => wrapOutNB sendVecNB #1 x
+    val sendVecNB' = fn x => wrapOutNB sendVecNB' #1 x
+    val sendArrNB = fn x => wrapOutNB sendArrNB #1 x
+    val sendArrNB' = fn x => wrapOutNB sendArrNB' #1 x
+    (* same as wrapOutNB for the send-to calls, which report success as a bool *)
+    fun wrapOutNBbool f s x =
+      case f x of
+         false => false before socket (s x) notifyCANOUTPUT false
+       | true => true
+
+    val sendVecToNB = fn x => wrapOutNBbool sendVecToNB #1 x
+    val sendVecToNB' = fn x => wrapOutNBbool sendVecToNB' #1 x
+    val sendArrToNB = fn x => wrapOutNBbool sendArrToNB #1 x
+    val sendArrToNB' = fn x => wrapOutNBbool sendArrToNB' #1 x
+    (* blocking send: stopTill on CANOUTPUT, then retry until the non-blocking call yields SOME *)
+    fun wrapOut f s x = (
+      stopTill (socket (s x) CANOUTPUT);
+      case f x of
+         NONE => wrapOut f s x
+       | SOME x => x)
+
+    fun sendVec x = wrapOut sendVecNB #1 x
+    fun sendVec' x = wrapOut sendVecNB' #1 x
+    fun sendArr x = wrapOut sendArrNB #1 x
+    fun sendArr' x = wrapOut sendArrNB' #1 x
+    (* blocking send-to: retry until the non-blocking call reports true *)
+    fun wrapOutbool f s x = (
+      stopTill (socket (s x) CANOUTPUT);
+      case f x of
+         false => wrapOutbool f s x
+       | true => ())
+
+    fun sendVecTo x = wrapOutbool sendVecToNB #1 x
+    fun sendVecTo' x = wrapOutbool sendVecToNB' #1 x
+    fun sendArrTo x = wrapOutbool sendArrToNB #1 x
+    fun sendArrTo' x = wrapOutbool sendArrToNB' #1 x
+
+    val acceptNB = fn s =>
+      case acceptNB s of
+         NONE => NONE before socket s notifyHASINPUT false
+       | SOME (s, a) =>
+         (* It is safe to say no input, b/c edge triggered APIs always
+          * give at least one initial status report. It is also safe
+          * for level triggered, since this gets it added to the poll.
+          * Thus, no really fast sends are lost.
+          *
+          * This is the smart thing to do, because SYN+ACK takes a while
+          * to reach the client. So, there's no point wasting a recv()
+          * when it's almost surely not going to have data yet anyways.
+          *)
+         SOME (s, a) before socket s monitor { hasinput = false,
+                                               canoutput = true }
+    fun accept x = wrapIn acceptNB (fn s => s) x
+
+    val close = fn s => (socket s unmonitor; close s)
+
+    val listen = fn (s, i) =>
+      (* due to a bug in BSD's kqueue API, we must re-monitor *)
+      (socket s unmonitor;
+       listen (s, i);
+       socket s monitor { hasinput = false, canoutput = true })
+
+    val connect = fn (s, a) =>
+      case connectNB (s, a) of
+         true => ()
+       | false => (
+         stopTill (socket s CANOUTPUT);
+         (* Get the error status, if getERROR doesn't raise, we raise
+          * something generic since we only know that it failed.
+          *)
+         if Socket.Ctl.getERROR s
+         then raise OS.SysErr ("Connection failed", NONE)
+         else ())
+    (* wait via Thread.select on the given descriptors and optional timeout; exs is ignored and returned empty *)
+    fun select {rds, wrs, exs, timeout} =
+      let
+        datatype which =
+          RDS of sock_desc | WRS of sock_desc | TIMER
+
+        val rds = List.map (fn rd => (sockdes rd HASINPUT, RDS rd)) rds
+        val wrs = List.map (fn wr => (sockdes wr CANOUTPUT, WRS wr)) wrs
+        val tmr = case timeout of SOME x => [(TIMEOUT x, TIMER)] | NONE => []
+        val events = List.concat [rds, wrs, tmr]
+
+        val ords = ref []
+        val owrs = ref []
+        (* sort each selected tag into the matching result list *)
+        fun split (RDS rd) = ords := rd :: !ords
+          | split (WRS wr) = owrs := wr :: !owrs
+          | split TIMER = () (* names the constructor; the former TIME was a catch-all variable pattern *)
+      in
+        List.app split (Thread.select events);
+        {rds = !ords, wrs = !owrs, exs = []}
+      end
+  end
+
+structure Wrap = (* helpers that register a freshly created socket with IoEvent and return it *)
+  struct
+    local
+      open IoEvent
+    in
+      val monitor = fn s =>
+        s before socket s monitor { hasinput = false, canoutput = false }
+
+      fun monitorPair (s, t) = (monitor s, monitor t)
+    end
+  end
+
+structure GenericSock : GENERIC_SOCK =
+  struct
+    open GenericSock
+    open Wrap
+
+    val socket = fn x => monitor (socket x)
+    val socket' = fn x => monitor (socket' x)
+    val socketPair = fn x => monitorPair (socketPair x)
+    val socketPair' = fn x => monitorPair (socketPair' x)
+  end
+
+structure INetSock : INET_SOCK =
+  struct
+    open INetSock
+    open Wrap
+
+    structure UDP =
+      struct
+        open UDP
+        val socket = fn x => monitor (socket x)
+        val socket' = fn x => monitor (socket' x)
+      end
+
+    structure TCP =
+      struct
+        open TCP
+        val socket = fn x => monitor (socket x)
+        val socket' = fn x => monitor (socket' x)
+      end
+  end
+
+structure UnixSock : UNIX_SOCK =
+  struct
+    open UnixSock
+    open Wrap
+
+    structure Strm =
+      struct
+        open Strm
+        val socket = fn x => monitor (socket x)
+        val socketPair = fn x => monitorPair (socketPair x)
+      end
+
+    structure DGrm =
+      struct
+        open DGrm
+        val socket = fn x => monitor (socket x)
+        val socketPair = fn x => monitorPair (socketPair x)
+      end
+  end
Added: mltonlib/trunk/ca/terpstra/st/st.mlb
=================================================================== --- mltonlib/trunk/ca/terpstra/st/st.mlb 2006-12-15 14:53:49 UTC (rev 4981) +++ mltonlib/trunk/ca/terpstra/st/st.mlb 2006-12-19 02:52:52 UTC (rev 4982) @@ -0,0 +1,56 @@ +local + $(SML_LIB)/basis/basis.mlb + $(SML_LIB)/basis/mlton.mlb + $(SML_LIB)/mlnlffi-lib/mlnlffi-lib.mlb + + ann + "allowFFI true" + in + data.sig + data.sml + + state.sig + state.sml + thread.sig + thread.sml + + timeout.sig + timeout.sml + ioevent.sig + ioevent.sml + + scheduler.sig + epoll.sig + edge.fun + lpoll.sig + level.fun + + kevent/kevent.mlb + kqueue.sml + +(* epoll/epoll.mlb + epoll.sml +*) + socket.sml + end +in + signature STATE + signature THREAD + signature TIMEOUT + signature IOEVENT + signature SCHEDULER + + structure State + structure Thread + structure Timeout + structure IoEvent + structure Scheduler + + (* override basis definitions with ours -- we have hooks *) + structure Socket + structure GenericSock + structure INetSock + structure UnixSock + + open.sml +end Added: mltonlib/trunk/ca/terpstra/st/state.sig =================================================================== --- mltonlib/trunk/ca/terpstra/st/state.sig 2006-12-15 14:53:49 UTC (rev 4981) +++ mltonlib/trunk/ca/terpstra/st/state.sig 2006-12-19 02:52:52 UTC (rev 4982) @@ -0,0 +1,47 @@ +(* Attempts to classify states as 'level-triggered' or 'edge-triggered' + * will fail, as these terms make sense only at the intersection of states + * and blocking primitives. Both styles (and others) can be realized using + * the watch and value methods. + * + * A given state may only be watched once (1 time in 1 thread). + * If a second watch is attempted, the RaceCondition exception is raised. 
+ *)
+signature STATE =
+  sig
+    (* a cell holding a 'val; every accepted change is announced as a 'diff *)
+    type ('val, 'diff) state
+    (* the write end returned together with a state: apply it to a 'diff *)
+    type 'diff signal = 'diff -> unit
+
+    (* create a new state *)
+    (* state: the diff is the new value itself; signalling the value that is
+     *        already held is ignored.
+     * delta: the given function decides per diff -- NONE leaves the state
+     *        untouched, SOME v stores v and notifies the watcher (if any).
+     *)
+    val state: ''val -> (''val, ''val) state * ''val signal
+    val delta: ('val * 'diff -> 'val option) -> 'val ->
+               ('val, 'diff) state * 'diff signal
+
+    (* get the current value of a state *)
+    val value: ('val, 'diff) state -> 'val
+
+    (* hook a callback invoked when the state changes *)
+    (* dwatch hands the callback (new value, diff), swatch only the new value;
+     * release removes the callback again.
+     * RaceCondition: the state already has a watcher.
+     * UnWatched:     release on a state that has no watcher.
+     *)
+    exception RaceCondition
+    exception UnWatched
+    val dwatch: ('val * 'diff -> unit) -> ('val, 'diff) state -> unit
+    val swatch: ('val -> unit) -> ('val, 'diff) state -> unit
+    val release: ('val, 'diff) state -> unit
+
+    (* map this state into a new derived state *)
+    (* dmap (valmap, diffmap): valmap turns a source value into a derived value;
+     * diffmap (new source value, source diff, current derived value) answers
+     * NONE to swallow the change or SOME (new derived value, derived diff).
+     *)
+    val smap: ('val1 -> 'val2) ->
+              ('val1, 'val1) state -> ('val2, 'val2) state
+    val dmap: ('val1 -> 'val2) *
+              ('val1 * 'diff1 * 'val2 -> ('val2 * 'diff2) option) ->
+              ('val1, 'diff1) state -> ('val2, 'diff2) state
+
+    (* compose two states into their product *)
+    (* dcompose tags each diff with the side it came from (DIFF1 / DIFF2);
+     * scompose reports the whole pair as the diff.
+     *)
+    datatype ('diff1, 'diff2) alt = DIFF1 of 'diff1 | DIFF2 of 'diff2
+    val scompose: ('val1, 'val1) state * ('val2, 'val2) state ->
+                  ('val1 * 'val2, 'val1 * 'val2) state
+    val dcompose: ('val1, 'diff1) state * ('val2, 'diff2) state ->
+                  ('val1 * 'val2, ('diff1, 'diff2) alt) state
+
+    (* If you want multiple watchers on the same state *)
+    type ('val, 'diff) broadcast
+    val broadcast: ('val, 'diff) state -> ('val, 'diff) broadcast
+    val clone: ('val, 'diff) broadcast -> ('val, 'diff) state
+  end
Added: mltonlib/trunk/ca/terpstra/st/state.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/state.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/state.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,133 @@
+(* A state is represented as a record of three closures that share private
+ * refs; the functions exported by STATE simply call the matching field. *)
+structure State :> STATE =
+  struct
+    type ('val, 'diff) state = {
+      value: unit -> 'val,
+      release: unit -> unit,
+      watch: ('val * 'diff -> unit) -> unit
+    }
+    type 'diff signal = 'diff -> unit
+    exception RaceCondition
+    exception UnWatched
+
+    (* primitive constructor: returns the state together with its signal *)
+    fun delta update init =
+      let
+        (* current value *)
+        val state = ref init
+        (* the one watcher callback, NONE while nobody watches *)
+        val block = ref NONE
+
+        fun value () = !state
+        fun release () =
+          case !block of
+            NONE => raise UnWatched
+          | SOME _ => block := NONE
+        fun watch f =
+          case !block of
+            NONE => block := SOME f
+          | SOME _ => raise RaceCondition
+        (* the new value is stored before the watcher runs, so the callback
+         * already reads it through value *)
+        fun signal diff =
+          case update (!state, diff) of
+            NONE => ()
+          | SOME newval =>
+            case !block of
+              NONE => state := newval
+            | SOME f => (state := newval; f (newval, diff))
+      in
+        ({ value = value, release = release, watch = watch }, signal)
+      end
+    (* the diff is the new value; an equal value is not a change *)
+    fun state init = delta (fn (s, d) => if s = d then NONE else SOME d) init
+
+    (* field projections *)
+    fun value { value, release=_, watch=_ } = value ()
+    fun release { value=_, release, watch=_ } = release ()
+    fun dwatch f { value=_, release=_, watch } = watch f
+    fun swatch f = dwatch (fn (x, _) => f x)
+
+    (* NOTE: watch/value/release below are bound with non-recursive "val", so
+     * the value/release named inside their bodies are the functions defined
+     * above, applied to the source state. *)
+    fun dmap (valmap, diffmap) state =
+      let
+        (* derived value; SOME only between watch and release *)
+        val valproxy = ref NONE
+        val block = ref NONE
+
+        (* installed on the source while this derived state is watched *)
+        fun proxy (val1, diff1) =
+          case diffmap (val1, diff1, valOf (!valproxy)) of
+            NONE => ()
+          | SOME (newval2, diff2) =>
+            case !block of
+              NONE => valproxy := SOME newval2
+            | SOME f => (valproxy := SOME newval2; f (newval2, diff2))
+
+        val watch = fn f =>
+          case !block of
+            NONE => (dwatch proxy state; (* first b/c it might raise *)
+                     block := SOME f;
+                     valproxy := SOME (valmap (value state)))
+          | SOME _ => raise RaceCondition
+        (* unwatched: computed from the source on every call *)
+        val value = fn () =>
+          case !valproxy of
+            NONE => valmap (value state)
+          | SOME x => x
+        val release = fn () =>
+          case !block of
+            NONE => raise UnWatched
+          | SOME _ => (release state; block := NONE; valproxy := NONE)
+      in
+        { value = value, release = release, watch = watch }
+      end
+    (* derived value and derived diff are both valmap of the new source value *)
+    fun smap valmap =
+      dmap (valmap, fn (v, _, _) => let val v2 = valmap v in SOME (v2, v2) end)
+
+    datatype ('diff1, 'diff2) alt = DIFF1 of 'diff1 | DIFF2 of 'diff2
+    fun dcompose (state1, state2) =
+      let
+        val block = ref NONE
+        (* each proxy pairs its new value with the other side's current one *)
+        fun proxy1 (val1, diff1) =
+          (valOf (!block)) ((val1, value state2), DIFF1 diff1)
+        fun proxy2 (val2, diff2) =
+          (valOf (!block)) ((value state1, val2), DIFF2 diff2)
+
+        (* if the second source cannot be watched, the first is released
+         * again before the exception is passed on *)
+        val watch = fn f =>
+          case !block of
+            NONE => (
+              dwatch proxy1 state1;
+              (dwatch proxy2 state2 handle ex => (release state1; raise ex));
+              block := SOME f)
+          | SOME _ => raise RaceCondition
+        val value = fn () =>
+          (value state1, value state2)
+        val release = fn () =>
+          case !block of
+            NONE => raise UnWatched
+          | SOME _ => (release state1; release state2; block := NONE)
+      in
+        { value = value, release = release, watch = watch }
+      end
+
+    (* same as dcompose, except that the diff passed on is the whole pair *)
+    fun scompose (state1, state2) =
+      let
+        val block = ref NONE
+        fun proxy1 (val1, diff1) =
+          let val val2 = value state2 in
+          (valOf (!block)) ((val1, val2), (val1, val2)) end
+        fun proxy2 (val2, diff2) =
+          let val val1 = value state1 in
+          (valOf (!block)) ((val1, val2), (val1, val2)) end
+
+        val watch = fn f =>
+          case !block of
+            NONE => (
+              dwatch proxy1 state1;
+              (dwatch proxy2 state2 handle ex => (release state1; raise ex));
+              block := SOME f)
+          | SOME _ => raise RaceCondition
+        val value = fn () =>
+          (value state1, value state2)
+        val release = fn () =>
+          case !block of
+            NONE => raise UnWatched
+          | SOME _ => (release state1; release state2; block := NONE)
+      in
+        { value = value, release = release, watch = watch }
+      end
+
+    (* !!!
broadcast: the source is watched once and each change is fanned out to the clones *)
+    type ('val, 'diff) broadcast = {
+      source: ('val, 'diff) state,
+      (* callbacks of the clones that are watched right now, keyed by clone id *)
+      sinks: (int * ('val * 'diff -> unit)) list ref,
+      (* id given to the next clone *)
+      fresh: int ref
+    }
+    fun broadcast state = { source = state, sinks = ref [], fresh = ref 0 }
+
+    (* Every clone is a state of its own: it can be watched once, and any
+     * number of clones can be watched at the same time.  The source itself
+     * is watched while at least one clone is, so watching the source
+     * directly as well still raises RaceCondition.
+     * As elsewhere in this structure, the "val" bindings below are
+     * non-recursive: value/release in their bodies are the outer functions. *)
+    fun clone { source, sinks, fresh } =
+      let
+        val id = !fresh
+        val () = fresh := id + 1
+
+        fun live i = List.exists (fn (j, _) => j = i) (!sinks)
+        (* iterate over a snapshot, but skip a clone that an earlier callback
+         * of this very round has released *)
+        fun fanout change =
+          List.app (fn (i, f) => if live i then f change else ()) (!sinks)
+
+        val watch = fn f =>
+          if live id then raise RaceCondition else (
+            (* the first watcher hooks the source; first b/c it might raise *)
+            if null (!sinks) then dwatch fanout source else ();
+            sinks := (id, f) :: !sinks)
+        val release = fn () =>
+          if not (live id) then raise UnWatched else (
+            sinks := List.filter (fn (i, _) => i <> id) (!sinks);
+            (* the last watcher unhooks the source again *)
+            if null (!sinks) then release source else ())
+        val value = fn () => value source
+      in
+        { value = value, release = release, watch = watch }
+      end
+  end
Added: mltonlib/trunk/ca/terpstra/st/test.mlb
===================================================================
--- mltonlib/trunk/ca/terpstra/st/test.mlb 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/test.mlb 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,6 @@
+local
+   $(SML_LIB)/basis/basis.mlb
+   st.mlb
+in
+   test.sml
+end
Added: mltonlib/trunk/ca/terpstra/st/test.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/test.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/test.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,59 @@
+(* Demo: an echo server on port 12467, a five second heartbeat and two HTTP
+ * fetches, each running as its own thread.
+ * spawn, stopTill, TIMEOUT and main are not defined here; presumably
+ * open.sml (listed in st.mlb) brings them to the top level. *)
+type port = (INetSock.inet, Socket.passive Socket.stream) Socket.sock
+
+(* string <-> byte vector, via the Basis Byte structure *)
+fun msg s = Word8VectorSlice.full (Byte.stringToBytes s)
+fun str v = Byte.bytesToString v
+
+val delay = Time.fromSeconds 5
+val port : port = INetSock.TCP.socket ()
+val () = Socket.Ctl.setREUSEADDR (port, true)
+val () = Socket.bind (port, INetSock.any 12467)
+val () = Socket.listen (port, 100)
+
+val google = valOf (NetHostDB.getByName "www.google.de")
+val ghttp = INetSock.toAddr (NetHostDB.addr google, 80)
+
+(* fetch "/" once and print what the first read returns (at most 4096 bytes);
+ * the socket is closed on success and on failure *)
+fun http () =
+  let
+    val s = INetSock.TCP.socket ()
+    fun exchange () = (
+      print "connecting...\n";
+      Socket.connect (s, ghttp);
+      print "sending...\n";
+      (* HTTP lines end in CRLF *)
+      Socket.sendVec (s, msg "GET / HTTP/1.1\r\nHost: www.google.de\r\n\r\n");
+      print "reading...\n";
+      Socket.recvVec (s, 4096))
+    val got = exchange () handle ex => (Socket.close s; raise ex)
+    val () = Socket.close s
+    val () = print "done!\n"
+  in
+    print ("response: " ^ str got ^ "\n")
+  end
+
+(* greet the client once, then echo until it closes its side (empty read) *)
+fun worker s () =
+  let
+    fun echo () =
+      let
+        val got = Word8VectorSlice.full (Socket.recvVec (s, 400))
+      in
+        if Word8VectorSlice.length got = 0 then Socket.close s else
+        (Socket.sendVec (s, got); echo ())
+      end
+    val _ = Socket.sendVec (s, msg "hello and welcome!\n")
+  in
+    echo ()
+  end
+
+(* accept loop: one worker thread per connection *)
+fun welcome () =
+  let
+    val (s, _) = Socket.accept port
+  in
+    spawn (worker s);
+    welcome ()
+  end
+
+(* print a line every time the delay has passed *)
+fun beat () = (
+  stopTill (TIMEOUT delay);
+  print "hello world\n";
+  beat ())
+
+val () = spawn welcome
+val () = spawn beat
+val () = spawn http
+val () = spawn http
+
+val () = main ()
Added: mltonlib/trunk/ca/terpstra/st/thread.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/thread.sig 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/thread.sig 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,18 @@
+signature THREAD =
+  sig
+    (* start a new thread, which will be run later *)
+    val spawn: (unit -> unit) -> unit
+    val yield: 'a -> 'a (* release control for a tick *)
+
+    (* block the calling thread until the state reads true *)
+    val stopTill: (bool, 'a) State.state -> unit
+    (* block until at least one state reads true; the result holds the
+     * tags of all states that read true at that moment, in list order *)
+    val select: ((bool, 'b) State.state * 'a) list -> 'a list
+  end
+
+signature THREAD_EXTRA =
+  sig
+    include THREAD
+
+    datatype loop = COMPLETE | PENDING
+    val run: unit -> loop (* process queue till completed or stopped *)
+    val stop: unit -> unit (* stop processing queue and return soon *)
+  end
Added: mltonlib/trunk/ca/terpstra/st/thread.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/thread.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/thread.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,65 @@
+(* Cooperative threads on top of MLton.Thread: runnable threads wait in the
+ * "ready" queue, and control only changes hands inside switch. *)
+structure Thread_Extra :> THREAD_EXTRA =
+  struct
+    open MLton.Thread
+    open State
+    type thread = Runnable.t
+
+    (* threads that can continue *)
+    val ready : thread Queue.queue = Queue.new ()
+    (* the thread that called run; control returns to it when there is
+     * nothing left to do or stop was called *)
+    val loop : thread option ref = ref NONE
+    val quit : bool ref = ref false
+
+    (* whom to switch to: the next ready thread, else the caller of run *)
+    fun next () =
+      if Queue.empty ready orelse !quit then valOf (!loop) else
+      valOf (Queue.deque ready)
+
+    (* the new thread is only queued; when main returns, the thread hands
+     * control to whatever is next *)
+    fun spawn main =
+      Queue.enque (ready, prepare (new
+        (fn () => (main (); switch (fn _ => next ()))), ()))
+
+    (* requeue the caller behind the threads already waiting *)
+    fun yield result = switch (fn thread => (
+      Queue.enque (ready, prepare (thread, result));
+      next ()))
+
+    datatype loop = COMPLETE | PENDING
+    (* PENDING: control came back while threads were still queued *)
+    fun run () = (
+      quit := false;
+      switch (fn thread => (loop := SOME (prepare (thread, ())); next ()));
+      if Queue.empty ready then COMPLETE else PENDING)
+
+    fun stop () = quit := true
+
+    (* the while loop deals with the case that a state may have only
+     * temporarily become true (before switch), but is not true any longer.
+     *)
+    fun stopTill state =
+      while not (value state) do switch (fn thread =>
+        let
+          (* runs inside the signal: unhook, then make the thread runnable *)
+          fun resume _ = (
+            release state;
+            Queue.enque (ready, prepare (thread, ())))
+        in
+          swatch resume state;
+          next ()
+        end)
+
+    fun select events =
+      let
+        fun map (state, res) = if value state then SOME res else NONE
+        fun block thread =
+          let
+            fun resume _ = (
+              List.app (fn (state, _) => release state) events;
+              Queue.enque (ready, prepare (thread, ())))
+            (* watch the states one by one; if one refuses (for instance
+             * RaceCondition because it is watched already or listed twice),
+             * undo the watches placed so far and pass the exception on *)
+            fun watchAll (_, []) = ()
+              | watchAll (done, (state, _) :: rest) = (
+                  (swatch resume state
+                   handle ex => (List.app release done; raise ex));
+                  watchAll (state :: done, rest))
+          in
+            watchAll ([], events);
+            next ()
+          end
+      in
+        (* re-check after every wake-up, for the same reason as in stopTill *)
+        case List.mapPartial map events of
+          x :: r => x :: r
+        | [] => (switch block; select events)
+      end
+  end
+
+structure Thread :> THREAD = Thread_Extra
Added: mltonlib/trunk/ca/terpstra/st/timeout.sig
===================================================================
--- mltonlib/trunk/ca/terpstra/st/timeout.sig 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/timeout.sig 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,21 @@
+signature TIMEOUT =
+  sig
+    (* TIMEOUT is measured since the last IO poll, not the instant called *)
+    val TIMEOUT: Time.time -> (bool, bool) State.state
+
+    (* LATERTHAN is an absolute time value *)
+    val LATERTHAN: Time.time -> (bool, bool) State.state
+
+    (* What is the cached time as of last tick (fast) *)
+    val lastTick: unit -> Time.time
+  end
+
+signature TIMEOUT_EXTRA =
+  sig
+    include TIMEOUT
+
+    (* The earliest pending timer (if any) *)
+    val getNext: unit -> Time.time option
+    (* Toggle all
states due at or before the given time to true *)
+    val trigger: Time.time -> unit
+  end
Added: mltonlib/trunk/ca/terpstra/st/timeout.sml
===================================================================
--- mltonlib/trunk/ca/terpstra/st/timeout.sml 2006-12-15 14:53:49 UTC (rev 4981)
+++ mltonlib/trunk/ca/terpstra/st/timeout.sml 2006-12-19 02:52:52 UTC (rev 4982)
@@ -0,0 +1,46 @@
+(* !!! fixme: timers persist in the heap even if unreferenced.
+ * once MLton bug is fixed, use MLton.Weak and MLton.Finalizable
+ *)
+structure Timeout_Extra :> TIMEOUT_EXTRA =
+  struct
+    open State
+    open Time
+    open Heap
+
+    (* a pending timer: its deadline and the signal that flips its state *)
+    type sleeper = time * bool signal
+
+    (* ordering handed to the heap: compares deadlines only *)
+    fun nextSleeper ((due1, _), (due2, _)) = due1 < due2
+    val sleeper = new nextSleeper
+
+    (* time passed to the most recent trigger; starts at load time *)
+    val rLastTick = ref (Time.now ())
+    fun lastTick () = !rLastTick
+
+    (* a fresh false state, queued together with its deadline *)
+    fun LATERTHAN deadline =
+      let
+        val (expired, fire) = state false
+        val _ = push (sleeper, (deadline, fire))
+      in
+        expired
+      end
+
+    (* relative to the cached tick, not to the current clock *)
+    fun TIMEOUT span = LATERTHAN (span + lastTick ())
+
+    fun getNext () =
+      case peek sleeper of
+        SOME (due, _) => SOME due
+      | NONE => NONE
+
+    (* record the tick, then fire and drop timers from the front of the heap
+     * until it is empty or the front timer is due later than the tick *)
+    fun trigger tick =
+      let
+        fun drain () =
+          case peek sleeper of
+            SOME (due, fire) =>
+              if tick < due then ()
+              else (pop sleeper; fire true; drain ())
+          | NONE => ()
+      in
+        rLastTick := tick;
+        drain ()
+      end
+  end
+
+structure Timeout :> TIMEOUT = Timeout_Extra
|