kube-apiserver APF (API Priority and Fairness)

Nov 18, 2024 · 14 min read

For cluster administrators, controlling how the Kubernetes API server behaves in overload situations is a key task. kube-apiserver has some controls (for example the command-line flags --max-requests-inflight and --max-mutating-requests-inflight) that limit the number of outstanding requests it will accept, preventing a flood of inbound requests from potentially crashing the API server. But these flags are not enough to guarantee that the most important requests still get through during periods of heavy traffic.

API Priority and Fairness (APF) is an alternative that improves on the single max-in-flight limit described above. APF classifies and isolates requests at a finer granularity. It also introduces a bounded amount of queuing, so that during very short bursts the API server rejects no requests at all. Requests are then dispatched from the queues using fair-queuing techniques, so that a misbehaving controller cannot starve the others (even those at the same priority level).

The code below is based on the release-1.27 branch.

Drawbacks of the traditional rate limiting

Imagine a single client suddenly firing off an enormous number of requests: that one client can overwhelm the apiserver and then block everyone else. In a shared cluster with countless users and components, if a single component misbehaves and sends, say, 10,000 requests to the apiserver, those requests clog it up and every other request just queues up behind them.
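To make the contrast concrete, here is a minimal sketch in the spirit of genericfilters.WithMaxInFlightLimit (not the real implementation; it only assumes the standard net/http package): a single pool of slots shared by all callers, with no notion of who sent a request, so one noisy client can occupy every slot.

// Illustrative sketch only: one shared in-flight limiter for all requests.
// Every caller competes for the same slots, regardless of identity.
func withSimpleInFlightLimit(handler http.Handler, limit int) http.Handler {
	slots := make(chan struct{}, limit) // one token per allowed in-flight request
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case slots <- struct{}{}: // acquired a slot
			defer func() { <-slots }()
			handler.ServeHTTP(w, r)
		default: // no slots left: reject with 429, no queuing, no fairness
			http.Error(w, "too many requests", http.StatusTooManyRequests)
		}
	})
}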

Enabling APF

// staging/src/k8s.io/apiserver/pkg/server/options/recommended.go
func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
    // ..
	// Check whether the APIPriorityAndFairness feature gate is enabled
	if feature.DefaultFeatureGate.Enabled(features.APIPriorityAndFairness) {
		if config.ClientConfig != nil {
			if config.MaxRequestsInFlight+config.MaxMutatingRequestsInFlight <= 0 {
				return fmt.Errorf("invalid configuration: MaxRequestsInFlight=%d and MaxMutatingRequestsInFlight=%d; they must add up to something positive", config.MaxRequestsInFlight, config.MaxMutatingRequestsInFlight)

			}
			config.FlowControl = utilflowcontrol.New(
				config.SharedInformerFactory,
				kubernetes.NewForConfigOrDie(config.ClientConfig).FlowcontrolV1beta3(), // v1beta3 in 1.27; becomes stable v1 in 1.29
				config.MaxRequestsInFlight+config.MaxMutatingRequestsInFlight, // total concurrency is the sum of --max-requests-inflight and --max-mutating-requests-inflight
				config.RequestTimeout/4, // queue wait limit: a quarter of the request timeout
			)
		} else {
			klog.Warningf("Neither kubeconfig is provided nor service-account is mounted, so APIPriorityAndFairness will be disabled")
		}
	}
	return nil
}
// staging/src/k8s.io/apiserver/pkg/server/config.go
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
	handler := filterlatency.TrackCompleted(apiHandler)
    // ..

	if c.FlowControl != nil { 
		workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig()
		requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(
			c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg, c.FlowControl.GetMaxSeats)
		handler = filterlatency.TrackCompleted(handler)
		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
		handler = filterlatency.TrackStarted(handler, c.TracerProvider, "priorityandfairness")
	} else {
		// Legacy path: plain limiting based on the number of in-flight requests
		handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
	}
	// ...
	return handler
}

Shuffle sharding

Shuffle sharding uses the notion of a virtual shard (a "shuffle shard"): instead of partitioning the workers directly, it partitions by "user" (flow), with the goal of scattering users across different workers so that they rarely all collide.
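A rough way to see why this isolates flows: with 128 queues and a hand size of 6 (the global-default settings shown later in this post), the chance that a second flow is dealt exactly the same six queues as the first is about 1 in C(128, 6), roughly one in 5.4 billion, assuming hands are close to uniform random subsets. A back-of-the-envelope sketch:

// Back-of-the-envelope estimate (not apiserver code): the probability that two
// independent flows are dealt an identical hand, assuming near-uniform hands.
func fullOverlapProbability(deckSize, handSize int) float64 {
	combinations := 1.0
	for i := 0; i < handSize; i++ {
		combinations *= float64(deckSize-i) / float64(i+1) // builds C(deckSize, handSize)
	}
	return 1.0 / combinations
}

// fullOverlapProbability(128, 6) ≈ 1.8e-10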

API Priority and Fairness

The core ideas of APF:

  • Multiple priority levels: cluster traffic is divided into priority levels, and FlowSchemas sort similar requests into the appropriate level; system-related traffic typically gets a higher priority, while ordinary user traffic gets a lower one.

  • Multiple queues: each priority level maintains multiple queues, and each queue is limited independently, so flows landing in the same level are still isolated from one another.

(|kind-kind:N/A)➜  ~ kubectl api-resources| head -1;kubectl api-resources |grep flowcontrol.apiserver.k8s.io
NAME                              SHORTNAMES   APIVERSION                             NAMESPACED   KIND
flowschemas                                    flowcontrol.apiserver.k8s.io/v1beta3   false        FlowSchema
prioritylevelconfigurations                    flowcontrol.apiserver.k8s.io/v1beta3   false        PriorityLevelConfiguration

APF is configured through two kinds of resources:

// k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap/default.go

// The objects that define the current suggested additional configuration
var (
	SuggestedPriorityLevelConfigurations = []*flowcontrol.PriorityLevelConfiguration{
		// "system" priority-level is for the system components that affects self-maintenance of the
		// cluster and the availability of those running pods in the cluster, including kubelet and
		// kube-proxy.
		SuggestedPriorityLevelConfigurationSystem,
		// "node-high" priority-level is for the node health reporting. It is separated from "system"
		// to make sure that nodes are able to report their health even if kube-apiserver is not capable of
		// handling load caused by pod startup (fetching secrets, events etc).
		// NOTE: In large clusters 50% - 90% of all API calls use this priority-level.
		SuggestedPriorityLevelConfigurationNodeHigh,
		// "leader-election" is dedicated for controllers' leader-election, which majorly affects the
		// availability of any controller runs in the cluster.
		SuggestedPriorityLevelConfigurationLeaderElection,
		// "workload-high" is used by those workloads with higher priority but their failure won't directly
		// impact the existing running pods in the cluster, which includes kube-scheduler, and those well-known
		// built-in workloads such as "deployments", "replicasets" and other low-level custom workload which
		// is important for the cluster.
		SuggestedPriorityLevelConfigurationWorkloadHigh,
		// "workload-low" is used by those workloads with lower priority which availability only has a
		// minor impact on the cluster.
		SuggestedPriorityLevelConfigurationWorkloadLow,
		// "global-default" serves the rest traffic not handled by the other suggested flow-schemas above.
		SuggestedPriorityLevelConfigurationGlobalDefault,
	}
	SuggestedFlowSchemas = []*flowcontrol.FlowSchema{
		SuggestedFlowSchemaSystemNodes,               // references "system" priority-level
		SuggestedFlowSchemaSystemNodeHigh,            // references "node-high" priority-level
		SuggestedFlowSchemaProbes,                    // exempt (not limited)
		SuggestedFlowSchemaSystemLeaderElection,      // references "leader-election" priority-level
		SuggestedFlowSchemaWorkloadLeaderElection,    // references "leader-election" priority-level
		SuggestedFlowSchemaEndpointsController,       // references "workload-high" priority-level
		SuggestedFlowSchemaKubeControllerManager,     // references "workload-high" priority-level
		SuggestedFlowSchemaKubeScheduler,             // references "workload-high" priority-level
		SuggestedFlowSchemaKubeSystemServiceAccounts, // references "workload-high" priority-level
		SuggestedFlowSchemaServiceAccounts,           // references "workload-low" priority-level
		SuggestedFlowSchemaGlobalDefault,             // references "global-default" priority-level
	}
)
  • PriorityLevelConfiguration objects define the isolation type and the concurrency budget a level may consume, and can also tune queuing behavior.
(|kind-kind:N/A)➜  ~ kg prioritylevelconfigurations
NAME              TYPE      NOMINALCONCURRENCYSHARES   QUEUES   HANDSIZE   QUEUELENGTHLIMIT   AGE
catch-all         Limited   5                          <none>   <none>     <none>             37h
exempt            Exempt    <none>                     <none>   <none>     <none>             37h
global-default    Limited   20                         128      6          50                 37h
leader-election   Limited   10                         16       4          50                 37h
node-high         Limited   40                         64       6          50                 37h
system            Limited   30                         64       6          50                 37h
workload-high     Limited   40                         128      6          50                 37h
workload-low      Limited   100                        128      6          50                 37h
(|kind-kind:N/A)➜  ~ kg prioritylevelconfigurations global-default -o yaml
apiVersion: flowcontrol.apiserver.k8s.io/v1beta3
kind: PriorityLevelConfiguration
metadata:
  name: global-default
spec:
  limited: # limiting policy
    lendablePercent: 50
    limitResponse:
      queuing:
        handSize: 6 # number of queues dealt to each flow's hand
        queueLengthLimit: 50 # maximum length of each queue
        queues: 128 # number of queues
      type: Queue # Queue or Reject; Reject returns 429 immediately, Queue enqueues the request
    nominalConcurrencyShares: 20
  type: Limited # Limited or Exempt; Exempt means no limiting
  • FlowSchema objects classify every inbound request and match it to exactly one PriorityLevelConfiguration.
(|kind-kind:N/A)➜  ~ kubectl get flowschemas
NAME                           PRIORITYLEVEL     MATCHINGPRECEDENCE   DISTINGUISHERMETHOD   AGE   MISSINGPL
exempt                         exempt            1                    <none>                37h   False
probes                         exempt            2                    <none>                37h   False
system-leader-election         leader-election   100                  ByUser                37h   False
endpoint-controller            workload-high     150                  ByUser                37h   False
workload-leader-election       leader-election   200                  ByUser                37h   False
system-node-high               node-high         400                  ByUser                37h   False
system-nodes                   system            500                  ByUser                37h   False
kube-controller-manager        workload-high     800                  ByNamespace           37h   False
kube-scheduler                 workload-high     800                  ByNamespace           37h   False
kube-system-service-accounts   workload-high     900                  ByNamespace           37h   False
service-accounts               workload-low      9000                 ByUser                37h   False
global-default                 global-default    9900                 ByUser                37h   False
catch-all                      catch-all         10000                ByUser                37h   False

(|kind-danny-test:N/A)➜  ~ kubectl get flowschema global-default -o yaml
apiVersion: flowcontrol.apiserver.k8s.io/v1beta3
kind: FlowSchema
metadata:
  generation: 1
  name: global-default
spec:
  distinguisherMethod:
    type: ByUser
  matchingPrecedence: 9900 # matching precedence, 1-10000; lower values are evaluated first
  priorityLevelConfiguration:
    name: global-default
  rules:
  - nonResourceRules:
    - nonResourceURLs:
      - '*'
      verbs:
      - '*'
    resourceRules:
    - apiGroups:
      - '*'
      clusterScope: true
      namespaces:
      - '*'
      resources:
      - '*'
      verbs:
      - '*'
    subjects:
    - group:
        name: system:unauthenticated
      kind: Group
    - group:
        name: system:authenticated
      kind: Group

Every FlowSchema has a matching precedence. Each incoming request is matched against the FlowSchemas from the smallest precedence number upwards (the third column above), and the first FlowSchema that matches decides which priority level, and therefore which limiting policy, applies to the request.

Tuning prioritylevelconfigurations

  • Increasing a PriorityLevelConfiguration's queues value reduces the chance that different flows collide, at the cost of more memory. A value of 1 disables the fair-queuing logic, although requests are still queued.
  • Increasing queueLengthLimit lets the level absorb larger bursts without dropping requests, at the cost of higher latency and memory usage.
  • Increasing handSize lowers the probability that flows collide (more fairness, less chance of a flow being starved) and raises the concurrency available to a single flow, but it also lets some flows hold on to the apiserver for longer and can increase request latency; the maximum number of requests a single flow can have queued at once is handSize * queueLengthLimit (see the sketch after this list).
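The sketch below is not apiserver code; it only restates the documented relationships between these knobs, ignoring seat borrowing between priority levels (lendablePercent / borrowingLimitPercent) and assuming the standard math package:

// serverCL is --max-requests-inflight + --max-mutating-requests-inflight.
// A Limited priority level's nominal concurrency limit is its proportional
// share of serverCL, rounded up (per the documented APF formula).
func nominalConcurrencyLimit(serverCL, shares, sumOfSharesAcrossLimitedLevels int) int {
	return int(math.Ceil(float64(serverCL) * float64(shares) / float64(sumOfSharesAcrossLimitedLevels)))
}

// Upper bound on how many requests a single flow can have queued at once:
// its hand covers handSize queues, each holding at most queueLengthLimit requests.
func maxQueuedPerFlow(handSize, queueLengthLimit int) int {
	return handSize * queueLengthLimit
}

For example, with the default flags (400 + 200 = 600 seats in total) and the suggested shares listed above (which sum to 245 across the Limited levels, catch-all included), global-default would get roughly ceil(600 × 20 / 245) = 49 seats.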

Initializing the configuration

func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedGaugePair, execSeatsObs metrics.RatioedGauge, seatDemandGauge metrics.Gauge) (fq.QueueSetCompleter, error) {
    // ...
	qcAPI := pl.Spec.Limited.LimitResponse.Queuing
	qcQS := fq.QueuingConfig{Name: pl.Name}
	if qcAPI != nil {
		qcQS = fq.QueuingConfig{Name: pl.Name,
			DesiredNumQueues: int(qcAPI.Queues),
			QueueLengthLimit: int(qcAPI.QueueLengthLimit),
			HandSize:         int(qcAPI.HandSize),
			RequestWaitLimit: requestWaitLimit,
		}
	}
	var qsc fq.QueueSetCompleter
	var err error
	if queues != nil {
		qsc, err = queues.BeginConfigChange(qcQS)
	} else {
		qsc, err = qsf.BeginConstruction(qcQS, reqsIntPair, execSeatsObs, seatDemandGauge)
	}
    //.. 
	return qsc, err
}

Creating the dealer from the configuration

func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsGaugePair metrics.RatioedGaugePair, execSeatsGauge metrics.RatioedGauge, seatDemandIntegrator metrics.Gauge) (fq.QueueSetCompleter, error) {
	// validate the queuing config and create the shuffle-sharding dealer
	dealer, err := checkConfig(qCfg)
    //...
	return &queueSetCompleter{
		factory:              qsf,
		reqsGaugePair:        reqsGaugePair,
		execSeatsGauge:       execSeatsGauge,
		seatDemandIntegrator: seatDemandIntegrator,
		qCfg:                 qCfg,
		dealer:               dealer}, nil
}


func checkConfig(qCfg fq.QueuingConfig) (*shufflesharding.Dealer, error) {
    // ...
	// deckSize is the number of queues; handSize is the number of queues dealt to a single flow
	dealer, err := shufflesharding.NewDealer(qCfg.DesiredNumQueues, qCfg.HandSize)
	if err != nil {
		err = fmt.Errorf("the QueueSetConfig implies an invalid shuffle sharding config (DesiredNumQueues is deckSize): %w", err)
	}
	return dealer, err
}
// Returns the queue indices selected for a flow
func (d *Dealer) DealIntoHand(hashValue uint64, hand []int) []int {
	h := hand[:0]
	d.Deal(hashValue, func(card int) { h = append(h, card) })
	return h
}

func (d *Dealer) Deal(hashValue uint64, pick func(int)) {
	// 15 is the largest possible value of handSize
	var remainders [15]int

	// Derive handSize remainders from the hash; remainders[i] falls in [0, deckSize-i).
	for i := 0; i < d.handSize; i++ {
		hashValueNext := hashValue / uint64(d.deckSize-i)
		remainders[i] = int(hashValue - uint64(d.deckSize-i)*hashValueNext)
		hashValue = hashValueNext
	}

	for i := 0; i < d.handSize; i++ {
		card := remainders[i]
		for j := i; j > 0; j-- {
			if card >= remainders[j-1] {
				card++
			}
		}
		pick(card)
	}
}
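A usage sketch for the Dealer above (illustrative values; flowHash stands for the hash of the FlowSchema name plus the flow distinguisher, computed elsewhere):

// Illustrative only: turning a flow's hash into its hand of candidate queues.
func exampleDeal(flowHash uint64) []int {
	dealer, err := shufflesharding.NewDealer(128, 6) // deckSize = queues, handSize = hand size
	if err != nil {
		panic(err) // invalid config, e.g. handSize larger than deckSize
	}
	var backing [6]int
	return dealer.DealIntoHand(flowHash, backing[:]) // six queue indices in [0, 128)
}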

FlowSchema configuration

  • matchingPrecedence: determines the order in which FlowSchemas are applied; lower numbers are evaluated first.
  • rules: the request-matching rules, written in the same format as Kubernetes RBAC rules.
  • distinguisherMethod: names a request attribute (the user or the namespace) used to split matching requests into separate flows before they are handed to the priority level; if it is omitted, all matching requests are assigned to the same flow (see the sketch below).
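A deliberately simplified sketch of what distinguisherMethod boils down to (the real logic is computeFlowDistinguisher in apf_controller.go; the function below is only an illustration):

// Simplified sketch: which request attribute becomes the flow distinguisher.
// method mirrors spec.distinguisherMethod.type; an empty method means the
// FlowSchema has no distinguisher at all.
func pickDistinguisher(method, userName, namespace string) string {
	switch method {
	case "ByUser":
		return userName // every user gets its own flow
	case "ByNamespace":
		return namespace // every namespace gets its own flow
	default:
		return "" // no distinguisher: all matching requests share one flow
	}
}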

Seeing it in action

TOKEN=$(kubectl -n d8-cni-cilium get secrets agent-token-45s7n -o json | jq -r .data.token | base64 -d)

curl https://127.0.0.1:6445/apis/cilium.io/v2/ciliumclusterwidenetworkpolicies?limit=500  -X GET --header "Authorization: Bearer $TOKEN" -k -I
HTTP/2 200
audit-id: 4f647505-8581-4a99-8e4c-f3f4322f79fe
cache-control: no-cache, private
content-type: application/json
x-kubernetes-pf-flowschema-uid: 7f0afa35-07c3-4601-b92c-dfe7e74780f8
x-kubernetes-pf-prioritylevel-uid: df8f409a-ebe7-4d54-9f21-1f2a6bee2e81
content-length: 173
date: Sun, 26 Mar 2023 17:45:02 GMT

kubectl get flowschemas -o custom-columns="uid:{metadata.uid},name:{metadata.name}" | grep 7f0afa35-07c3-4601-b92c-dfe7e74780f8
7f0afa35-07c3-4601-b92c-dfe7e74780f8   d8-serviceaccounts

kubectl get prioritylevelconfiguration -o custom-columns="uid:{metadata.uid},name:{metadata.name}" | grep df8f409a-ebe7-4d54-9f21-1f2a6bee2e81
df8f409a-ebe7-4d54-9f21-1f2a6bee2e81   d8-serviceaccounts

In its response the API server sets the special headers X-Kubernetes-PF-FlowSchema-UID and X-Kubernetes-PF-PriorityLevel-UID, which you can use to see where a request ended up.

The output shows that this request was matched by the d8-serviceaccounts FlowSchema and handled by the d8-serviceaccounts PriorityLevelConfiguration.

Request handling flow

func WithPriorityAndFairness(
	handler http.Handler,
	longRunningRequestCheck apirequest.LongRunningRequestCheck,
	fcIfc utilflowcontrol.Interface,
	workEstimator flowcontrolrequest.WorkEstimatorFunc,
) http.Handler {
    // ...
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx := r.Context()
		requestInfo, ok := apirequest.RequestInfoFrom(ctx)
		if !ok {
			handleError(w, r, fmt.Errorf("no RequestInfo found in context"))
			return
		}
		user, ok := apirequest.UserFrom(ctx)
		if !ok {
			handleError(w, r, fmt.Errorf("no User found in context"))
			return
		}

		isWatchRequest := watchVerbs.Has(requestInfo.Verb)
		
		if longRunningRequestCheck != nil && longRunningRequestCheck(r, requestInfo) && !isWatchRequest {
			klog.V(6).Infof("Serving RequestInfo=%#+v, user.Info=%#+v as longrunning\n", requestInfo, user)
			handler.ServeHTTP(w, r)
			return
		}

		var classification *PriorityAndFairnessClassification
        // ...

		var served bool
		isMutatingRequest := !nonMutatingRequestVerbs.Has(requestInfo.Verb)
		noteExecutingDelta := func(delta int32) {
			if isMutatingRequest {
				watermark.recordMutating(int(atomic.AddInt32(&atomicMutatingExecuting, delta)))
			} else {
				watermark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyExecuting, delta)))
			}
		}
		noteWaitingDelta := func(delta int32) {
			if isMutatingRequest {
				waitingMark.recordMutating(int(atomic.AddInt32(&atomicMutatingWaiting, delta)))
			} else {
				waitingMark.recordReadOnly(int(atomic.AddInt32(&atomicReadOnlyWaiting, delta)))
			}
		}
		queueNote := func(inQueue bool) {
			if inQueue {
				noteWaitingDelta(1)
			} else {
				noteWaitingDelta(-1)
			}
		}

		digest := utilflowcontrol.RequestDigest{
			RequestInfo: requestInfo,
			User:        user,
		}

		if isWatchRequest { // watch requests take a dedicated path
            // ...
		} else {
			execute := func() {
				noteExecutingDelta(1)
				defer noteExecutingDelta(-1)
				served = true
				setResponseHeaders(classification, w)

				handler.ServeHTTP(w, r)
			}

			fcIfc.Handle(ctx, digest, noteFn, estimateWork, queueNote, execute)
		}

        // ...
	})
}
  • Long-running API requests (for example streaming logs from a pod or exec'ing into a container) bypass APF. Watch requests, although long-running, do not take this bypass: note the !isWatchRequest check above; they are still classified by APF and handled on the dedicated watch path.
  • There is also a special predefined priority level called exempt; requests at that level are dispatched immediately.

The Handle method in detail

func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest RequestDigest,
	noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
	workEstimator func() fcrequest.WorkEstimate,
	queueNoteFn fq.QueueNoteFn,
	execFn func()) {
	// Classify the request: pick a FlowSchema and priority level
	fs, pl, isExempt, req, startWaitingTime := cfgCtlr.startRequest(ctx, requestDigest, noteFn, workEstimator, queueNoteFn)
    // ...
	// Execute: wait for a decision and then run the request
	idle = req.Finish(func() {
        // ...
		executed = true
		// run the actual request handler
		execFn()
	})
    /// ...
}

Starting a request

// staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go
func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest,
	noteFn func(fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, flowDistinguisher string),
	workEstimator func() fcrequest.WorkEstimate,
	queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
	klog.V(7).Infof("startRequest(%#+v)", rd)
	cfgCtlr.lock.RLock()
	defer cfgCtlr.lock.RUnlock()
	var selectedFlowSchema, catchAllFlowSchema *flowcontrol.FlowSchema
	// Requests can be classified by subject (User, Group, ServiceAccount), verb (get, list, create, delete, ...), resource type (pods, deployments, ...), namespace, and URL
	for _, fs := range cfgCtlr.flowSchemas {
        // first matching FlowSchema wins
		if matchesFlowSchema(rd, fs) {
			selectedFlowSchema = fs
			break
		}
		if fs.Name == flowcontrol.FlowSchemaNameCatchAll {
			catchAllFlowSchema = fs
		}
	}
    // ...
	plName := selectedFlowSchema.Spec.PriorityLevelConfiguration.Name
	plState := cfgCtlr.priorityLevelStates[plName]
	if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt { // exempt: dispatch immediately
		noteFn(selectedFlowSchema, plState.pl, "")
		klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName)
		return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}
	}
	var numQueues int32
	if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue {
		numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
	}
	var flowDistinguisher string
	var hashValue uint64
	if numQueues > 1 {
        // derive the flow distinguisher (user name or namespace) based on the DistinguisherMethod
		flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
        // APF identifies the flow by hashing the FlowSchema name together with the distinguisher (hashFlowID)
		hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher) 
	}

	noteFn(selectedFlowSchema, plState.pl, flowDistinguisher)
	workEstimate := workEstimator()

	startWaitingTime = cfgCtlr.clock.Now()
	
	// hand the request to the queue set, which shuffle-shards it onto a queue
	req, idle := plState.queues.StartRequest(ctx, &workEstimate, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
	if idle {
		cfgCtlr.maybeReapReadLocked(plName, plState)
	}
	return selectedFlowSchema, plState.pl, false, req, startWaitingTime
}
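The hashFlowID call above turns the FlowSchema name and the distinguisher into the 64-bit value that shuffle sharding consumes. A sketch of the idea (this assumes a SHA-256 based hash and the crypto/sha256 and encoding/binary packages; the upstream helper may differ in detail):

// Sketch only: derive a 64-bit flow ID from FlowSchema name + distinguisher.
func sketchHashFlowID(fsName, distinguisher string) uint64 {
	h := sha256.New()
	h.Write([]byte(fsName))
	h.Write([]byte{0}) // separator so ("ab", "c") and ("a", "bc") hash differently
	h.Write([]byte(distinguisher))
	sum := h.Sum(nil)
	return binary.LittleEndian.Uint64(sum[:8])
}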

Matching rules


func matchesPolicyRule(digest RequestDigest, policyRule *flowcontrol.PolicyRulesWithSubjects) bool {
/*
	1. The request's subject must match one of the rule's subjects.
	2. For a resource request, any single rule in ResourceRules must match.
	3. For a non-resource request, any single rule in NonResourceRules must match.
*/
	if !matchesASubject(digest.User, policyRule.Subjects) {
		return false
	}
	if digest.RequestInfo.IsResourceRequest {
		return matchesAResourceRule(digest.RequestInfo, policyRule.ResourceRules)
	}
	return matchesANonResourceRule(digest.RequestInfo, policyRule.NonResourceRules)
}
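The subject check in step 1 is essentially a membership test between the request's user and groups and the rule's subjects; a deliberately simplified sketch (the upstream helpers also handle ServiceAccount subjects and wildcard names):

// Simplified sketch of subject matching: a rule matches if the request's user
// name, or any group the user belongs to, appears among the rule's subjects.
func matchesAnySubject(userName string, userGroups []string, subjectUsers, subjectGroups []string) bool {
	for _, u := range subjectUsers {
		if u == userName {
			return true
		}
	}
	for _, g := range subjectGroups {
		for _, ug := range userGroups {
			if g == ug {
				return true
			}
		}
	}
	return false
}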

Processing the request in the queue set

// staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go
func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
	qs.lockAndSyncTime(ctx)
	defer qs.lock.Unlock()
	var req *request

	// ========================================================================
	// Step 0:
	// Apply only concurrency limit, if zero queues desired
	if qs.qCfg.DesiredNumQueues < 1 {
		if !qs.canAccommodateSeatsLocked(workEstimate.MaxSeats()) {
			klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are asked for, %d seats are in use (%d are executing) and the limit is %d",
				qs.qCfg.Name, fsName, descr1, descr2, workEstimate, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
			qs.totRequestsRejected++
			metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit")
			return nil, qs.isIdleLocked()
		}
		req = qs.dispatchSansQueueLocked(ctx, workEstimate, flowDistinguisher, fsName, descr1, descr2)
		return req, false
	}

	// ========================================================================
	// Step 1:
	// 1) Start with shuffle sharding, to pick a queue.
	// 2) Reject old requests that have been waiting too long
	// 3) Reject current request if there is not enough concurrency shares and
	// we are at max queue length
	// 4) If not rejected, create a request and enqueue
	req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, workEstimate, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn)
	// req == nil means that the request was rejected - no remaining
	// concurrency shares and at max queue length already
	if req == nil {
		klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v due to queue full", qs.qCfg.Name, fsName, descr1, descr2)
        // ..
		return nil, qs.isIdleLocked()
	}

	// ========================================================================
	// Step 2:
	// The next step is to invoke the method that dequeues as much
	// as possible.
	// This method runs a loop, as long as there are non-empty
	// queues and the number currently executing is less than the
	// assured concurrency value.  The body of the loop uses the
	// fair queuing technique to pick a queue and dispatch a
	// request from that queue.
	qs.dispatchAsMuchAsPossibleLocked()

	return req, false
}


func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
	// keep dequeuing while there is waiting work and spare concurrency
	for qs.totRequestsWaiting != 0 && qs.totSeatsInUse < qs.dCfg.ConcurrencyLimit && qs.dispatchLocked() {
	}
}


func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, workEstimate *fqrequest.WorkEstimate, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
	// shuffle sharding: pick a queue for this flow
	queueIdx := qs.shuffleShardLocked(hashValue, descr1, descr2)
	queue := qs.queues[queueIdx]
	// requests that have waited longer than RequestWaitLimit get their decision set to reject
	qs.removeTimedOutRequestsFromQueueToBoundLocked(queue, fsName)

	defer qs.boundNextDispatchLocked(queue)

	// Create a request and enqueue
	req := &request{
		qs:                qs,
		fsName:            fsName,
		flowDistinguisher: flowDistinguisher,
		ctx:               ctx,
		decision:          qs.promiseFactory(nil, ctx.Done(), decisionCancel), // the pending decision (execute / reject / cancel)
		arrivalTime:       qs.clock.Now(),
		arrivalR:          qs.currentR,
		queue:             queue,
		descr1:            descr1,
		descr2:            descr2,
		queueNoteFn:       queueNoteFn,
		workEstimate:      qs.completeWorkEstimate(workEstimate),
	}
	// reject if the queue is already at its length limit
	if ok := qs.rejectOrEnqueueToBoundLocked(req); !ok {
		return nil
	}
    // ...
	return req
}


func (qs *queueSet) shuffleShardLocked(hashValue uint64, descr1, descr2 interface{}) int {
	var backHand [8]int
	// deal this flow's hand: the candidate queues for this flow
	hand := qs.dealer.DealIntoHand(hashValue, backHand[:])
	handSize := len(hand)
	// qs.enqueues counts requests enqueued so far; taking it modulo handSize gives a rotating start offset within the hand
	offset := qs.enqueues % handSize
	qs.enqueues++
	bestQueueIdx := -1
	minQueueSeatSeconds := fqrequest.MaxSeatSeconds
	for i := 0; i < handSize; i++ {
		queueIdx := hand[(offset+i)%handSize]
		queue := qs.queues[queueIdx]
		queueSum := queue.requests.QueueSum()

		// this is the total amount of work in seat-seconds for requests
		// waiting in this queue, we will select the queue with the minimum.
		thisQueueSeatSeconds := queueSum.TotalWorkSum
		klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d with sum: %#v and %d seats in use, nextDispatchR=%v", qs.qCfg.Name, descr1, descr2, queueIdx, queueSum, queue.seatsInUse, queue.nextDispatchR)
		if thisQueueSeatSeconds < minQueueSeatSeconds {
			minQueueSeatSeconds = thisQueueSeatSeconds
			bestQueueIdx = queueIdx
		}
	}
    // ..
	return bestQueueIdx
}

// Fair queuing: pick a suitable queue, pop its head request, and unblock it for execution
func (qs *queueSet) dispatchLocked() bool {
	queue, request := qs.findDispatchQueueToBoundLocked()
	if queue == nil {
		return false
	}
	if request == nil { // This should never happen.  But if it does...
		return false
	}
	qs.totRequestsWaiting--
	qs.totSeatsWaiting -= request.MaxSeats()
	metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
	request.NoteQueued(false)
	qs.reqsGaugePair.RequestsWaiting.Add(-1)
	defer qs.boundNextDispatchLocked(queue)
	if !request.decision.Set(decisionExecute) {
		qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
		return true
	}

    // ... 
	queue.nextDispatchR += request.totalWork()
	return true
}

func (qs *queueSet) findDispatchQueueToBoundLocked() (*queue, *request) {
	minVirtualFinish := fqrequest.MaxSeatSeconds
	sMin := fqrequest.MaxSeatSeconds
	dsMin := fqrequest.MaxSeatSeconds
	sMax := fqrequest.MinSeatSeconds
	dsMax := fqrequest.MinSeatSeconds
	var minQueue *queue
	var minIndex int
	nq := len(qs.queues)
	for range qs.queues {
		qs.robinIndex = (qs.robinIndex + 1) % nq
		queue := qs.queues[qs.robinIndex]
		oldestWaiting, _ := queue.requests.Peek()
		if oldestWaiting != nil {
			sMin = ssMin(sMin, queue.nextDispatchR)
			sMax = ssMax(sMax, queue.nextDispatchR)
			estimatedWorkInProgress := fqrequest.SeatsTimesDuration(float64(queue.seatsInUse), qs.estimatedServiceDuration)
			dsMin = ssMin(dsMin, queue.nextDispatchR-estimatedWorkInProgress)
			dsMax = ssMax(dsMax, queue.nextDispatchR-estimatedWorkInProgress)
			currentVirtualFinish := queue.nextDispatchR + oldestWaiting.totalWork()
			klog.V(11).InfoS("Considering queue to dispatch", "queueSet", qs.qCfg.Name, "queue", qs.robinIndex, "finishR", currentVirtualFinish)
			if currentVirtualFinish < minVirtualFinish {
				minVirtualFinish = currentVirtualFinish
				minQueue = queue
				minIndex = qs.robinIndex
			}
		}
	}

	oldestReqFromMinQueue, _ := minQueue.requests.Peek()
	if oldestReqFromMinQueue == nil {
		// This cannot happen
		klog.ErrorS(errors.New("selected queue is empty"), "Impossible", "queueSet", qs.qCfg.Name)
		return nil, nil
	}
	if !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.MaxSeats()) {
		// since we have not picked the queue with the minimum virtual finish
		// time, we are not going to advance the round robin index here.
		klogV := klog.V(4)
		if klogV.Enabled() {
			klogV.Infof("QS(%s): request %v %v seats %d cannot be dispatched from queue %d, waiting for currently executing requests to complete, %d requests are occupying %d seats and the limit is %d",
				qs.qCfg.Name, oldestReqFromMinQueue.descr1, oldestReqFromMinQueue.descr2, oldestReqFromMinQueue.MaxSeats(), minQueue.index, qs.totRequestsExecuting, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit)
		}
		metrics.AddDispatchWithNoAccommodation(qs.qCfg.Name, oldestReqFromMinQueue.fsName)
		return nil, nil
	}
	oldestReqFromMinQueue.removeFromQueueLocked()

	// If the requested final seats exceed capacity of that queue,
	// we reduce them to current capacity and adjust additional latency
	// to preserve the total amount of work.
	if oldestReqFromMinQueue.workEstimate.FinalSeats > uint64(qs.dCfg.ConcurrencyLimit) {
		finalSeats := uint64(qs.dCfg.ConcurrencyLimit)
		additionalLatency := oldestReqFromMinQueue.workEstimate.finalWork.DurationPerSeat(float64(finalSeats))
		oldestReqFromMinQueue.workEstimate.FinalSeats = finalSeats
		oldestReqFromMinQueue.workEstimate.AdditionalLatency = additionalLatency
	}

	// we set the round robin indexing to start at the chose queue
	// for the next round.  This way the non-selected queues
	// win in the case that the virtual finish times are the same
	qs.robinIndex = minIndex

	if minQueue.nextDispatchR < oldestReqFromMinQueue.arrivalR {
		klog.ErrorS(errors.New("dispatch before arrival"), "Inconceivable!", "QS", qs.qCfg.Name, "queue", minQueue.index, "dispatchR", minQueue.nextDispatchR, "request", oldestReqFromMinQueue)
	}
	metrics.SetDispatchMetrics(qs.qCfg.Name, qs.currentR.ToFloat(), minQueue.nextDispatchR.ToFloat(), sMin.ToFloat(), sMax.ToFloat(), dsMin.ToFloat(), dsMax.ToFloat())
	return minQueue, oldestReqFromMinQueue
}
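The selection above is driven by virtual finish times measured in seat-seconds: for every non-empty queue the candidate value is nextDispatchR plus the total work of its oldest waiting request, and the queue with the smallest value wins; dispatchLocked then advances the winner's nextDispatchR by the dispatched request's totalWork. With purely illustrative numbers: if queue A has nextDispatchR = 10 seat-seconds and a 1 seat-second request at its head, while queue B has nextDispatchR = 8 with a 5 seat-second request, the virtual finish times are 11 and 13, so A is dispatched first and B's expensive request cannot monopolize the dispatcher.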

Waiting for the request to finish

func (req *request) Finish(execFn func()) bool {
	exec, idle := req.wait()
	if !exec {
		return idle
	}
	func() {
		defer func() {
			idle = req.qs.finishRequestAndDispatchAsMuchAsPossible(req)
		}()

		execFn()
	}()

	return idle
}

func (req *request) wait() (bool, bool) {
	qs := req.qs

	// ========================================================================
	// Step 3:
	// The final step is to wait on a decision from
	// somewhere and then act on it.
	decisionAny := req.decision.Get()
	qs.lockAndSyncTime(req.ctx)
	defer qs.lock.Unlock()
	if req.waitStarted {
		// This can not happen, because the client is forbidden to
		// call Wait twice on the same request
		klog.Errorf("Duplicate call to the Wait method!  Immediately returning execute=false.  QueueSet=%s, startTime=%s, descr1=%#+v, descr2=%#+v", req.qs.qCfg.Name, req.startTime, req.descr1, req.descr2)
		return false, qs.isIdleLocked()
	}
	req.waitStarted = true
	switch decisionAny {
	case decisionReject: // rejected: timed out while queued
		klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2)
		qs.totRequestsRejected++
		qs.totRequestsTimedout++
		metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "time-out")
		return false, qs.isIdleLocked()
	case decisionCancel: // cancelled
	case decisionExecute: 
		klog.V(5).Infof("QS(%s): Dispatching request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)
		return true, false
	default:
		// This can not happen, all possible values are handled above
		klog.Errorf("QS(%s): Impossible decision (type %T, value %#+v) for request %#+v %#+v!  Treating as cancel", qs.qCfg.Name, decisionAny, decisionAny, req.descr1, req.descr2)
	}
	// TODO(aaron-prindle) add metrics for this case
	klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)
	// remove the request from the queue as it has timed out
	queue := req.queue
	if req.removeFromQueueLocked() != nil {
		defer qs.boundNextDispatchLocked(queue)
		qs.totRequestsWaiting--
		qs.totSeatsWaiting -= req.MaxSeats()
		qs.totRequestsRejected++
		qs.totRequestsCancelled++
		metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "cancelled")
		metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
		req.NoteQueued(false)
		qs.reqsGaugePair.RequestsWaiting.Add(-1)
		qs.seatDemandIntegrator.Set(float64(qs.totSeatsInUse + qs.totSeatsWaiting))
	}
	return false, qs.isIdleLocked()
}

Metrics

  • apiserver_flowcontrol_rejected_requests_total: number of requests rejected by APF, labeled by priority level name, FlowSchema name and rejection reason; the reason can be queue-full (too many requests already queued), concurrency-limit (rejected per the PriorityLevelConfiguration) or time-out (the request timed out while still queued)
  • apiserver_flowcontrol_dispatched_requests_total: total number of requests that started executing
  • apiserver_flowcontrol_current_inqueue_requests: number of requests currently waiting in queues
  • apiserver_flowcontrol_request_queue_length_after_enqueue: queue length observed right after a request is enqueued (sampled)
  • apiserver_flowcontrol_request_concurrency_limit: concurrency limit of each PriorityLevelConfiguration
  • apiserver_flowcontrol_request_wait_duration_seconds: how long requests spent queued (its labels also indicate whether the request was ultimately executed)
  • apiserver_flowcontrol_request_execution_seconds: how long requests took to execute

参考