k8s源码阅读-informer的workqueue
背景
实现自定义controller时,会创建workqueue队列来处理eventhandler添加进来的对象,试着通过阅读源码的方式来看看这个队列和普通队列有什么不同,是怎么进行限速的。通过Queue->DelayingQueue->RateLimiterQueue递进的顺序来进行分析
1.Queue的实现
Queue的接口定义:
type Interface interface {
Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShutDown()
ShutDownWithDrain()
ShuttingDown() bool
}
实际使用队列时,用到的是Add、Get、Done方法,这里对这几个方法进行分析
Queue的结构体实现:
type Type struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
queue []t
// dirty defines all of the items that need to be processed.
dirty set
// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set
cond *sync.Cond
// ......
}
- queue: queue定义了这个队列中的元素处理顺序,是一个[]interface{}类型的切片,可以保存任意类型的元素
- dirty: set的底层类型时map[interface{}]struct{},queue中的元素集合会存到这个dirty set中,指待处理的items
- processing: 也是set类型,保存的是正在处理的items
- cond: 同步原语,用于在队列中有新元素时,通知调用Get进入wait状态的协程
整体的实现逻辑:当往队列中添加元素时,会添加到queue和dirty中,取出的正在处理的元素在processing中,元素处理完毕之后,会从processing中删除。为了确保队列中的元素不重复(避免多个协程同时处理同一个资源对象),所以引入了dirty,这样判断一个元素是不是已经存在的时间复杂度就是O(1),而如果一个元素正在被处理,而同时被添加进来会只放入到dirty中,在处理完毕之后,才会从dirty中添加到queue中,也是为了避免并发处理同一个资源对象。
1.1 Queue的Add()方法
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown {
return
}
if q.dirty.has(item) {
return
}
q.metrics.add(item)
q.dirty.insert(item)
if q.processing.has(item) {
return
}
q.queue = append(q.queue, item)
q.cond.Signal()
}
如果dirty中有这个元素就返回,否则就插入dirty中,而后就判断当前元素是否正在被处理,如果正在被处理就不放入queue中,避免被多个协程获取到并发处理。
1.2 Queue的Get()方法
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
if len(q.queue) == 0 {
// We must be shutting down.
return nil, true
}
item = q.queue[0]
// The underlying array still exists and reference this object, so the object will not be garbage collected.
q.queue[0] = nil
q.queue = q.queue[1:]
q.metrics.get(item)
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}
Get()
会尝试从queue中获取一个元素,同时将其加入到processing中,从dirty中删除,这里可以看到,如果之前queue中为空,导致获取元素的协程Wait的话,会被唤醒处理元素。
1.3 Queue.Done()方法
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
q.processing.delete(item)
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
} else if q.processing.len() == 0 {
q.cond.Signal()
}
}
Done() 方法用来标记一个 item 被处理完成了。调用 Done() 方法的时候,这个 item 被从 processing set 中删除。另外前面提到 Add 的过程中如果发现元素在 processing
中存在,那么这个元素会被暂存到 dirty
中。这里在处理元素的 Done 逻辑的时候也就顺带把暂存到 dirty
中的元素取出来,加入到 queue 里去了。
2 DelayingQueue的实现
DelayingQueue的接口
type DelayingInterface interface {
Interface
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)
}
这里的Interface{}就是前面的Queue的接口,在前面的基础上添加了一个AddAfter()方法,可以实现在一个item处理失败之后,在指定的延时之后再重新入队。
2.1 DelayingQueue.AddAfter()方法
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
// don't add if we're already shutting down
if q.ShuttingDown() {
return
}
q.metrics.retry()
// immediately add things with no delay
if duration <= 0 {
q.Add(item)
return
}
select {
case <-q.stopCh:
// unblock if ShutDown() is called
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
duration<=0时直接调用Add加入队列,当大于0时,会创建一个WaitFor对象放入到waitingForAddCh中,这个channel的消费可以查看DelayingType的创建函数:
func newDelayingQueue(clock clock.WithTicker, q Interface, name string, provider MetricsProvider) *delayingType {
ret := &delayingType{
Interface: q,
clock: clock,
heartbeat: clock.NewTicker(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name, provider),
}
go ret.waitingLoop()
return ret
}
type waitFor struct {
data t
readyAt time.Time
// index in the priority queue (heap)
index int
}
启动了一个协程去运行ret.waitingLoop(),创建的waitingForAddCh的容量大小是1000,内容就是waitFor
waitingLoop()方法:
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()
// Make a placeholder channel to use when there are no items in our list
never := make(<-chan time.Time)
// Make a timer that expires when the item at the head of the waiting queue is ready
var nextReadyAtTimer clock.Timer
// 这是一个优先级队列,用一个最小堆保存了所有 waitFor(waitForPriorityQueue 的类型是 []*waitFor)
waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue)
waitingEntryByData := map[t]*waitFor{}
// 后面的所有逻辑都在这个 for 循环里
for {
if q.Interface.ShuttingDown() {
return
}
now := q.clock.Now()
// Add ready entries
// 一直判断 waitingForQueue 这个堆里有没有元素,如果有,就 Peek 出来第一个元素看是不是 ready,如果 ready
// 那就通过 q.Add() 方法将其数据放入队列(也就是前文那个 Queue 实现的 Add 逻辑)
for waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
if entry.readyAt.After(now) {
break
}
entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(waitingEntryByData, entry.data)
}
// Set up a wait for the first item's readyAt (if one exists)
// 代码到这里也就是说前面一个 for 里的 break 被执行了,换言之最小堆中的最小元素,也就是最快 ready 的一个都还没有 ready;
// 这里将 nextReadyAtTimer 计时器的时间设置为(最快 ready 的第一个元素的 ready 时间 - 当前时间),
// 这样当 nextReadyAt 这个 channel 有数据的时候,最小堆里的第一个元素也就 ready 了。
nextReadyAt := never
if waitingForQueue.Len() > 0 {
if nextReadyAtTimer != nil {
nextReadyAtTimer.Stop()
}
entry := waitingForQueue.Peek().(*waitFor)
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
nextReadyAt = nextReadyAtTimer.C()
}
// 刚才设置好了一个合适的 nextReadyAt,现在开始 select 等待某个 channel 有反应
select {
case <-q.stopCh:
return
// 心跳时间是10s
case <-q.heartbeat.C():
// continue the loop, which will add ready items
// 执行到这里也就是第一个 item ready 了
case <-nextReadyAt:
// continue the loop, which will add ready items
// 当有元素被通过 AddAfter() 方法加进来时,waitingForAddCh 就会有内容,这时候会被取出来;
// 如果没有 ready,那就调用 insert(大概率主要就是插入最小堆的过程);如果 ready 了那就直接 Add 到 Queue 里。
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
// 通过一个循环将 waitingForAddCh 中的所有元素都消费掉,根据 ready 情况要么插入最小堆(优先级队列),要么直接入队。
drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}
insert()方法会判断在waitingForQueue这个优先级队列中是否存在对应元素,不存在就加入,存在就更新readyAt时间。
3 RateLimitingQueue
接口:
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
DelayingInterface
// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
AddRateLimited(item interface{})
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
// still have to call `Done` on the queue.
Forget(item interface{})
// NumRequeues returns back how many times the item was requeued
NumRequeues(item interface{}) int
}
这里关注AddRateLimited()方法和Forget()方法
// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType) AddRateLimited(item interface{}) {
q.DelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
func (q *rateLimitingType) Forget(item interface{}) {
q.rateLimiter.Forget(item)
}
通过ratelimiter确定item的ready时间,通过ratelimiter来Forget item
3.1 rateLimiter限速器的实现
接口:
type RateLimiter interface {
// When gets an item and gets to decide how long that item should wait
When(item interface{}) time.Duration
// Forget indicates that an item is finished being retried. Doesn't matter whether it's for failing
// or for success, we'll stop tracking it
Forget(item interface{})
// NumRequeues returns back how many failures the item has had
NumRequeues(item interface{}) int
}
限速队列的实现:
// 1 BucketRateLimiter
// 用了 golang 标准库的 golang.org/x/time/rate.Limiter 实现。BucketRateLimiter 实例化的时候比如传递一个 rate.NewLimiter(rate.Limit(10), 100) 进去,表示令牌桶里最多有 100 个令牌,每秒发放 10 个令牌。
type BucketRateLimiter struct {
*rate.Limiter
}
var _ RateLimiter = &BucketRateLimiter{}
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
return r.Limiter.Reserve().Delay() // 过多久后给当前 item 发放一个令牌
}
func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
return 0
}
func (r *BucketRateLimiter) Forget(item interface{}) {
}
// 2 ItemExponentialFailureRateLimiter
// Exponential 是指数的意思,从这个限速器的名字大概能猜到是失败次数越多,限速越长而且是指数级增长的一种限速器。
type ItemExponentialFailureRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int
baseDelay time.Duration
maxDelay time.Duration
}
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1 // 失败次数加一
// 每调用一次,exp 也就加了1,对应到这里时 2^n 指数爆炸
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 { // 如果超过了最大整型,就返回最大延时,不然后面时间转换溢出了
return r.maxDelay
}
calculated := time.Duration(backoff)
if calculated > r.maxDelay { // 如果超过最大延时,则返回最大延时
return r.maxDelay
}
return calculated
}
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
// 3 ItemFastSlowRateLimiter
// 快慢限速器,也就是先快后慢,定义一个阈值,超过了就慢慢重试。
type ItemFastSlowRateLimiter struct {
failuresLock sync.Mutex
failures map[interface{}]int
maxFastAttempts int // 快速重试的次数
fastDelay time.Duration // 快重试间隔
slowDelay time.Duration // 慢重试间隔
}
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
r.failures[item] = r.failures[item] + 1 // 标识重试次数 + 1
if r.failures[item] <= r.maxFastAttempts { // 如果快重试次数没有用完,则返回 fastDelay
return r.fastDelay
}
return r.slowDelay // 反之返回 slowDelay
}
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
return r.failures[item]
}
func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
// 4 MaxOfRateLimiter
// 这个限速器内部放多个限速器,然后返回限速最长的一个延时
type MaxOfRateLimiter struct {
limiters []RateLimiter
}
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
ret := time.Duration(0)
for _, limiter := range r.limiters {
curr := limiter.When(item)
if curr > ret {
ret = curr
}
}
return ret
}
// 5 WithMaxWaitRateLimiter
// 在其他限速器上包装一个最大延迟的属性,如果到了最大延时,则直接返回
type WithMaxWaitRateLimiter struct {
limiter RateLimiter // 其他限速器
maxDelay time.Duration // 最大延时
}
func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter {
return &WithMaxWaitRateLimiter{limiter: limiter, maxDelay: maxDelay}
}
func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
delay := w.limiter.When(item)
if delay > w.maxDelay {
return w.maxDelay // 已经超过了最大延时,直接返回最大延时
}
return delay
}
自定义控制器的默认限速队列:
func DefaultControllerRateLimiter() RateLimiter {
return NewMaxOfRateLimiter(
NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
return &MaxOfRateLimiter{limiters: limiters}
}
// 这里就是前面实现的令牌桶和指数延迟限速队列,会根据两个限速器返回的最大值作为限速结果。
总结
在编写自定义控制器的时候我们会用到这样的代码:
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
这里实例化的队列是一个限速队列。
在 ResourceEventHandlerFuncs
里我们会写 queue.Add(key)
这样的代码,将一个 key(item/obj)加入到 workqueue 里。
控制器里还会写到这样的代码:
obj, shutdown := c.workqueue.Get()
c.workqueue.Done(obj)
c.workqueue.Forget(obj)
c.workqueue.AddRateLimited(obj)
这些方法也就对应着一个 obj 从 workqueue 中取出来(Get()
),如果处理完成,就调用 Done(obj)
,从而从队列中彻底移除,同时调用 Forget(obj)
告诉记速器可以忽略这个 obj 了(可别下次同名 obj 来的时候,误判了,让人家白等半天)。最后如果处理失败,遇到点啥临时异常情况,那就放回到 workqueue 里去,用 AddRateLimited(obj)
方法。