Source File
poolqueue.go
Belonging Package
sync
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package sync
import (
	"sync/atomic"
	"unsafe"
)
// poolDequeue is a lock-free fixed-size single-producer,
// multi-consumer queue. The single producer can both push and pop
// from the head, and consumers can pop from the tail.
//
// It has the added feature that it nils out unused slots to avoid
// unnecessary retention of objects. This is important for sync.Pool,
// but not typically a property considered in the literature.
type poolDequeue struct {
	// headTail packs together a 32-bit head index and a 32-bit
	// tail index. Both are turned into indexes into vals by masking
	// with len(vals)-1, i.e. they are taken modulo len(vals).
	//
	// tail = index of oldest data in queue
	// head = index of next slot to fill
	//
	// Slots in the range [tail, head) are owned by consumers.
	// A consumer continues to own a slot outside this range until
	// it nils the slot, at which point ownership passes to the
	// producer.
	//
	// The head index is stored in the most-significant bits so
	// that we can atomically add to it and the overflow is
	// harmless.
	headTail uint64

	// vals is a ring buffer of interface{} values stored in this
	// dequeue. The size of this must be a power of 2, because
	// indexes are reduced with a bit mask rather than a division.
	//
	// vals[i].typ is nil if the slot is empty and non-nil
	// otherwise. A slot is still in use until *both* the tail
	// index has moved beyond it and typ has been set to nil. This
	// is set to nil atomically by the consumer and read
	// atomically by the producer.
	vals []eface
}
// eface is the storage for one ring-buffer slot: a type word and a
// data word. The queue code reads and writes slots by casting a
// *eface to *any, so this layout is relied upon to match the
// in-memory representation of an empty interface value.
type eface struct {
	typ, val unsafe.Pointer
}
// dequeueBits is the width, in bits, of each of the two indexes
// (head and tail) that are packed into poolDequeue.headTail.
const dequeueBits = 32

// dequeueLimit is the maximum size of a poolDequeue.
//
// This must be at most (1<<dequeueBits)/2 because detecting fullness
// depends on wrapping around the ring buffer without wrapping around
// the index. We divide by 4 so this fits in an int on 32-bit.
const dequeueLimit = (1 << dequeueBits) / 4

// dequeueNil is used in poolDequeue to represent interface{}(nil).
// Since we use nil to represent empty slots, we need a sentinel value
// to represent nil.
type dequeueNil *struct{}
func ( *poolDequeue) ( uint64) (, uint32) {
const = 1<<dequeueBits - 1
= uint32(( >> dequeueBits) & )
= uint32( & )
return
}
func ( *poolDequeue) (, uint32) uint64 {
const = 1<<dequeueBits - 1
return (uint64() << dequeueBits) |
uint64(&)
}
// pushHead adds val at the head of the queue. It returns false if the
// queue is full. It must only be called by a single producer.
func ( *poolDequeue) ( any) bool {
:= atomic.LoadUint64(&.headTail)
, := .unpack()
if (+uint32(len(.vals)))&(1<<dequeueBits-1) == {
// Queue is full.
return false
}
:= &.vals[&uint32(len(.vals)-1)]
// Check if the head slot has been released by popTail.
:= atomic.LoadPointer(&.typ)
if != nil {
// Another goroutine is still cleaning up the tail, so
// the queue is actually still full.
return false
}
// The head slot is free, so we own it.
if == nil {
= dequeueNil(nil)
}
*(*any)(unsafe.Pointer()) =
// Increment head. This passes ownership of slot to popTail
// and acts as a store barrier for writing the slot.
atomic.AddUint64(&.headTail, 1<<dequeueBits)
return true
}
// popHead removes and returns the element at the head of the queue.
// It returns false if the queue is empty. It must only be called by a
// single producer.
func ( *poolDequeue) () (any, bool) {
var *eface
for {
:= atomic.LoadUint64(&.headTail)
, := .unpack()
if == {
// Queue is empty.
return nil, false
}
// Confirm tail and decrement head. We do this before
// reading the value to take back ownership of this
// slot.
--
:= .pack(, )
if atomic.CompareAndSwapUint64(&.headTail, , ) {
// We successfully took back slot.
= &.vals[&uint32(len(.vals)-1)]
break
}
}
:= *(*any)(unsafe.Pointer())
if == dequeueNil(nil) {
= nil
}
// Zero the slot. Unlike popTail, this isn't racing with
// pushHead, so we don't need to be careful here.
* = eface{}
return , true
}
// popTail removes and returns the element at the tail of the queue.
// It returns false if the queue is empty. It may be called by any
// number of consumers.
func ( *poolDequeue) () (any, bool) {
var *eface
for {
:= atomic.LoadUint64(&.headTail)
, := .unpack()
if == {
// Queue is empty.
return nil, false
}
// Confirm head and tail (for our speculative check
// above) and increment tail. If this succeeds, then
// we own the slot at tail.
:= .pack(, +1)
if atomic.CompareAndSwapUint64(&.headTail, , ) {
// Success.
= &.vals[&uint32(len(.vals)-1)]
break
}
}
// We now own slot.
:= *(*any)(unsafe.Pointer())
if == dequeueNil(nil) {
= nil
}
// Tell pushHead that we're done with this slot. Zeroing the
// slot is also important so we don't leave behind references
// that could keep this object live longer than necessary.
//
// We write to val first and then publish that we're done with
// this slot by atomically writing to typ.
.val = nil
atomic.StorePointer(&.typ, nil)
// At this point pushHead owns the slot.
return , true
}
// poolChain is a dynamically-sized version of poolDequeue.
//
// This is implemented as a doubly-linked list queue of poolDequeues
// where each dequeue is double the size of the previous one. Once a
// dequeue fills up, this allocates a new one and only ever pushes to
// the latest dequeue. Pops happen from the other end of the list and
// once a dequeue is exhausted, it gets removed from the list.
//
// The zero value is an empty chain; the first dequeue is allocated
// lazily on the first push.
type poolChain struct {
	// head is the poolDequeue to push to. This is only accessed
	// by the producer, so doesn't need to be synchronized.
	head *poolChainElt

	// tail is the poolDequeue to popTail from. This is accessed
	// by consumers, so reads and writes must be atomic.
	tail *poolChainElt
}
// poolChainElt is one node of a poolChain: a poolDequeue together
// with the links to its neighbouring nodes.
type poolChainElt struct {
	poolDequeue

	// next and prev link to the adjacent poolChainElts in this
	// poolChain. next points toward the head (the newer dequeue)
	// and prev toward the tail (the older dequeue).
	//
	// next is written atomically by the producer and read
	// atomically by the consumer. It only transitions from nil to
	// non-nil.
	//
	// prev is written atomically by the consumer and read
	// atomically by the producer. It only transitions from
	// non-nil to nil.
	next, prev *poolChainElt
}
func ( **poolChainElt, *poolChainElt) {
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer()), unsafe.Pointer())
}
func ( **poolChainElt) *poolChainElt {
return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer())))
}
func ( *poolChain) ( any) {
:= .head
if == nil {
// Initialize the chain.
const = 8 // Must be a power of 2
= new(poolChainElt)
.vals = make([]eface, )
.head =
storePoolChainElt(&.tail, )
}
if .pushHead() {
return
}
// The current dequeue is full. Allocate a new one of twice
// the size.
:= len(.vals) * 2
if >= dequeueLimit {
// Can't make it any bigger.
= dequeueLimit
}
:= &poolChainElt{prev: }
.vals = make([]eface, )
.head =
storePoolChainElt(&.next, )
.pushHead()
}
func ( *poolChain) () (any, bool) {
:= .head
for != nil {
if , := .popHead(); {
return ,
}
// There may still be unconsumed elements in the
// previous dequeue, so try backing up.
= loadPoolChainElt(&.prev)
}
return nil, false
}
func ( *poolChain) () (any, bool) {
:= loadPoolChainElt(&.tail)
if == nil {
return nil, false
}
for {
// It's important that we load the next pointer
// *before* popping the tail. In general, d may be
// transiently empty, but if next is non-nil before
// the pop and the pop fails, then d is permanently
// empty, which is the only condition under which it's
// safe to drop d from the chain.
:= loadPoolChainElt(&.next)
if , := .popTail(); {
return ,
}
if == nil {
// This is the only dequeue. It's empty right
// now, but could be pushed to in the future.
return nil, false
}
// The tail of the chain has been drained, so move on
// to the next dequeue. Try to drop it from the chain
// so the next pop doesn't have to look at the empty
// dequeue again.
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&.tail)), unsafe.Pointer(), unsafe.Pointer()) {
// We won the race. Clear the prev pointer so
// the garbage collector can collect the empty
// dequeue and so popHead doesn't back up
// further than necessary.
storePoolChainElt(&.prev, nil)
}
=
}
}
The pages are generated with Golds v0.6.7. (GOOS=linux GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds.