简介
这篇文章主要讲解 sync.Pool 这个包,主要作用是用来在并发场景中 保存一些暂时不需要 但是过后会用的资源 为什么会有这个需求呢 是 为了 避免不断新建资源带来的资源浪费 也减少GC的压力 从而可以提升效率。
其主要的逻辑很简单 就是将 资源放进 资源池 用的时候再 获取 ,但是pool一般将put的资源和get的资源不做关联,我们需要断言来进行转换。关于pool的简介已经有很多了 不多介绍 我们直接来看源码
主要结构体
sync.Pool主要有结构体有 Pool、poolLocal、poolLocalInternal、poolChain、poolChainElt、poolDequeue这几种,我们来梳理下每个结构体,结构体之间的关系如下图:
其中 虚线红框 是可能获得和保存(除了New处) 数据的地方 ,请格外关注。
请大家一定要熟悉上面的图 因为接下来的逻辑梳理 我们会经常会看它
Pool
pool是核心结构体 是功能的主入口
其代码如下:
type Pool struct {
noCopy noCopy
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal // (primary cache)主缓存存储的数据 主要存储各[P]poolLocal 为各个P对应的poolLocal
localSize uintptr // size of the local array // [P]poolLocal 长度
victim unsafe.Pointer // local from previous cycle // (victim cache)受害者缓存 GC之前 将 主缓存内容([P]poolLocal头指针)赋值给受害者缓存
victimSize uintptr // size of victims array
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() any // 用来生成新的 池元素数据
}
poolLocal
poolLocal 是 处理器 p对于应的本地资源池 ,其代码如下:
type poolLocal struct {
poolLocalInternal // 每个处理器(P)对应的本地资源池
// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte // 防止为共享 操作系统相关知识 感兴趣的可以看看
}
其核心结构体是 poolLocalInternal 其结构如下:
// 每个P对应一个如下结构体.
type poolLocalInternal struct {
private any // Can be used only by the respective P. // 仅能被相应的 p使用 提供一个快速获取本P资源池数据的方法 因为只从属与P所以不用加锁 可减少同步开销,提高效率
shared poolChain // Local P can pushHead/popHead; any P can popTail. // 本地的 p (调度器)可以 执行 pushHead/popHead;任何p都能执行 popTail
}
其中 poolchain结构如下
poolCahin
这是一个实现了双向链表队列的池化双向队列,其中每个双端队列的大小是前一个的两倍。一旦一个双向队列填满,就会分配一个新的双向队列,并且只向最新的双向队列推送元素。弹出操作发生在列表的另一端,一旦一个双向队列被耗尽,它就会被从列表中移除。
type poolChain struct {
// head is the poolDequeue to push to. This is only accessed
// by the producer, so doesn't need to be synchronized.
head *poolChainElt // 头指针
// tail is the poolDequeue to popTail from. This is accessed
// by consumers, so reads and writes must be atomic.
tail *poolChainElt // 尾指针
}
其中 poolChainElt 结构如下:
poolChainElt
type poolChainElt struct {
poolDequeue // 存储池元素指针的列表
// next and prev link to the adjacent poolChainElts in this
// poolChain.
//
// next is written atomically by the producer and read
// atomically by the consumer. It only transitions from nil to
// non-nil.
//
// prev is written atomically by the consumer and read
// atomically by the producer. It only transitions from
// non-nil to nil.
next, prev *poolChainElt // 前后指针
}
其中 poolDequeue 如下:
poolDequeue
type poolDequeue struct {
// headTail packs together a 32-bit head index and a 32-bit
// tail index. Both are indexes into vals modulo len(vals)-1.
//
// tail = index of oldest data in queue
// head = index of next slot to fill
//
// Slots in the range [tail, head) are owned by consumers.
// A consumer continues to own a slot outside this range until
// it nils the slot, at which point ownership passes to the
// producer.
//
// The head index is stored in the most-significant bits so
// that we can atomically add to it and the overflow is
// harmless.
headTail uint64 // 高32位 head 低32位 tail
// vals is a ring buffer of interface{} values stored in this
// dequeue. The size of this must be a power of 2.
//
// vals[i].typ is nil if the slot is empty and non-nil
// otherwise. A slot is still in use until *both* the tail
// index has moved beyond it and typ has been set to nil. This
// is set to nil atomically by the consumer and read
// atomically by the producer.
vals []eface // 存储 池基本元素指针的 数组
}
如此我们就大概介绍了这几种结构体 请大家对着关系图仔细梳理一遍。接下来我们来梳理下 向池中存数据逻辑,来看下Put():
Put()
put()就是向 资源池放数据。Put()的整体执行流程见 代码注释,请铭记。
其源码如下:
// Put adds x to the pool.
// Put 向pool添加x
// 逻辑:
// 先从本协程对应的P(GMP中的P)处获得 p对应的 poolLocal
// 如果 本地poolLocal的private为空 则将x放置在这里
// 如果不为空 则将 x放置在 poolLocal 对应的 poolChain中的头 poolDequeue 的头部
func (p *Pool) Put(x any) {
if x == nil {
return
}
if race.Enabled {
if fastrandn(4) == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
// 绑定当前G到当前P上,并获取P对应的 本地资源池 poolLocal,如果资源池为空 则 初始化资源池
l, _ := p.pin()
if l.private == nil {
l.private = x
} else {
//将 x放置在 poolLocal 对应的 poolChain中的头 poolDequeue 的头部
l.shared.pushHead(x)
}
// 解绑当前P和G 防止P和G尝时间绑定在一起 造成资源负载不平衡等问题
runtime_procUnpin()
if race.Enabled {
race.Enable()
}
}
其中 pin()是 绑定G到当前P(处理器)上,代码如下:
// pin 主要逻辑
// 首先 绑定当前G到当前P
// 如果 本地资源池列表有当前P对应的资源 就返回资源池地址 和 pid
// 如果没有 就执行 pinSlow 首先 解绑 P和G 再绑定G和P(新P)
// 如果 当前p有对应的资源就 返回
// 如果没有 就舍弃旧的资源池列表([]poolLocal)重新初始化一个新的 然后 返回对应资源池 和 pid
func (p *Pool) pin() (*poolLocal, int) {
// 绑定当前G到当前P 进制抢占G 当前G不会被调到其他处理器上 也不会让出cpu时间
pid := runtime_procPin()
// In pinSlow we store to local and then to localSize, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between.
// Thus here we must observe local at least as large localSize.
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
// 获取本地poolLocal 大小;使用了 load-acquire 技术 保证 取得是最新的 todo
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
l := p.local // 使用load-consume不保证最新 // load-consume
// 如果 pid小于s则 pid对应的 P本地资源池还没初始化; pid可以看做 在[]poolLocal中的索引
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
// p对应的资源池没出初始化 就走这里
return p.pinSlow()
}
其中pin代码中的pinSlow代码如下:
func (p *Pool) pinSlow() (*poolLocal, int) {
// Retry under the mutex.
// Can not lock the mutex while pinned.
// 将本地Goruntime 和 当前运行的 P解绑
runtime_procUnpin()
// 加锁 确保 allPools并发安全
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
// 绑定当前协程到 新的P上
pid := runtime_procPin()
// poolCleanup won't be called while we are pinned.
// 重新获取 当前P对应的资源池
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
// 获取不上(pid>=s) 当前local为nil 说明整个poolLocal资源池还没有初始化或者初始化了但是本地资源池列表数据 可能发生了GC;
// 这时两次都获取不到了 就要考虑重新分配本地存储列表了([]poolLocal)
// 如果p.local == nil 说明本地资源池还没加入 全局资源池列表 就加进去 allPools 用来GC
if p.local == nil {
allPools = append(allPools, p)
}
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
// 获取当前设置的处理器的数值 而不是重置它; 将旧的 资源池丢弃 重新初始化一个新的
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
}
这样 put()的插入前的资源准备工作就完成了,加下来我们来梳理下x的插入,见Put() 函数代码中的 l.shared.pushHead(x),其主要作用是将x插入P对应的资源池。我们来看下其源码:
func (c *poolChain) pushHead(val any) {
d := c.head
// 如果 当前 val 是 pool中的第一个 则 需要 初始化 poolChain 注意 vals的大小是2的倍数 初始是8
if d == nil {
// Initialize the chain.
const initSize = 8 // Must be a power of 2
d = new(poolChainElt)
d.vals = make([]eface, initSize)
c.head = d
storePoolChainElt(&c.tail, d)
}
// 将val放在当前 poolChainElt 的 vals 数组头部 如果 vals是不满的
if d.pushHead(val) {
return
}
// The current dequeue is full. Allocate a new one of twice
// the size.
// 当前 poolChainElt的 vals 已经满了 则需要重新建立一个新的poolChainElt 其中vals的大小是 前一个的2倍
newSize := len(d.vals) * 2
if newSize >= dequeueLimit {
// Can't make it any bigger.
newSize = dequeueLimit
}
//将 新poolChainElt加入poolChain中,并调整 poolChain的 head指针 指向新的poolChainElt, 并将 val加入vals的 头部
d2 := &poolChainElt{prev: d}
d2.vals = make([]eface, newSize)
c.head = d2
storePoolChainElt(&d.next, d2)
d2.pushHead(val)
}
这只是 poolChain这层的逻辑,可以对照结构体关系图来捋一下思路,真正的插入到资源池中 还是 poolChain.pushHead代码中的 d.pushHead(val),我们来看下源码:
// pushHead 添加 val到 queue(vals)的头部,如果queue满了返回错误。只能被单一生产者调用
func (d *poolDequeue) pushHead(val any) bool {
// 获取头尾值
ptrs := atomic.LoadUint64(&d.headTail)
// 拆分出头尾值
head, tail := d.unpack(ptrs)
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
// Queue is full.
return false
}
slot := &d.vals[head&uint32(len(d.vals)-1)]
// Check if the head slot has been released by popTail.
// todo 在哪里赋值
typ := atomic.LoadPointer(&slot.typ)
if typ != nil {
// Another goroutine is still cleaning up the tail, so
// the queue is actually still full.
return false
}
// The head slot is free, so we own it.
if val == nil {
val = dequeueNil(nil)
}
// 将val赋值给 slot 指向的内存
*(*any)(unsafe.Pointer(slot)) = val
// Increment head. This passes ownership of slot to popTail
// and acts as a store barrier for writing the slot.
// 头指针加一
// 建立起内存屏障
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
return true
}
到这里 插入操作就完成了,大家可以对照执行函数的结构体和接头体逻辑关系图来梳理下其调用逻辑。
接下来我们来看下Get()的逻辑,Get()由于涉及偷数据所以逻辑比较复杂,我们先来梳理下其源吗
Get()
Get()主执行逻辑见代码前头注释,其整体逻辑结构并不复杂我们来梳理下:
// 从 pool获取一个元素
// 逻辑步骤是:
// 将当前协程固定在当前处理器p上 这么做可以减少上下文切换 提高并发效率
// 1. 如果 本地poolLocal的private存在 则返回
// 2. 如果不存在 则 去 poolLocal 对应的 poolChain的 头部poolDequeue 的 头部 获取一个 val
// 3. 如果还是不存在 则循环去 []poolLocal(其他P对应的poolLocal) 的 poolLocal 尾部poolDequeue的尾部 获取一个 val
// 4. 如果还不存在 则证明 主缓存(primary cache)已经没有了 需要去 受害者缓存(victim cache)去取
// 5. 如果也不存在 则就调用 pool 的 New方法 产生一个
// 总的来说是 先从本协程对应的P获取val 如果获取失败就从其他P处获取val ,再去受害者缓存获取,再不行生成一个新的val
func (p *Pool) Get() any {
if race.Enabled {
race.Disable()
}
// 大意是获取 P的pid 和 poolLocal
l, pid := p.pin ()
//步骤1: 获取元素时 首先看看 poolLocal 的 private 是否为nil
x := l.private
l.private = nil
// 如果 x 为空
if x == nil {
// Try to pop the head of the local shard. We prefer
// the head over the tail for temporal locality of
// reuse.
// 执行步骤2: 从 本地分片队列的头部取出数据 并初始化头部数据 我们倾向于从头部取数据 这样是为了利用时间局部性。
// (时间局部性是指短时间内重复访问相同数据的概率较高,从头部取数据更有效利用这一特征,因为Put()时是向头部添加)
// 注意:本地 poolChain 某个 poolChainElt 被取空后是不会被 置空的
x, _ = l.shared.popHead()
// private为空 本地poolChain里也没有 采用慢获取方法 这里就需要从别的poolLocal里尾部偷取了
if x == nil {
// 步骤3、4: 从别的 p对应的 poolLocal 里偷,还没有从受害者缓存去拿
x = p.getSlow(pid)
}
}
// 解除绑定
runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
// 步骤5:
// 如果以上还是没有 就重新 生成一个
if x == nil && p.New != nil {
x = p.New()
}
return x
}
步骤2
我们来看下步骤2:
注意对照结构体逻辑结构图来看。l.shared.popHead() 代码处 popHead() 是属于 poolChain 的,其源吗如下:
代码逻辑挺简单的 ,就是从头部pooDequeue 开始循环去pooDequeue链表 中取数据,直到取到为止。
func (c *poolChain) popHead() (any, bool) {
d := c.head
for d != nil {
// 删除头部
if val, ok := d.popHead(); ok {
return val, ok
}
// There may still be unconsumed elements in the
// previous dequeue, so try backing up.
d = loadPoolChainElt(&d.prev)
}
return nil, false
}
其中 d.popHead() 是真实取数据的代码 d是 poolChainElt(请对照结构体图),其源码如下:
// popHead 从poolDequeue的vals头部删除一个 元素 如果删除失败返回 false 只允许一个单一的生产者使用
//
func (d *poolDequeue) popHead() (any, bool) {
var slot *eface
// 虽然是对 P本地内存的操作 但是可能有其他P的协程过来偷数据 所以涉及 资源修改 还是要使用 for+cas
for {
// 计算头尾 索引
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
// Queue is empty.
return nil, false
}
// Confirm tail and decrement head. We do this before
// reading the value to take back ownership of this
// slot.
head--
ptrs2 := d.pack(head, tail)
// 查看 有没有偷数据的行为存在 没有就 更新 ptrs2 并将 slot 数据取出来
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// We successfully took back slot.
slot = &d.vals[head&uint32(len(d.vals)-1)]
break
}
}
// 赋值给 val
val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// Zero the slot. Unlike popTail, this isn't racing with
// pushHead, so we don't need to be careful here.
*slot = eface{}
return val, true
}
步骤3、4
这一步涉及从别的P对应的本地poolLocal来取数据和从受害者缓存来取数据
步骤3、4对应的代码入口是 Get()函数的 x = p.getSlow(pid) p 是 pool结构体,其中getSlow源码如下:
func (p *Pool) getSlow(pid int) any {
// See the comment in pin regarding ordering of the loads.
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// Try to steal one element from other procs.
//步骤3: 从其他 处理器(P)的 poolLocal偷一个 元素
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// Try the victim cache. We do this after attempting to steal
// from all primary caches because we want objects in the
// victim cache to age out if at all possible.
//步骤4: 从别的 处理器 偷 失败了 说明 主缓存已经没有数据了 需要从 victim cache 偷 逻辑同上
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// Mark the victim cache as empty for future gets don't bother
// with it.
// 走到这里说明 victim cache 也没有数据了 那就吧 其长度置为0 防止以后再浪费时间去找元素
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
其中步骤3中 l.shared.popTail() 的l代表 poolLocal
其源码如下:
func (c *poolChain) popTail() (any, bool) {
d := loadPoolChainElt(&c.tail)
if d == nil {
return nil, false
}
// for 循环 从 尾部 poolChainElt节点开始 偷
for {
// It's important that we load the next pointer
// *before* popping the tail. In general, d may be
// transiently empty, but if next is non-nil before
// the pop and the pop fails, then d is permanently
// empty, which is the only condition under which it's
// safe to drop d from the chain.
// 获取 next 指针
d2 := loadPoolChainElt(&d.next)
// 获取 元素 从 尾部 获取到返回
if val, ok := d.popTail(); ok {
return val, ok
}
// 没有获取到 证明 d中已经空了 如果d2也空了 就返回错误
if d2 == nil {
// This is the only dequeue. It's empty right
// now, but could be pushed to in the future.
return nil, false
}
// The tail of the chain has been drained, so move on
// to the next dequeue. Try to drop it from the chain
// so the next pop doesn't have to look at the empty
// dequeue again.
// 将d 置为nil以便可以被垃圾回收器回收
// 只有 赢了竞争 才能 删除尾部 节点 这样才能触发GC
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
// We won the race. Clear the prev pointer so
// the garbage collector can collect the empty
// dequeue and so popHead doesn't back up
// further than necessary.
storePoolChainElt(&d2.prev, nil)
}
d = d2
}
}
其中步骤3中真实的删除是在 上述代码的 d.popTail()处 d代表poolDequeue
其源码如下:
// popTail 删除和返回 队列尾部的元素 如果是空的就返回 false 可能被好多消费者同时调用
func (d *poolDequeue) popTail() (any, bool) {
var slot *eface
// 从其他 P 处偷 就要考虑并发安全的问题了 这里采用 for+cas的方法
for {
ptrs := atomic.LoadUint64(&d.headTail)
head, tail := d.unpack(ptrs)
if tail == head {
// Queue is empty.
return nil, false
}
// Confirm head and tail (for our speculative check
// above) and increment tail. If this succeeds, then
// we own the slot at tail.
// 尾部前进一位 计算新的headTail
ptrs2 := d.pack(head, tail+1)
// 竞争胜利 删除尾部
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
// Success.
// 获取槽数据
slot = &d.vals[tail&uint32(len(d.vals)-1)]
break
}
}
// We now own slot.
// 取出槽数组(其实时指针)指向的元素
val := *(*any)(unsafe.Pointer(slot))
if val == dequeueNil(nil) {
val = nil
}
// todo 待理解
// Tell pushHead that we're done with this slot. Zeroing the
// slot is also important so we don't leave behind references
// that could keep this object live longer than necessary.
//
// We write to val first and then publish that we're done with
// this slot by atomically writing to typ.
// 槽数据归零
slot.val = nil
atomic.StorePointer(&slot.typ, nil)
// At this point pushHead owns the slot.
return val, true
}
到这里步骤3就讲完了
步骤四设计到去victim cache 缓存去取数据 其执行逻辑跟 去别的poolLocal是一致的 至于 为啥会有 victim cache呢 其存储的是啥呢 先卖个关子
。
步骤5:
如果步骤1-4都没有获取到就需要 调用事先声明的New()函数来生成一个新的元素。
Get()逻辑比较复杂 好多细节还是没理解透彻 先 todo吧, 其大致执行流程如下:
注意的是 如果New事先没有声明,获取的数据可能也是nil