Workqueue

workqueue 工作队列
在 kubernetes 中,使用 go 的 channel 无法满足 kubernetes 的应用场景,如延迟、限速等; 在kubernetes中存在三种队列通用队列 common queue,延迟队列 delaying queue ,和限速队列 rate limiters queue
主要功能在于标记和去重,并支持如下特性。
- 有序:按照添加顺序处理元素(item)。
- 去重:相同元素在同一时间不会被重复处理,例如一个元素在处理之前被添加了多次,它只会被处理一次。
- 并发性:多生产者和多消费者。
- 标记机制:支持标记功能,标记一个元素是否被处理,也允许元素在处理时重新排队。
- 通知机制:ShutDown 方法通过信号量通知队列不再接收新的元素,并通知 metric goroutine 退出。
- 延迟:支持延迟队列,延迟一段时间后再将元素存入队列。
- 限速:支持限速队列,元素存入队列时进行速率限制。限制一个元素被重新排队(Reenqueued)的次数。
- Metric:支持 metric 监控指标,可用于 Prometheus 监控。
需求
为什么队列需要去重功能 ?
当一个资源对象被频繁变更, 然而同一个对象还未被消费, 没必要在在队列中存多份, 经过去重后只需要处理一次即可.
为什么需要 delay 延迟入队功能 ?
有些 k8s controller 是需要延迟队列功能的, 比如像 cronjob 依赖延迟队列实现定时功能. 另外也可以实现延迟 backoff 时长后重入队.
为什么需要限频功能 ?
避免过多事件并发入队, 使用限频策略对入队的事件个数进行控制. k8s 中的 controller 大把的使用限频.
informer 中的 deltafifo 跟 workqueue 区别?
deltafifo 虽然名为 fifo 队列, 但他的 fifo 不是全局事件, 而只是针对某资源对象的事件进行内部 fifo 排列. 比如某个 deployment 频繁做变更, 那么 deltafifo 逻辑是把后续收到的相关事件放在一起.
WorkQueue 分类
WorkQueue 支持 3 种队列,并提供了 3 种接口,不同队列实现可应对不同的使用场景,分别介绍如下。
Interface:FIFO 队列接口,先进先出队列,并支持去重机制。
DelayingInterface:延迟队列接口,基于 Interface 接口封装,延迟一段时间后再将元素存入队列。
RateLimitingInterface:限速队列接口,基于 DelayingInterface 接口封装,支持元素存入队列时进行速率限制
FIFO 队列
流程

通过 Add 方法往 FIFO 队列中分别插入 1、2、3 这 3 个元素,此时队列中的 queue 和 dirty 字段分别存有 1、2、3 元素,processing 字段为空。 然后通过 Get 方法获取最先进入的元素(也就是 1 元素),此时队列中的 queue 和 dirty 字段分别存有 2、3 元素,而 1 元素会被放入 processing 字段中,表示该元素正在被处理。 最后,当我们处理完 1 元素时,通过 Done 方法标记该元素已经被处理完成,此时队列中的 processing 字段中的 1 元素会被删除。
结构体
// Type is a work queue (see the package comment).
type Type struct {
queue []t
dirty set
processing set
// ...
}
type empty struct{}
type t interface{}
type set map[t]empty
- queue 字段是实际存储元素的地方,它是 slice 结构的,用于保证元素有序;
- dirty 字段非常关键,除了能保证去重,还能保证在处理一个元素之前哪怕其被添加了多次(并发情况下),但也只会被处理一次;
- processing 字段用于标记机制,标记一个元素是否正在被处理。
并发场景描述及源码解释
deployment controller 处理元素
func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
// 拿元素
key, quit := dc.queue.Get()
if quit {
return false
}
// defer 标记结束
defer dc.queue.Done(key)
err := dc.syncHandler(ctx, key.(string))
dc.handleErr(ctx, err, key)
return true
}

- 在并发场景下,假设 goroutine A 通过 Get 方法获取 1 元素,1 元素被添加到 processing 字段中,同一时间,goroutine B 通过 Add 方法插入另一个 1 元素,此时在 processing 字段中已经存在相同的元素,所以后面的 1 元素并不会被直接添加到 queue 字段中,当前 FIFO 队列中的 dirty 字段中存有 1、2、3 元素,processing 字段存有 1 元素。
- 在 goroutine A 通过 Done 方法标记处理完成后,如果 dirty 字段中存有 1 元素,则将 1 元素追加到 queue 字段中的尾部。
// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown {
return
}
if q.dirty.has(item) {
// 判断 dirty 是否存在该元素, 如存在则直接跳出, 其目的是为了实现待处理元素的去重效果.
return
}
q.metrics.add(item)
q.dirty.insert(item) // 在 dirty 里添加元素
if q.processing.has(item) {
// 判断 processing 集合是否存在元素, 如果存在则跳出. 其目的是为了防止同一个元素被并发处理.
return
}
// 把元素放到队列里
q.queue = append(q.queue, item)
q.cond.Signal()
}
// Done() 用来标记某元素已经处理完,
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
q.processing.delete(item)
if q.dirty.has(item) {
// 会把 dirty 的任务重新入队, 起到了排队的效果.
q.queue = append(q.queue, item)
q.cond.Signal()
} else if q.processing.len() == 0 {
q.cond.Signal()
}
}
延迟队列 DelayingInterface

type DelayingInterface interface {
// 继承 Queue Interface 的基本功能
Interface
// 添加定时功能
AddAfter(item interface{}, duration time.Duration)
}
数据结构定义
type delayingType struct {
// 继承 Queue Interface 队列基本功能
Interface
// 对比的时间 ,包含一些定时器的功能
clock clock.Clock
// 退出通道
stopCh chan struct{}
stopOnce sync.Once
// 周期性检测队列是否有对象到期
heartbeat clock.Ticker
// 新的定时元素会推到该管道中, 等待 loop 处理.
waitingForAddCh chan *waitFor
// 用来 metrics 统计
metrics retryMetrics
}
delay queue 使用了 heap 做延迟队列。

// 心跳的时长
const maxWait = 10 * time.Second
// 构建定时器队列对象方法
func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface {
// clock 为 k8s 内部封装的时间对象
// NewNamed 用来生成 Queue.
return newDelayingQueue(clock, NewNamed(name), name)
}
// 真正的构建定时器队列对象方法
func newDelayingQueue(clock clock.WithTicker, q Interface, name string) *delayingType {
ret := &delayingType{
Interface: q,
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
}
go ret.waitingLoop()
return ret
}
func (q *delayingType) waitingLoop() {
never := make(<-chan time.Time)
var nextReadyAtTimer clock.Timer
// 初始化 min heap 小顶堆
waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue)
waitingEntryByData := map[t]*waitFor{}
for {
// 如果 queue 已经被关闭, 则退出该 loop 协程.
if q.Interface.ShuttingDown() {
return
}
now := q.clock.Now()
for waitingForQueue.Len() > 0 {
// 如果延迟 heap 不为空, 则获取堆顶的元素.
entry := waitingForQueue.Peek().(*waitFor)
// 如果大于当前时间, 则没有到期, 则跳出.
if entry.readyAt.After(now) {
break
}
// 如果小于当前时间, 则 pop 出元素, 然后扔到 queue 队里中.
entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(waitingEntryByData, entry.data)
}
// 如果小顶堆为空, 则使用 never 做无限时长定时器
nextReadyAt := never
// 如果 minheap 小顶堆不为空, 设置最近元素的时间为定时器的时间.
if waitingForQueue.Len() > 0 {
if nextReadyAtTimer != nil {
nextReadyAtTimer.Stop()
}
// 从堆顶获取最近的元素
entry := waitingForQueue.Peek().(*waitFor)
// 实例化 timer 定时器
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
nextReadyAt = nextReadyAtTimer.C()
}
select {
case <-q.stopCh:
return
case <-q.heartbeat.C():
// 触发 10s 心跳超时后, 重新进行选择最近的定时任务.
case <-nextReadyAt:
// 上次计算的最近元素的定时器已到期, 进行下次循环. 期间会处理该到期任务.
case waitEntry := <-q.waitingForAddCh:
// 收到新添加的定时器
// 如果新对象还未到期, 则把定时对象放到 heap 定时堆里.
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
// 如果该定时任务已到期, 则调用继承的 queue 的 add 方法.把元素添加到队列中.
q.Add(waitEntry.data)
}
// drain 为取尽的设计, 是一个性能优化点.
// 尽量在该单次循环中把 chan 读空, 避免留存后 select 阶段总是被唤醒.
drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}
// 调用方使用 AddAfter 添加定时任务
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
// 如果关闭, 则退出
if q.ShuttingDown() {
return
}
// 进行统计
q.metrics.retry()
// 时间不合理, 直接入队列, 不走堆逻辑
if duration <= 0 {
q.Add(item)
return
}
select {
case <-q.stopCh:
// 等待退出
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
// 创建一个定时对象, 然后推到 waitingForAddCh 管道中, 等待 waitingLoop 协程处理.
}
}
限速队列
type RateLimitingInterface interface {
// 继承了 DelayingInterface 延迟队列
DelayingInterface
// 使用对应的限频算法求出需要 delay 的时长, 然后添加到 delay 队列中.
AddRateLimited(item interface{})
// 在 rateLimiter 中取消某对象的追踪记录.
Forget(item interface{})
// 从 rateLimiter 中获取计数.
NumRequeues(item interface{}) int
}
type rateLimitingType struct {
// 继承延迟队列
DelayingInterface
// 限速组件
rateLimiter RateLimiter
}
// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
// 获取该对象的计数信息.
func (q *rateLimitingType) NumRequeues(item interface{}) int {
return q.rateLimiter.NumRequeues(item)
}
// 删除该对象的记录的信息
func (q *rateLimitingType) Forget(item interface{}) {
q.rateLimiter.Forget(item)
}
RateLimiter 的具体的实现
type RateLimiter interface {
// 获取该元素需要等待多久才能入队.
When(item interface{}) time.Duration
// 删除该元素的追踪记录, 有些 rateLimiter 记录了该对象的次数.
Forget(item interface{})
// 该对象记录的次数
NumRequeues(item interface{}) int
}
抽象限速器的实现,有 BucketRateLimiter , ItemBucketRateLimiter , ItemExponentialFailureRateLimiter , ItemFastSlowRateLimiter, MaxOfRateLimiter 混合模式
// https://github.com/openebs/lvm-localpv/blob/45ebdf6dd387307652c2c9cacbbe4aa4ede87030/pkg/mgmt/lvmnode/builder.go
func newNodeController(kubeClient kubernetes.Interface, client dynamic.Interface,
dynInformer dynamicinformer.DynamicSharedInformerFactory, ownerRef metav1.OwnerReference,
pollInterval int) (*NodeController, error) {
// ...
nodeContrller := &NodeController{
// ...
workqueue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(),
workqueue.RateLimitingQueueConfig{
Name: "Node",
}),
// ...
}
// ...
return nodeContrller, nil
}
同时使用排队指数和令牌桶算法
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter( // 混合模式
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
令牌桶算法 BucketRateLimiter: 基于 golang.org/x/time@v0.3.0/rate 实现
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
return r.Limiter.Reserve().Delay()
}
计数器算法 ItemFastSlowRateLimiter: 限速器先快速重试一定次数,然后慢速重试
// https://github.com/openebs/lvm-localpv/blob/6ee366ce5f49514f0d16697e438a9da17aa346a6/pkg/mgmt/volume/builder.go
func newVolController(kubeClient kubernetes.Interface, client dynamic.Interface,
dynInformer dynamicinformer.DynamicSharedInformerFactory) (*VolController, error) {
//This ratelimiter requeues failed items after 5 secs for first 12 attempts. Then objects are requeued after 30 secs.
rateLimiter := workqueue.NewItemFastSlowRateLimiter(5*time.Second, 30*time.Second, 12)
// ...
}
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
return &ItemFastSlowRateLimiter{
failures: map[interface{}]int{},
fastDelay: fastDelay, // 快的速度
slowDelay: slowDelay, // 慢的速度
maxFastAttempts: maxFastAttempts, // 最大尝试次数
}
}
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
r.failures[item] = r.failures[item] + 1
// 当错误次数没超过快速的阈值使用快速,否则使用慢速
if r.failures[item] <= r.maxFastAttempts {
return r.fastDelay
}
return r.slowDelay
}
排队指数算法 ItemExponentialFailureRateLimiter
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
return &ItemExponentialFailureRateLimiter{
failures: map[interface{}]int{},
baseDelay: baseDelay, // 最初限速单位
maxDelay: maxDelay, // 最大限速单位
}
}
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1
// The backoff is capped such that 'calculated' value never overflows.
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}