Title: Fast concurrent MPMC queue
--
I've used the following concurrent queue algorithm enough that it warrants a blog entry. I'll sketch out the design of a fast and scalable multiple-producer multiple-consumer (MPMC) concurrent queue called PTLQueue. The queue has bounded capacity and is implemented via a circular array. Bounded capacity can be a useful property if there's a mismatch between producer rates and consumer rates where an unbounded queue might otherwise result in excessive memory consumption by virtue of the container nodes that -- in some queue implementations -- are used to hold values. A bounded-capacity queue can provide flow control between components. Beware, however, that bounded collections can also result in resource deadlock if abused. The put() and take() operators are partial and wait for the collection to become non-full or non-empty, respectively. Put() and take() do not allocate memory, and are not vulnerable to the ABA pathologies. The PTLQueue algorithm can be implemented equally well in C/C++ and Java.
Partial operators are often more convenient than total methods. In many use cases if the preconditions aren't met, there's nothing else useful the thread can do, so it may as well wait via a partial method. An exception is in the case of work-stealing queues where a thief might scan a set of queues from which it could potentially steal. Total methods return ASAP with a success-failure indication. (It's tempting to describe a queue or API as blocking or non-blocking instead of partial or total, but non-blocking is already an overloaded concurrency term. Perhaps waiting/non-waiting or patient/impatient might be better terms). It's also trivial to construct partial operators by busy-waiting via total operators, but such constructs may be less efficient than an operator explicitly and intentionally designed to wait.
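As a small illustration of that last point, here is a sketch of a partial take built by polling a total tryTake(). The Cell type and the takeByPolling name are hypothetical and exist only for this example; Cell is a one-element stand-in, not PTLQueue.

```cpp
#include <atomic>
#include <thread>

// Hypothetical one-element container that offers only total operators.
struct Cell {
    std::atomic<int*> box{nullptr};

    // Total: returns immediately, nullptr means "nothing there".
    int* tryTake() { return box.exchange(nullptr); }

    // Total: returns immediately, false means "already occupied".
    bool tryPut(int* v) {
        int* expected = nullptr;
        return box.compare_exchange_strong(expected, v);
    }
};

// Partial take constructed by busy-waiting on a total tryTake().
// Works for any Q whose tryTake() returns a pointer-like value.
template <typename Q>
auto takeByPolling(Q& q) -> decltype(q.tryTake()) {
    for (;;) {
        if (auto v = q.tryTake()) return v;
        std::this_thread::yield();  // polite spin; stand-in for Pause
    }
}
```

Every failed poll in takeByPolling re-executes the whole total operation, which is one reason a purpose-built partial operator may be cheaper.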
A PTLQueue instance contains an array of slots, where each slot has volatile Turn and MailBox fields. The array has power-of-two length allowing mod/div operations to be replaced by masking. We assume sensible padding and alignment to reduce the impact of false sharing. (On x86 I recommend 128-byte alignment and padding because of the adjacent-sector prefetch facility). Each queue also has PutCursor and TakeCursor cursor variables, each of which should be sequestered as the sole occupant of a cache line or sector. You can opt to use 64-bit integers if concerned about wrap-around aliasing in the cursor variables. Put(null) is considered illegal, but the caller or implementation can easily check for and convert null to a distinguished non-null proxy value if null happens to be a value you'd like to pass. Take() will accordingly convert the proxy value back to null. An advantage of PTLQueue is that you can use atomic fetch-and-increment for the partial methods. We initialize each slot at index I with (Turn=I, MailBox=null). Both cursors are initially 0. All shared variables are considered "volatile" and atomics such as CAS and AtomicFetchAndIncrement are presumed to have bidirectional fence semantics. Finally T is the templated type. I've sketched out a total tryTake() method below that allows the caller to poll the queue. tryPut() has an analogous construction.
// PTLQueue :
Put(v) : // producer : partial method - waits as necessary
    assert v != null
    assert Mask >= 1 && (Mask & (Mask+1)) == 0 // Document invariants
    // doorway step
    // Obtain a sequence number -- ticket
    // As a practical concern the ticket value is temporally unique
    // The ticket also identifies and selects a slot
    auto tkt = AtomicFetchIncrement (&PutCursor, 1)
    slot * s = &Slots[tkt & Mask]
    // waiting phase :
    // wait for slot's generation to match the tkt value assigned to this put() invocation.
    // The "generation" is implicitly encoded as the upper bits in the cursor
    // above those used to specify the index : tkt div (Mask+1)
    // The generation serves as an epoch number to identify a cohort of threads
    // accessing disjoint slots
    while s->Turn != tkt : Pause
    assert s->MailBox == null
    s->MailBox = v // deposit and pass message
Take() : // consumer : partial method - waits as necessary
    auto tkt = AtomicFetchIncrement (&TakeCursor,1)
    slot * s = &Slots[tkt & Mask]
    // 2-stage waiting :
    // First wait for turn for our generation
    // Acquire exclusive "take" access to slot's MailBox field
    // Then wait for the slot to become occupied
    while s->Turn != tkt : Pause
    // Concurrency in this section of code is now reduced to just 1 producer thread
    // vs 1 consumer thread.
    // For a given queue and slot, there will be at most one Take() operation running
    // in this section.
    // Consumer waits for producer to arrive and make slot non-empty
    // Extract message; clear mailbox; advance Turn indicator
    // We have an obvious happens-before relation :
    // Put(m) happens-before corresponding Take() that returns that same "m"
    for
        T v = s->MailBox
        if v != null :
            s->MailBox = null
            ST-ST barrier
            s->Turn = tkt + Mask + 1 // unlock slot to admit next producer and consumer
            return v
        Pause
tryTake() : // total method - returns ASAP with failure indication
    for
        auto tkt = TakeCursor
        slot * s = &Slots[tkt & Mask]
        if s->Turn != tkt : return null
        T v = s->MailBox // presumptive return value
        if v == null : return null
        // ratify tkt and v values and commit by advancing cursor
        if CAS (&TakeCursor, tkt, tkt+1) != tkt : continue
        s->MailBox = null
        ST-ST barrier
        s->Turn = tkt + Mask + 1
        return v
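For readers who prefer compilable code, the pseudocode above might translate to C++17 roughly as follows. This is a sketch under stated assumptions rather than a reference implementation: values are passed as non-null T* pointers, every atomic uses the default sequentially consistent ordering to stand in for the "bidirectional fence" assumption, the 128-byte padding constant follows the x86 recommendation in the text, and Pause is approximated with std::this_thread::yield(). The member names are my own.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <thread>

// Sketch only: bounded MPMC queue of non-null T* values, after the pseudocode.
// Needs C++17 so that new[] honors the over-aligned Slot type.
template <typename T>
class PTLQueue {
    static constexpr std::size_t kPad = 128;  // assumed padding unit

    struct alignas(kPad) Slot {
        std::atomic<std::uint64_t> turn{0};
        std::atomic<T*> mailbox{nullptr};
    };

    const std::uint64_t mask_;
    std::unique_ptr<Slot[]> slots_;
    alignas(kPad) std::atomic<std::uint64_t> putCursor_{0};
    alignas(kPad) std::atomic<std::uint64_t> takeCursor_{0};

    static void pause() { std::this_thread::yield(); }  // stand-in for Pause

public:
    // capacity must be a power of two and at least 2 (Mask >= 1).
    explicit PTLQueue(std::size_t capacity)
        : mask_(capacity - 1), slots_(new Slot[capacity]) {
        assert(capacity >= 2 && (capacity & (capacity - 1)) == 0);
        for (std::size_t i = 0; i < capacity; ++i) slots_[i].turn.store(i);
    }

    void put(T* v) {  // partial: waits while the target slot is busy
        assert(v != nullptr);
        const std::uint64_t tkt = putCursor_.fetch_add(1);  // doorway
        Slot& s = slots_[tkt & mask_];
        while (s.turn.load() != tkt) pause();               // waiting phase
        s.mailbox.store(v);                                 // deposit
    }

    T* take() {  // partial: waits for its turn, then for a message
        const std::uint64_t tkt = takeCursor_.fetch_add(1);
        Slot& s = slots_[tkt & mask_];
        while (s.turn.load() != tkt) pause();
        T* v;
        while ((v = s.mailbox.load()) == nullptr) pause();
        s.mailbox.store(nullptr);
        s.turn.store(tkt + mask_ + 1);  // admit the next put/take pair
        return v;
    }

    T* tryTake() {  // total: returns nullptr instead of waiting
        for (;;) {
            std::uint64_t tkt = takeCursor_.load();
            Slot& s = slots_[tkt & mask_];
            if (s.turn.load() != tkt) return nullptr;
            T* v = s.mailbox.load();  // presumptive return value
            if (v == nullptr) return nullptr;
            // Commit by advancing the cursor; retry if another taker won.
            if (!takeCursor_.compare_exchange_strong(tkt, tkt + 1)) continue;
            s.mailbox.store(nullptr);
            s.turn.store(tkt + mask_ + 1);
            return v;
        }
    }
};
```

A relaxed-ordering version is possible but needs care: the mailbox store in put() must be a release, the mailbox load in take() an acquire, and the Turn store must not be reordered ahead of clearing the mailbox.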
The basic idea derives from the Partitioned Ticket Lock "PTL" (US20120240126-A1) and the MultiLane Concurrent Bag (US8689237). The latter is essentially a circular ring-buffer where the elements themselves are queues or concurrent collections. You can think of the PTLQueue as a partitioned ticket lock "PTL" augmented to pass values from lock to unlock via the slots. Alternatively, you could conceptualize PTLQueue as a degenerate MultiLane bag where each slot or "lane" consists of a simple single-word MailBox instead of a general queue. Each lane in PTLQueue also has a private Turn field which acts like the Turn (Grant) variables found in PTL. Turn enforces strict FIFO ordering and restricts concurrency on the slot mailbox field to at most one simultaneous put() and take() operation.
PTL uses a single "ticket" variable and per-slot Turn (grant) fields while MultiLane has distinct PutCursor and TakeCursor cursors and abstract per-slot sub-queues. Both PTL and MultiLane advance their cursor and ticket variables with atomic fetch-and-increment. PTLQueue borrows from both PTL and MultiLane and has distinct put and take cursors and per-slot Turn fields. Instead of per-slot queues, PTLQueue uses a simple single-word MailBox field. PutCursor and TakeCursor act like a pair of ticket locks, conferring "put" and "take" access to a given slot. PutCursor, for instance, assigns an incoming put() request to a slot and serves as a PTL "Ticket" to acquire "put" permission to that slot's MailBox field.
To better explain the operation of PTLQueue we deconstruct the operation of put() and take() as follows. Put() first increments PutCursor obtaining a new unique ticket. That ticket value also identifies a slot. Put() next waits for that slot's Turn field to match that ticket value. This is tantamount to using a PTL to acquire "put" permission on the slot's MailBox field. Finally, having obtained exclusive "put" permission on the slot, put() stores the message value into the slot's MailBox. Take() similarly advances TakeCursor, identifying a slot, and then acquires and secures "take" permission on a slot by waiting for Turn. Take() then waits for the slot's MailBox to become non-empty, extracts the message, and clears MailBox. Finally, take() advances the slot's Turn field, which releases both "put" and "take" access to the slot's MailBox. Note the asymmetry : put() acquires "put" access to the slot, but take() releases that lock. At any given time, for a given slot in a PTLQueue, at most one thread has "put" access and at most one thread has "take" access. This restricts concurrency from general MPMC to 1-vs-1.
We have 2 ticket locks -- one for put() and one for take() -- each with its own "ticket" variable in the form of the corresponding cursor, but they share a single "Grant" egress variable in the form of the slot's Turn variable. Advancing the PutCursor, for instance, serves two purposes. First, we obtain a unique ticket which identifies a slot. Second, incrementing the cursor is the doorway protocol step to acquire the per-slot mutual exclusion "put" lock. The cursors and operations to increment those cursors serve double-duty : slot-selection and ticket assignment for locking the slot's MailBox field.
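For comparison, a plain partitioned ticket lock can be sketched as below. This follows my understanding of how such locks are commonly described (one ticket counter, an array of grant words, each waiter spinning on its own grant slot); it is not taken from the cited patent application, and the class name, slot count and padding are assumptions made for the example.

```cpp
#include <atomic>
#include <cstdint>
#include <thread>

// Sketch of a partitioned ticket lock: a single ticket dispenser plus
// an array of grant words so that waiters spin on different locations.
class PartitionedTicketLock {
    static constexpr std::uint64_t kSlots = 8;  // power of two (assumed)
    static constexpr std::uint64_t kMask = kSlots - 1;

    struct alignas(128) Grant {
        std::atomic<std::uint64_t> turn{0};  // all zero: only ticket 0 matches
    };

    alignas(128) std::atomic<std::uint64_t> ticket_{0};
    Grant grants_[kSlots];

public:
    // Doorway step, then waiting phase. Returns the ticket, which the
    // caller passes back to unlock().
    std::uint64_t lock() {
        const std::uint64_t tkt = ticket_.fetch_add(1);
        while (grants_[tkt & kMask].turn.load() != tkt) {
            std::this_thread::yield();  // stand-in for Pause
        }
        return tkt;
    }

    // Direct handoff: publish the successor's ticket in the successor's slot.
    void unlock(std::uint64_t tkt) {
        grants_[(tkt + 1) & kMask].turn.store(tkt + 1);
    }
};
```

The contrast with PTLQueue is in initialization and release. Here only ticket 0 is admitted at the start and unlock writes the next slot, so the whole array guards one critical section. In PTLQueue each slot starts with Turn equal to its own index and take() advances the same slot by Mask+1, so every slot behaves as an independent lock.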
At any given time a slot MailBox field can be in one of the following states: empty with no pending operations -- neutral state; empty with one or more waiting take() operations pending -- deficit; occupied with no pending operations; occupied with one or more waiting put() operations -- surplus; empty with a pending put() or pending put() and take() operations -- transitional; or occupied with a pending take() or pending put() and take() operations -- transitional.
The partial put() and take() operators can be implemented with an atomic fetch-and-increment operation, which may confer a performance advantage over a CAS-based loop. In addition we have independent PutCursor and TakeCursor cursors. Critically, a put() operation modifies PutCursor but does not access the TakeCursor and a take() operation modifies the TakeCursor cursor but does not access the PutCursor. This acts to reduce coherence traffic relative to some other queue designs.
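To make the fetch-and-increment versus CAS comparison concrete, here is a sketch of the two ways a ticket could be drawn from a cursor. The function names are hypothetical. Both return the same values when uncontended; the difference is that the CAS form can fail and retry when several threads collide on the cursor.

```cpp
#include <atomic>
#include <cstdint>

// Draw a ticket with a single atomic read-modify-write that cannot fail.
inline std::uint64_t ticketByFetchAdd(std::atomic<std::uint64_t>& cursor) {
    return cursor.fetch_add(1);
}

// Draw a ticket with a CAS retry loop. Under contention some attempts
// fail and must be repeated.
inline std::uint64_t ticketByCasLoop(std::atomic<std::uint64_t>& cursor) {
    std::uint64_t seen = cursor.load();
    while (!cursor.compare_exchange_weak(seen, seen + 1)) {
        // On failure 'seen' has been refreshed with the current value.
    }
    return seen;
}
```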
It's worth noting that slow threads or obstruction in one slot (or "lane") does not impede or obstruct operations in other slots -- this gives us some degree of obstruction isolation. PTLQueue is not lock-free, however.
The implementation above is expressed with polite busy-waiting (Pause) but it's trivial to implement per-slot parking and unparking to deschedule waiting threads. It's also easy to convert the queue to a more general deque by replacing the PutCursor and TakeCursor cursors with Left/Front and Right/Back cursors that can move in either direction. Specifically, to push and pop from the "left" side of the deque we would decrement and increment the Left cursor, respectively, and to push and pop from the "right" side of the deque we would increment and decrement the Right cursor, respectively.
We used a variation of PTLQueue for message passing in our recent OPODIS 2013 paper.
--
There's quite a bit of related literature in this area. I'll call out a few relevant references:
Wilson's NYU Courant Institute UltraComputer dissertation from 1988 is classic and the canonical starting point : Operating System Data Structures for Shared-Memory MIMD Machines with Fetch-and-Add. Regarding provenance and priority, I think PTLQueue or queues effectively equivalent to PTLQueue have been independently rediscovered a number of times. See CB-Queue and BNPVB, below, for instance. But Wilson's dissertation anticipates the basic idea and seems to predate all the others.
Gottlieb et al : Basic Techniques for the Efficient Coordination of Very Large Numbers of Cooperating Sequential Processors
Orozco et al : CB-Queue in Toward high-throughput algorithms on many-core architectures which appeared in TACO 2012.
Meneghin et al : BNPVB family in Performance evaluation of inter-thread communication mechanisms on multicore/multithreaded architectures
Dmitry Vyukov : bounded MPMC queue (highly recommended)
Alex Otenko : US8607249 (highly related).
John Mellor-Crummey : Concurrent queues: Practical fetch-and-phi algorithms. Technical Report 229, Department of Computer Science, University of Rochester
Thomasson : FIFO Distributed Bakery Algorithm (very similar to PTLQueue).
Scott and Scherer : Dual Data Structures
I'll propose an optimization left as an exercise for the reader. Say we wanted to reduce memory usage by eliminating inter-slot padding. Such padding is usually "dark" memory and otherwise unused and wasted. But eliminating the padding leaves us at risk of increased false sharing. Furthermore let's say it was usually the case that the PutCursor and TakeCursor were numerically close to each other. (That's true in some use cases). We might still reduce false sharing by incrementing the cursors by some value other than 1 that is not trivially small and is coprime with the number of slots. Alternatively, we might increment the cursor by one and mask as usual, resulting in a logical index. We then use that logical index value to index into a permutation table, yielding an effective index for use in the slot array. The permutation table would be constructed so that nearby logical indices would map to more distant effective indices. (Open question: what should that permutation look like? Possibly some perversion of a Gray code or De Bruijn sequence might be suitable).
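One simple candidate for that mapping, offered only as a starting point for the exercise: with a power-of-two slot count, any odd stride is coprime with the number of slots, so multiplying the logical index by an odd stride and masking yields a permutation. The helper names below are hypothetical, and the sketch covers only the index mapping, not the matching adjustments to the Turn values.

```cpp
#include <cstdint>
#include <vector>

// Map a logical index to an effective slot index using a fixed stride.
// 'mask' is slots-1 where slots is a power of two.
inline std::uint64_t stridedIndex(std::uint64_t logical,
                                  std::uint64_t stride,
                                  std::uint64_t mask) {
    return (logical * stride) & mask;
}

// Check that the mapping visits every slot exactly once per lap.
inline bool isPermutation(std::uint64_t stride, std::uint64_t slots) {
    std::vector<bool> seen(slots, false);
    for (std::uint64_t i = 0; i < slots; ++i) {
        const std::uint64_t idx = stridedIndex(i, stride, slots - 1);
        if (seen[idx]) return false;  // two logical indices collided
        seen[idx] = true;
    }
    return true;
}
```

With 16 slots and a stride of 5, logical indices 0, 1, 2, 3 land on slots 0, 5, 10, 15, so neighbours in ticket order sit five slots apart in memory. An even stride such as 4 fails the permutation check because it shares a factor with the slot count.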
As an aside, say we need to busy-wait for some condition as follows : "while C == 0 : Pause". Let's say that C is usually non-zero, so we typically don't wait. But when C happens to be 0 we'll have to spin for some period, possibly brief. We can arrange for the code to be more machine-friendly with respect to the branch predictors by transforming the loop into : "if C == 0 : for { Pause; if C != 0 : break; }". Critically, we want to restructure the loop so there's one branch that controls entry and another that controls loop exit. A concern is that your compiler or JIT might be clever enough to transform this back to "while C == 0 : Pause". You can sometimes avoid this by inserting a call to some type of very cheap "opaque" method that the compiler can't elide or reorder. On Solaris, for instance, you could use : "if C == 0 : { gethrtime(); for { Pause; if C != 0 : break; }}".
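The two loop shapes can be written out in C++ as follows. This is only a sketch of the source-level structure: the function names are hypothetical, std::this_thread::yield() stands in for Pause, and whether the compiler preserves the two-branch form is exactly the concern raised above, so the generated code should be inspected. Each function returns the number of pauses it took, purely to make the behavior observable.

```cpp
#include <atomic>
#include <thread>

inline void pauseHint() { std::this_thread::yield(); }  // stand-in for Pause

// Original form: one branch decides both loop entry and loop exit.
inline int waitSimple(const std::atomic<int>& c) {
    int pauses = 0;
    while (c.load() == 0) {
        pauseHint();
        ++pauses;
    }
    return pauses;
}

// Restructured form: one branch guards entry, a separate branch
// controls exit from the spin loop.
inline int waitSplit(const std::atomic<int>& c) {
    int pauses = 0;
    if (c.load() == 0) {  // entry branch, usually not taken
        for (;;) {
            pauseHint();
            ++pauses;
            if (c.load() != 0) break;  // exit branch
        }
    }
    return pauses;
}
```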
It's worth noting the obvious duality between locks and queues. If you have a strict FIFO lock implementation with local spinning and succession by direct handoff, such as MCS or CLH, then you can usually transform that lock into a queue.
Hidden commentary and annotations - invisible :
* And of course there's a well-known duality between queues and locks, but
I'll leave that topic for another blog post.
* Compare and contrast : PTLQ vs PTL and MultiLane
* Equivalent : Turn; seq; sequence; pos; position; ticket
* Put = Lock; Deposit
Take = identify and reserve slot; wait; extract & clear; unlock
* conceptualize :
Distinct PutLock and TakeLock implemented as ticket lock or PTL
Distinct arrival cursors but share per-slot "Turn" variable
provides exclusive role-based access to slot's mailbox field
put()
acquires exclusive access to a slot for purposes of "deposit"
assigns slot round-robin and then acquires deposit access rights/perms to that slot
take()
acquires exclusive access to slot for purposes of "withdrawal"
assigns slot round-robin and then acquires withdrawal access rights/perms to that slot
At any given time, only one thread can have withdrawal access to a slot
at any given time, only one thread can have deposit access to a slot
Permissible for T1 to have deposit access and T2 to simultaneously have
withdrawal access
* round-robin
for the purposes of; role-based; access mode; access role
mailslot; mailbox;
allocate/assign/identify slot
rights; permission; license; access permission;
* PTL/Ticket hybrid
Asymmetric usage ; owner oblivious lock-unlock pairing
K-exclusion
add Grant cursor
pass message m from lock to unlock via Slots[] array
Cursor performs 2 functions :
+ PTL ticket
+ Assigns request to slot in round-robin fashion
Deconstruct protocol : explication
put() :
    allocate slot in round-robin fashion
    acquire PTL for "put" access
    store message into slot associated with PTL index
take() :
    Acquire PTL for "take" access
        // doorway step
        seq = fetchAdd (&Grant, 1)
        s = &Slots[seq & Mask]
        // waiting phase
        while s->Turn != seq : pause
    Extract :
        wait for s->mailbox to be full
        v = s->mailbox
        s->mailbox = null
    Release PTL for both "put" and "take" access
        s->Turn = seq + Mask + 1
* Slot round-robin assignment and lock "doorway" protocol
leverage the same cursor and FetchAdd operation on that cursor
FetchAdd (&Cursor,1)
+ round-robin slot assignment and dispersal
+ PTL/ticket lock "doorway" step
waiting phase is via "Turn" field in slot
* PTLQueue uses 2 cursors -- put and take.
Acquire "put" access to slot via PTL-like lock
Acquire "take" access to slot via PTL-like lock
2 locks : put and take -- at most one thread can access slot's mailbox
Both locks use same "turn" field
Like multilane :
2 cursors : put and take
slot is simple 1-capacity mailbox instead of queue
Borrow per-slot turn/grant from PTL
Provides strict FIFO
Lock slot : put-vs-put take-vs-take
at most one put accesses slot at any one time
at most one take accesses slot at any one time
reduction to 1-vs-1 instead of N-vs-M concurrency
Per slot locks for put/take
Release put/take by advancing turn
* is instrumental in ...
* P-V Semaphore vs lock vs K-exclusion
* See also :
FastQueues-excerpt.java
dice-etc/queue-mpmc-bounded-blocking-circular-xadd/
* PTLQueue is the same as PTLQB - identical
* Expedient return; ASAP; prompt; immediately
* Lamport's Bakery algorithm : doorway step then waiting phase
Threads arriving at doorway obtain a unique ticket number
Threads enter in ticket order
* In the terminology of Reed and Kanodia a ticket lock corresponds to the
busy-wait implementation of a semaphore using an eventcount and a sequencer.
It can also be thought of as an optimization of Lamport's bakery lock, which was
designed for fault-tolerance rather than performance. Instead of spinning on
the release counter, processors using a bakery lock repeatedly examine the
tickets of their peers.
--