Update of /cvsroot/sbcl/sbcl/contrib/sb-concurrency
In directory sfp-cvsdas-3.v30.ch3.sourceforge.com:/tmp/cvs-serv3649/contrib/sb-concurrency
Added Files:
Makefile mailbox.lisp package.lisp queue.lisp
sb-concurrency.asd sb-concurrency.texinfo
Log Message:
1.0.37.18: New contrib SB-CONCURRENCY.
sb-concurrency related changes:
* create contrib/sb-concurrency/
* add the implementation of lock-free Queues from sb-queue.
* add a new implementation of Mailboxes on top of the Queues.
sb-queue related changes:
* merged into sb-concurrency
* deprecated now, but the system / package is still retained for
backwards compatibility; the package simply reexports from
sb-concurrency.
doc changes:
* add section for sb-concurrency to manual
--- NEW FILE: Makefile ---
# Build glue for the sb-concurrency contrib: names the system and pulls in
# the shared contrib rules. ../asdf-module.mk is not visible here; presumably
# it uses SYSTEM to build and test the ASDF system of that name -- confirm.
SYSTEM=sb-concurrency
include ../asdf-module.mk
--- NEW FILE: mailbox.lisp ---
;;;; Lock-free mailbox implementation built on the lock-free queues of
;;;; SB-CONCURRENCY (formerly the SB-QUEUE contrib).
;;;;
;;;; Written by Nikodemus Siivola for SBCL.
;;;; Extended by Tobias C Rittweiler.
;;;;
;;;; This software is part of the SBCL system. See the README file for
;;;; more information.
;;;;
;;;; This software is derived from the CMU CL system, which was written at
;;;; Carnegie Mellon University and released into the public domain. The
;;;; software is in the public domain and is provided with absolutely no
;;;; warranty. See the COPYING and CREDITS files for more information.
(in-package :sb-concurrency)
;; TODO: type and values decls
;;; A mailbox pairs a QUEUE holding the messages with a SEMAPHORE on
;;; which receivers wait. Instances are created through MAKE-MAILBOX,
;;; which calls the BOA constructor %MAKE-MAILBOX with all three slots.
(defstruct (mailbox (:constructor %make-mailbox (queue semaphore name))
                    (:copier nil)
                    (:predicate mailboxp))
  "Mailbox aka message queue."
  ;; MISSING-ARG is an SB-INT function and this package only uses CL and
  ;; SB-THREAD, so the reference has to be package-qualified; unqualified
  ;; it would name an undefined function in SB-CONCURRENCY. The initforms
  ;; are only a safety net: %MAKE-MAILBOX always supplies both slots.
  (queue (sb-int:missing-arg) :type queue)
  (semaphore (sb-int:missing-arg) :type semaphore)
  ;; Optional, user-visible name; NIL when the mailbox is anonymous.
  (name nil))
;;; Attach docstrings to the functions that DEFSTRUCT MAILBOX generates,
;;; since DEFSTRUCT itself offers no place to document them.
(setf (documentation 'mailboxp 'function)
      "Returns true if argument is a MAILBOX, NIL otherwise.")
(setf (documentation 'mailbox-name 'function)
      "Name of a MAILBOX. SETFable.")
(defun make-mailbox (&key name initial-contents)
  "Creates and returns a fresh MAILBOX called NAME whose pending messages
are the elements of INITIAL-CONTENTS."
  (labels ((component-name (kind)
             ;; Derive a descriptive name for the internal queue and
             ;; semaphore from the mailbox's own name (if it has one).
             (format nil "~:[Mailbox ~A~;~A for mailbox ~S~]" name kind name)))
    (let* ((message-queue (make-queue :name (component-name "Queue")
                                      :initial-contents initial-contents))
           ;; The semaphore starts out with one unit per initial message.
           (message-semaphore (make-semaphore
                               :name (component-name "Semaphore")
                               :count (length initial-contents))))
      (%make-mailbox message-queue message-semaphore name))))
;;; Prints a mailbox unreadably as its type and identity, with the name
;;; (when non-NIL) and the current number of pending messages.
(defmethod print-object ((mailbox mailbox) stream)
  (print-unreadable-object (mailbox stream :type t :identity t)
    (let ((label (mailbox-name mailbox))
          (pending (mailbox-count mailbox)))
      (format stream "~@[~S ~](~D msgs pending)" label pending)))
  mailbox)
(defun mailbox-count (mailbox)
  "Returns how many messages are pending in MAILBOX right now."
  ;; The count is read off the semaphore rather than by walking the queue.
  (let ((semaphore (mailbox-semaphore mailbox)))
    (semaphore-count semaphore)))
(defun mailbox-empty-p (mailbox)
  "Returns true if MAILBOX has no pending messages at the moment, NIL
otherwise."
  (= 0 (mailbox-count mailbox)))
(defun list-mailbox-messages (mailbox)
  "Returns a freshly consed list of every message currently held by
MAILBOX. The messages stay in the mailbox."
  (let ((message-queue (mailbox-queue mailbox)))
    (list-queue-contents message-queue)))
(defun send-message (mailbox message)
  "Appends MESSAGE, which may be any object, to MAILBOX."
  ;; Enqueueing and signalling happen with interrupts disabled so that an
  ;; interrupt cannot separate the two steps.
  (sb-sys:without-interrupts
    (let ((message-queue (mailbox-queue mailbox))
          (message-semaphore (mailbox-semaphore mailbox)))
      (enqueue message message-queue)
      (signal-semaphore message-semaphore))))
;;; TODO: TIMEOUT argument.
;;; Blocking receive: waits on the mailbox's semaphore, then dequeues one
;;; message and returns it. The empty &KEY list accepts no keywords yet;
;;; see the TIMEOUT TODO above.
(defun receive-message (mailbox &key)
"Removes the oldest message from MAILBOX and returns it as the
primary value. If MAILBOX is empty waits until a message arrives."
(tagbody
;; Disable interrupts for keeping semaphore count in sync with
;; #msgs in the mailbox.
(sb-sys:without-interrupts
;; Only the wait itself is wrapped in ALLOW-WITH-INTERRUPTS --
;; presumably so that a blocked receiver can still be interrupted;
;; confirm against the SB-SYS documentation.
(sb-sys:allow-with-interrupts
(wait-on-semaphore (mailbox-semaphore mailbox)))
;; The wait returned, so a message is expected to be in the queue.
(multiple-value-bind (value ok) (dequeue (mailbox-queue mailbox))
(if ok
(return-from receive-message value)
(go :error))))
;; Reached only via (GO :ERROR): the queue was empty although the wait
;; returned. The tag sits outside the WITHOUT-INTERRUPTS form, so the
;; bug is reported after leaving it.
:error
(sb-int:bug "Mailbox ~S empty after WAIT-ON-SEMAPHORE."
mailbox)))
(defun receive-message-no-hang (mailbox)
  "The non-blocking variant of RECEIVE-MESSAGE. Returns two values,
the message removed from MAILBOX, and a flag specifying whether a
message could be received."
  (prog ((semaphore (mailbox-semaphore mailbox))
         (queue (mailbox-queue mailbox)))
     ;; Interrupts are disabled for the same reason as in
     ;; RECEIVE-MESSAGE: to keep the semaphore count in sync with the
     ;; number of messages in the queue.
     (sb-sys:without-interrupts
       ;; No unit could be taken from the semaphore: nothing is
       ;; pending, so report failure through the secondary value.
       (unless (sb-sys:allow-with-interrupts
                 (sb-thread::try-semaphore semaphore))
         (return (values nil nil)))
       ;; A unit was taken, so a message is expected to be in the queue.
       (multiple-value-bind (value ok) (dequeue queue)
         (if ok
             (return (values value t))
             (go :error))))
   :error
     ;; Reached only via (GO :ERROR): the queue was empty although the
     ;; semaphore granted a unit. The tag sits outside the
     ;; WITHOUT-INTERRUPTS form, so the bug is reported after leaving it.
     (sb-int:bug "Mailbox ~S empty after successful TRY-SEMAPHORE."
                 mailbox)))
(defun receive-pending-messages (mailbox &optional n)
  "Removes and returns all (or at most N) currently pending messages
from MAILBOX, or returns NIL if no messages are pending.
Note: Concurrent threads may be snarfing messages during the run of
this function, so even though X,Y appear right next to each other in
the result, does not necessarily mean that Y was the message sent
right after X."
  (prog* ((msgs '())
          (sem (mailbox-semaphore mailbox))
          (queue (mailbox-queue mailbox))
          ;; Snapshot of the pending count; other threads may change it
          ;; right after it has been read.
          (avail (mailbox-count mailbox))
          ;; Number of messages this call tries to collect.
          (count (if n (min n avail) avail)))
     (when (zerop count)
       (go :finish))
     ;; Fast path: take COUNT units from the semaphore in one step and
     ;; then dequeue that many messages. Interrupts are disabled for the
     ;; same reason as in RECEIVE-MESSAGE: to keep the semaphore count
     ;; in sync with the number of messages in the queue.
     (sb-sys:without-interrupts
       (unless (sb-sys:allow-with-interrupts
                 (sb-thread::try-semaphore sem count))
         (go :slow-path))
       ;; Safe because QUEUE is private; other threads may be snarfing
       ;; messages under our feet, though, hence the out of order bit
       ;; in the docstring. Same for the slow path.
       (loop
         (multiple-value-bind (msg ok) (dequeue queue)
           (unless ok (go :error))
           (push msg msgs)
           (when (zerop (decf count))
             (go :finish)))))
     ;; This is the slow path as RECEIVE-MESSAGE-NO-HANG will have to
     ;; lock the semaphore's mutex again and again.
   :slow-path
     ;; No need for disabling interrupts because we never leave the
     ;; mailbox in an inconsistent state here.
     (loop
       (multiple-value-bind (msg ok)
           (receive-message-no-hang mailbox)
         ;; Stop as soon as the mailbox runs dry, even if fewer than
         ;; COUNT messages were collected.
         (unless ok (go :finish))
         (push msg msgs)
         (when (zerop (decf count))
           (go :finish))))
   :finish
     ;; Messages were pushed newest-first; restore receive order.
     (return (nreverse msgs))
   :error
     ;; Reached only from the fast path: the queue was empty although
     ;; the semaphore granted the requested units.
     (sb-int:bug "Mailbox ~S empty after successful TRY-SEMAPHORE."
                 mailbox)))
--- NEW FILE: package.lisp ---
;;; Package definition for the SB-CONCURRENCY contrib. Only CL and
;;; SB-THREAD are used; the sources write symbols such as
;;; SB-SYS:WITHOUT-INTERRUPTS, SB-INT:BUG and SB-EXT:COMPARE-AND-SWAP
;;; with an explicit package prefix.
(defpackage :sb-concurrency
(:use :cl :sb-thread)
(:export
;; MAILBOX
;; Public interface of mailbox.lisp.
"LIST-MAILBOX-MESSAGES"
"MAILBOX"
"MAILBOX-COUNT"
"MAILBOX-EMPTY-P"
"MAILBOX-NAME"
"MAILBOXP"
"MAKE-MAILBOX"
"RECEIVE-MESSAGE"
"RECEIVE-MESSAGE-NO-HANG"
"RECEIVE-PENDING-MESSAGES"
"SEND-MESSAGE"
;; QUEUE
;; Public interface of queue.lisp.
"DEQUEUE"
"ENQUEUE"
"LIST-QUEUE-CONTENTS"
"MAKE-QUEUE"
"QUEUE"
"QUEUE-COUNT"
"QUEUE-EMPTY-P"
"QUEUE-NAME"
"QUEUEP"
))
--- NEW FILE: queue.lisp ---
;;;; Lock-free FIFO queues, from "An Optimistic Approach to Lock-Free FIFO
;;;; Queues" by Edya Ladan-Mozes and Nir Shavit.
;;;;
;;;; Written by Nikodemus Siivola for SBCL.
;;;;
;;;; This software is part of the SBCL system. See the README file for
;;;; more information.
;;;;
;;;; This software is derived from the CMU CL system, which was written at
;;;; Carnegie Mellon University and released into the public domain. The
;;;; software is in the public domain and is provided with absolutely no
;;;; warranty. See the COPYING and CREDITS files for more information.
(in-package :sb-concurrency)
;;; Sentinel stored as the VALUE of placeholder nodes. The queue code
;;; compares node values against it with EQ to tell dummy nodes apart
;;; from nodes carrying a real value.
(defconstant +dummy+ '.dummy.)
;;; A queue cell. ENQUEUE links a new node's NEXT to the previous tail
;;; and then sets that tail's PREV to the new node, so NEXT leads towards
;;; the head (older entries) and PREV towards the tail (newer entries).
;;; The constructor is declaimed inline before it is defined.
(declaim (inline make-node))
(defstruct node
;; The enqueued object, or +DUMMY+ for a placeholder node.
value
;; Neighbour towards the tail; NIL until it has been linked.
(prev nil :type (or null node))
;; Neighbour towards the head; NIL when there is none.
(next nil :type (or null node)))
;;; The queue object itself: pointers to the head and tail nodes plus an
;;; optional name. Instances are created through MAKE-QUEUE, which calls
;;; the BOA constructor %MAKE-QUEUE (declaimed inline here).
(declaim (inline %make-queue))
(defstruct (queue (:constructor %make-queue (head tail name))
(:copier nil)
(:predicate queuep))
"Lock-free thread safe queue."
;; Node that DEQUEUE removes from; the ERROR initform only guards
;; against construction without a HEAD.
(head (error "No HEAD.") :type node)
;; Node that ENQUEUE appends behind; same guard as for HEAD.
(tail (error "No TAIL.") :type node)
;; User-supplied name, NIL by default.
(name nil))
;;; Attach docstrings to the functions that DEFSTRUCT QUEUE generates,
;;; since DEFSTRUCT itself offers no place to document them.
(setf (documentation 'queuep 'function)
      "Returns true if argument is a QUEUE, NIL otherwise."
      (documentation 'queue-name 'function)
      "Name of a QUEUE. Can be assigned to using SETF. Queue names
can be arbitrary printable objects, and need not be unique.")
(defun make-queue (&key name initial-contents)
  "Creates and returns a fresh QUEUE called NAME, into which the elements
of the sequence INITIAL-CONTENTS have been enqueued in order."
  ;; A new queue starts out as a single dummy node that serves as both
  ;; head and tail.
  (let* ((sentinel (make-node :value +dummy+))
         (new-queue (%make-queue sentinel sentinel name)))
    (flet ((add-item (item)
             (enqueue item new-queue)))
      (declare (dynamic-extent #'add-item))
      (map nil #'add-item initial-contents))
    new-queue))
(defun enqueue (value queue)
  "Appends VALUE at the tail of QUEUE and returns VALUE."
  (let ((fresh (make-node :value value)))
    ;; Retry until the compare-and-swap installs FRESH as the new tail.
    (loop
      (let ((old-tail (queue-tail queue)))
        ;; Link FRESH to the tail we saw before publishing it.
        (setf (node-next fresh) old-tail)
        (when (eq old-tail
                  (sb-ext:compare-and-swap (queue-tail queue) old-tail fresh))
          ;; Won the race: complete the back link and finish.
          (setf (node-prev old-tail) fresh)
          (return value))))))
;;; Removes the node at the head of QUEUE. Every failed or partial step
;;; jumps back to :CONTINUE and retries with a fresh snapshot; the
;;; function only leaves through one of the two RETURN-FROM forms.
(defun dequeue (queue)
"Retrieves the oldest value in QUEUE and returns it as the primary value,
and T as secondary value. If the queue is empty, returns NIL as both primary
and secondary value."
(tagbody
:continue
;; Snapshot the head, the tail, the head's PREV link and its value.
(let* ((head (queue-head queue))
(tail (queue-tail queue))
(first-node-prev (node-prev head))
(val (node-value head)))
;; Act only if the head is still the one just read; otherwise fall
;; through to the retry at the bottom.
(when (eq head (queue-head queue))
;; Case 1: the head carries a real value.
(cond ((not (eq val +dummy+))
(if (eq tail head)
;; Head is also the tail: try to install a fresh dummy node as
;; the new tail (linked to the old one through NEXT), set the
;; head's PREV on success, and retry either way.
(let ((dummy (make-node :value +dummy+ :next tail)))
(when (eq tail (sb-ext:compare-and-swap (queue-tail queue)
tail dummy))
(setf (node-prev head) dummy))
(go :continue))
;; Head's PREV link was not set in the snapshot: let FIXLIST
;; rebuild the PREV links, then retry.
(when (null first-node-prev)
(fixList queue tail head)
(go :continue)))
;; Try to move the head to the node its PREV link names; on
;; success VAL is ours to return.
(when (eq head (sb-ext:compare-and-swap (queue-head queue)
head first-node-prev))
;; This assignment is not present in the paper, but is
;; equivalent to the free(head.ptr) call there: it unlinks
;; the HEAD from the queue -- the code in the paper leaves
;; the dangling pointer in place.
(setf (node-next first-node-prev) nil)
(return-from dequeue (values val t))))
;; Case 2: the head is a dummy and also the tail -- queue is empty.
((eq tail head)
(return-from dequeue (values nil nil)))
;; Case 3: dummy head whose PREV link is missing -- repair and retry.
((null first-node-prev)
(fixList queue tail head)
(go :continue))
;; Case 4: dummy head with a PREV link -- try to skip past the
;; dummy; whether or not the swap succeeds, retry below.
(t
(sb-ext:compare-and-swap (queue-head queue)
head first-node-prev)))))
(go :continue)))
;;; Walks from TAIL towards HEAD along the NEXT links and makes every
;;; visited node's successor point back at it through PREV. Stops as soon
;;; as the queue's head is no longer HEAD, when HEAD is reached, or when
;;; a NEXT link is missing. Always returns NIL.
(defun fixlist (queue tail head)
  (do ((node tail))
      ((or (not (eq head (queue-head queue)))
           (eq node head))
       nil)
    (let ((successor (node-next node)))
      (unless successor
        (return-from fixlist nil))
      ;; Write the back link only where it is not already right.
      (unless (eq (node-prev successor) node)
        (setf (node-prev successor) node))
      (setf node successor))))
(defun list-queue-contents (queue)
  "Returns a list of the objects currently in QUEUE, leaving the QUEUE
itself untouched. Intended mostly for inspecting queue state by hand."
  (let ((items '())
        (node (queue-tail queue)))
    ;; Since NEXT pointers are always right, traversing from tail to
    ;; head is safe. Pushing while walking in that direction leaves the
    ;; node nearest the head first in the result.
    (loop
      (let ((value (node-value node))
            (next (node-next node)))
        (unless (eq +dummy+ value)
          (push value items))
        (if next
            (setf node next)
            (return items))))))
(defun queue-count (queue)
  "Returns how many objects QUEUE holds. Walks the whole queue, so it is
inefficient; meant for inspecting queue state by hand and for
PRINT-OBJECT methods."
  (let ((total 0)
        (node (queue-tail queue)))
    (declare (unsigned-byte total))
    ;; Follow the NEXT links from the tail, counting every node that
    ;; does not hold the dummy marker.
    (loop
      (let ((value (node-value node))
            (next (node-next node)))
        (unless (eq +dummy+ value)
          (incf total))
        (if next
            (setf node next)
            (return total))))))
(defun queue-empty-p (queue)
  "Returns T when QUEUE holds no objects, NIL otherwise."
  (let ((head (queue-head queue))
        (tail (queue-tail queue)))
    (let ((head-value (node-value head)))
      ;; Empty means: a single node that is merely the dummy placeholder.
      (if (eq head tail)
          (eq head-value +dummy+)
          nil))))
--- NEW FILE: sb-concurrency.asd ---
;;;; -*- Lisp -*-
;;;;
;;;; This software is part of the SBCL system. See the README file for
;;;; more information.
;;;;
;;;; This software is derived from the CMU CL system, which was
;;;; written at Carnegie Mellon University and released into the
;;;; public domain. The software is in the public domain and is
;;;; provided with absolutely no warranty. See the COPYING and CREDITS
;;;; files for more information.
(in-package :cl-user)
;;; The contrib itself: package definition first, then the queues, then
;;; the mailboxes, which depend on both.
(asdf:defsystem :sb-concurrency
:components ((:file "package")
(:file "queue" :depends-on ("package"))
(:file "mailbox" :depends-on ("package" "queue"))))
;;; Test system. Depends on the contrib and on SB-RT; its sources live
;;; in the TESTS module, where both test files depend on the test package
;;; and on the shared test utilities.
(asdf:defsystem :sb-concurrency-tests
:depends-on (:sb-concurrency :sb-rt)
:components
((:module tests
:components
((:file "package")
(:file "test-utils" :depends-on ("package"))
(:file "test-queue" :depends-on ("package" "test-utils"))
(:file "test-mailbox" :depends-on ("package" "test-utils"))))))
;;; After the SB-CONCURRENCY system has been loaded, announce it as a
;;; module via PROVIDE.
(defmethod asdf:perform :after
    ((operation asdf:load-op)
     (system (eql (asdf:find-system :sb-concurrency))))
  (provide 'sb-concurrency))
;;; Testing SB-CONCURRENCY is delegated to the separate test system:
;;; load it first, then run its own TEST-OP.
(defmethod asdf:perform
    ((operation asdf:test-op)
     (system (eql (asdf:find-system :sb-concurrency))))
  (asdf:oos 'asdf:load-op :sb-concurrency-tests)
  (asdf:oos 'asdf:test-op :sb-concurrency-tests))
;;; Runs the test suite by calling SB-RT's DO-TESTS and signals an error
;;; when it returns false. DO-TESTS is looked up at run time, so reading
;;; this file does not require the SB-RT package to exist.
(defmethod asdf:perform
    ((operation asdf:test-op)
     (system (eql (asdf:find-system :sb-concurrency-tests))))
  (let ((run-tests (intern "DO-TESTS" (find-package "SB-RT"))))
    (or (funcall run-tests)
        (error "~S failed" 'asdf:test-op))))
--- NEW FILE: sb-concurrency.texinfo ---
@node sb-concurrency
@section sb-concurrency
@cindex Concurrency
@cindex Sb-concurrency
Additional data structures, synchronization primitives and tools for
concurrent programming. Similar to Java's @code{java.util.concurrent}
package.
@page
@anchor{Section sb-concurrency:queue}
@subsection Queue
@cindex Queue, lock-free
@code{sb-concurrency:queue} is a lock-free, thread-safe FIFO queue
datatype.
@*@*
The implementation is based on @cite{An Optimistic Approach to
Lock-Free FIFO Queues} by Edya Ladan-Mozes and Nir Shavit.
@*@*
Before SBCL 1.0.38, this implementation resided in its own contrib
(@pxref{sb-queue}) which is still provided for backwards-compatibility
but which has since been deprecated.
@sp 1
@unnumberedsubsubsec Synopsis:
@code{enqueue} can be used to add objects to a queue, and
@code{dequeue} retrieves items from a queue in FIFO order.
@sp 1
@unnumberedsubsubsec Dictionary:
@include struct-sb-concurrency-queue.texinfo
@include fun-sb-concurrency-dequeue.texinfo
@include fun-sb-concurrency-enqueue.texinfo
@include fun-sb-concurrency-list-queue-contents.texinfo
@include fun-sb-concurrency-make-queue.texinfo
@include fun-sb-concurrency-queue-count.texinfo
@include fun-sb-concurrency-queue-empty-p.texinfo
@include fun-sb-concurrency-queue-name.texinfo
@include fun-sb-concurrency-queuep.texinfo
@page
@subsection Mailbox (lock-free)
@cindex Mailbox, lock-free
@code{sb-concurrency:mailbox} is a lock-free message queue where one
or multiple ends can send messages to one or multiple receivers. The
difference to @ref{Section sb-concurrency:queue} is that the receiving
end may block until a message arrives.
@*@*
The implementation is based on the Queue implementation above
(@pxref{Structure sb-concurrency:queue}.)
@sp 1
@unnumberedsubsubsec Synopsis:
@code{send-message} can be used to send a message to a mailbox, and
@code{receive-message} retrieves a message from a mailbox, or blocks
until a new message arrives. @code{receive-message-no-hang} is the
non-blocking variant.
@*@*
Messages can be any object.
@sp 1
@unnumberedsubsubsec Dictionary:
@include struct-sb-concurrency-mailbox.texinfo
@include fun-sb-concurrency-list-mailbox-messages.texinfo
@include fun-sb-concurrency-mailbox-count.texinfo
@include fun-sb-concurrency-mailbox-empty-p.texinfo
@include fun-sb-concurrency-mailbox-name.texinfo
@include fun-sb-concurrency-mailboxp.texinfo
@include fun-sb-concurrency-make-mailbox.texinfo
@include fun-sb-concurrency-receive-message.texinfo
@include fun-sb-concurrency-receive-message-no-hang.texinfo
@include fun-sb-concurrency-receive-pending-messages.texinfo
@include fun-sb-concurrency-send-message.texinfo
|