From 76368c0f1d2f32c7c56b05afd85bfa56b26abe2d Mon Sep 17 00:00:00 2001
From: "James M. Lawrence"
Date: Tue, 13 Nov 2012 19:18:43 0500
Subject: [PATCH] Replace sbconcurrency:queue implementation.
Singlylinked queue is faster and conses less.

contrib/sbconcurrency/queue.lisp  171 +++++++++++++++++
1 file changed, 76 insertions(+), 95 deletions()
diff git a/contrib/sbconcurrency/queue.lisp b/contrib/sbconcurrency/queue.lisp
index bf0bc98..6ea9031 100644
 a/contrib/sbconcurrency/queue.lisp
+++ b/contrib/sbconcurrency/queue.lisp
@@ 1,7 +1,5 @@
;;;; Lockfree FIFO queues, from "An Optimistic Approach to LockFree FIFO
;;;; Queues" by Edya LadanMozes and Nir Shavit.
;;;;
;;;; Written by Nikodemus Siivola for SBCL.
+;;;; Written by James M. Lawrence for SBCL.
+;;;; API and docstrings by Nikodemus Siivola.
;;;;
;;;; This software is part of the SBCL system. See the README file for
;;;; more information.
@@ 11,15 +9,29 @@
;;;; software is in the public domain and is provided with absolutely no
;;;; warranty. See the COPYING and CREDITS files for more information.
+;;; Singlylinked queue with compareandswap operations.
+;;;
+;;; Invariant:
+;;;
+;;; (car (queuehead queue)) == +dummy+
+;;;
+;;; The following are invariants except during lag across threads:
+;;;
+;;; (cdr (queuetail queue)) == nil
+;;;
+;;; If the queue is empty, (queuehead queue) == (queuetail queue).
+;;;
+;;; If the queue is nonempty, (cadr (queuehead queue)) is the next
+;;; value to be dequeued and (car (queuetail queue)) is the most
+;;; recently enqueued value.
+;;;
+;;; The CDR of a discarded node is set to +DEADEND+. This flag must
+;;; be checked at each traversal.
+
(inpackage :sbconcurrency)
(defconstant +dummy+ '.dummy.)

(declaim (inline makenode))
(defstruct node
 value
 (prev nil :type (or null node))
 (next nil :type (or null node)))
+(defconstant +deadend+ '.deadend.)
(declaim (inline %makequeue))
(defstruct (queue (:constructor %makequeue (head tail name))
@@ 28,8 +40,8 @@
"Lockfree thread safe FIFO queue.
Use ENQUEUE to add objects to the queue, and DEQUEUE to remove them."
 (head (error "No HEAD.") :type node)
 (tail (error "No TAIL.") :type node)
+ (head (error "No HEAD.") :type cons)
+ (tail (error "No TAIL.") :type cons)
(name nil))
(setf (documentation 'queuep 'function)
@@ 41,7 +53,7 @@ can be arbitrary printable objects, and need not be unique.")
(defun makequeue (&key name initialcontents)
"Returns a new QUEUE with NAME and contents of the INITIALCONTENTS
sequence enqueued."
 (let* ((dummy (makenode :value +dummy+))
+ (let* ((dummy (cons +dummy+ nil))
(queue (%makequeue dummy dummy name)))
(flet ((enc1 (x)
(enqueue x queue)))
@@ 51,108 +63,77 @@ sequence enqueued."
(defun enqueue (value queue)
"Adds VALUE to the end of QUEUE. Returns VALUE."
 (let ((node (makenode :value value)))
 (loop for tail = (queuetail queue)
 do (setf (nodenext node) tail)
 (when (eq tail (sbext:compareandswap (queuetail queue) tail node))
 (setf (nodeprev tail) node)
 (return value)))))
+ ;; Spin until QUEUETAIL is updated, attempt CAS, repeat upon
+ ;; failure. Upon success, update QUEUETAIL.
+ (let ((new (cons value nil)))
+ (tagbody
+ :continue
+ (let ((tail (queuetail queue)))
+ (when (cdr tail)
+ (go :continue))
+ (when (eq (sbext:compareandswap (cdr tail) nil new)
+ nil)
+ (setf (queuetail queue) new)
+ (returnfrom enqueue value)))
+ (go :continue))))
(defun dequeue (queue)
"Retrieves the oldest value in QUEUE and returns it as the primary value,
and T as secondary value. If the queue is empty, returns NIL as both primary
and secondary value."
+ ;; Spin until QUEUEHEAD is updated, attempt to CAS +dummy+ into the
+ ;; CADR of QUEUEHEAD, repeat upon failure. Upon success, update
+ ;; QUEUEHEAD and clear the discarded node.
(tagbody
:continue
(let* ((head (queuehead queue))
 (tail (queuetail queue))
 (firstnodeprev (nodeprev head))
 (val (nodevalue head)))
 (barrier (:read))
 (when (eq head (queuehead queue))
 (cond ((not (eq val +dummy+))
 (if (eq tail head)
 (let ((dummy (makenode :value +dummy+ :next tail)))
 (when (eq tail (sbext:compareandswap (queuetail queue)
 tail dummy))
 (setf (nodeprev head) dummy))
 (go :continue))
 (when (null firstnodeprev)
 (fixList queue tail head)
 (go :continue)))
 (when (eq head (sbext:compareandswap (queuehead queue)
 head firstnodeprev))
 ;; These assignment is not present in the paper, but are
 ;; equivalent to the free(head.ptr) call there.
 ;;
 ;; First we unlink the HEAD from the queue  the code in
 ;; the paper leaves the dangling pointer in place.
 ;;
 ;; Then we NIL out the slots in HEAD to help the GC,
 ;; otherwise conservativism might lead to massive chains of
 ;; nodes being retained.
 (setf (nodenext firstnodeprev) nil
 (nodeprev head) nil
 (nodenext head) nil
 (nodevalue head) nil)
 (returnfrom dequeue (values val t))))
 ((eq tail head)
 (returnfrom dequeue (values nil nil)))
 ((null firstnodeprev)
 (fixList queue tail head)
 (go :continue))
 (t
 (sbext:compareandswap (queuehead queue)
 head firstnodeprev)))))
+ (next (cdr head)))
+ (when (null next)
+ (returnfrom dequeue (values nil nil)))
+ (when (eq next +deadend+)
+ (go :continue))
+ (let ((target (car next)))
+ (when (eq target +dummy+)
+ (go :continue))
+ (when (eq (sbext:compareandswap (car next) target +dummy+)
+ target)
+ (setf (queuehead queue) next
+ ;; Clear the CDR, otherwise the conservative GC could
+ ;; hoard long lists. (car head) is always +dummy+.
+ (cdr head) +deadend+)
+ (returnfrom dequeue (values target t)))))
(go :continue)))
(defun fixlist (queue tail head)
 (let ((current tail))
 (loop while (and (eq head (queuehead queue)) (not (eq current head)))
 do (let ((next (nodenext current)))
 (when (not next)
 (returnfrom fixlist nil))
 (let ((nextNodePrev (nodeprev next)))
 (when (not (eq nextNodePrev current))
 (setf (nodeprev next) current))
 (setf current next))))))
+(defun callwitheachqueueelement (queue fun)
+ (let ((node (queuehead queue)))
+ (loop
+ (let ((value (car node)))
+ (unless (eq value +dummy+)
+ (funcall fun value)))
+ (setf node (cdr node))
+ (when (or (null node)
+ (eq node +deadend+))
+ (return)))))
(defun listqueuecontents (queue)
"Returns the contents of QUEUE as a list without removing them from the
QUEUE. Mainly useful for manual examination of queue state, as the list
may be out of date by the time it is returned."
 (let (all)
 (labels ((walk (node)
 ;; Since NEXT pointers are always right, traversing from tail
 ;; to head is safe.
 (let ((value (nodevalue node))
 (next (nodenext node)))
 (unless (eq +dummy+ value)
 (push value all))
 (when next
 (walk next)))))
 (walk (queuetail queue)))
 all))
+ (collect ((result))
+ (callwitheachqueueelement queue (lambda (element)
+ (result element)))
+ (result)))
(defun queuecount (queue)
"Returns the number of objects in QUEUE. Mainly useful for manual
examination of queue state, and in PRINTOBJECT methods: inefficient as it
must walk the entire queue."
 (let ((n 0))
 (declare (unsignedbyte n))
 (labels ((walk (node)
 (let ((value (nodevalue node))
 (next (nodenext node)))
 (unless (eq +dummy+ value)
 (incf n))
 (when next
 (walk next)))))
 (walk (queuetail queue))
 n)))
+ (let ((count 0))
+ (callwitheachqueueelement queue (lambda (element)
+ (declare (ignore element))
+ (incf count)))
+ count))
(defun queueemptyp (queue)
"Returns T if QUEUE is empty, NIL otherwise."
 (let* ((head (queuehead queue))
 (tail (queuetail queue))
 (val (nodevalue head)))
 (and (eq head tail) (eq val +dummy+))))
+ (null (cdr (queuehead queue))))

1.7.9.5