Trimaran

Mar 2, 2025 · 8 min read

为了应对集群节点高负载、负载不均衡等问题,需要动态平衡各个节点之间的资源使用率,因此需要基于节点的相关监控指标,构建集群资源视图,从而为下述两种治理方向奠定实现基础:

  • 方向一: 在 Pod 调度阶段,加入优先将 Pod 调度到资源实际使用率低的节点的节点Score插件
  • 方向二: 在集群治理阶段,通过实时监控,在观测到节点资源率较高、节点故障、Pod 数量较多等情况时,可以自动干预,迁移节点上的一些 Pod 到利用率低的节点上

解决方式

  • 针对方向一,可以通过赋予Kubernetes调度器感知集群实际负载的能力,计算资源分配和实际资源利用之间的差距,优化调度策略。
  • 针对方向二,社区给出了 Descheduler 重调度方案,Descheduler 可以根据一些规则和策略配置来帮助再平衡集群状态,当前项目实现了十余种策略

scheduler-plugins基于requests调度,非常依赖request的设置。

基本概念

重调度(Descheduling): 通常是指将部署在某个节点上调度不合理的Pod重新调度到另一个节点. 在集群利用率不均而产生热点节点、节点属性变化导致存量Pod调度规则不匹配等场景下,您可以使用重调度来优化资源使用,确保Pod在最佳节点上运行,从而保障集群的高可用性和工作负载的高效运行。

Gang scheduling(帮派调度) : 是一种调度算法,主要的原则是保证所有相关联的进程能够同时启动,防止部分进程的异常,导致整个关联进程组的阻塞. 例如,您提交一个批量 Job,这个批量 Job 包含多个任务,要么这多个任务全部调度成功,要么一个都调度不成功。 这种 All-or-Nothing 调度场景,就被称作 Gang scheduling.

拓扑感知调度: 在机器学习和大数据分析类作业中,Pod间通常有较大的网络通信需求。默认情况下,原生Kubernetes调度器会将Pod均匀打散在集群中,增加了通信距离,导致作业完成时间变长。您可以将Pod部署在同一可用区或机架上,减少通信跳数和时延以优化作业执行时间。

指标获取

通过 load-watcher,可以是service部署,或则直接作为客户端库嵌入.

// https://github.com/kubernetes-sigs/scheduler-plugins/blob/59a8b1ca68d0256d10239a588d69ab0ba28d4076/pkg/trimaran/collector.go
func NewCollector(logger klog.Logger, trimaranSpec *pluginConfig.TrimaranSpec) (*Collector, error) {
	if err := checkSpecs(trimaranSpec); err != nil {
		return nil, err
	}
	logger.V(4).Info("Using TrimaranSpec", "type", trimaranSpec.MetricProvider.Type,
		"address", trimaranSpec.MetricProvider.Address, "watcher", trimaranSpec.WatcherAddress)

	var client loadwatcherapi.Client
	if trimaranSpec.WatcherAddress != "" {
		// 作为service
		client, _ = loadwatcherapi.NewServiceClient(trimaranSpec.WatcherAddress)
	} else {
		
		// 作为库
		opts := watcher.MetricsProviderOpts{
			Name:               string(trimaranSpec.MetricProvider.Type),
			Address:            trimaranSpec.MetricProvider.Address,
			AuthToken:          trimaranSpec.MetricProvider.Token,
			InsecureSkipVerify: trimaranSpec.MetricProvider.InsecureSkipVerify,
		}
		client, _ = loadwatcherapi.NewLibraryClient(opts)
	}

	collector := &Collector{
		client: client,
	}

	// 更新本地缓存指标
	// populate metrics before returning
	err := collector.updateMetrics(logger)
	if err != nil {
		logger.Error(err, "Unable to populate metrics initially")
	}
	// start periodic updates
	go func() {
		metricsUpdaterTicker := time.NewTicker(time.Second * metricsUpdateIntervalSeconds)
		for range metricsUpdaterTicker.C {
			err = collector.updateMetrics(logger)
			if err != nil {
				logger.Error(err, "Unable to update metrics")
			}
		}
	}()
	return collector, nil
}

作为库的方式

func NewLibraryClient(opts watcher.MetricsProviderOpts) (Client, error) {
	var err error
	client := libraryClient{}
	switch opts.Name {
	case watcher.PromClientName: // prometheus 客户端的方式
		client.fetcherClient, err = metricsprovider.NewPromClient(opts)
	case watcher.SignalFxClientName:
		client.fetcherClient, err = metricsprovider.NewSignalFxClient(opts)
	default:
		client.fetcherClient, err = metricsprovider.NewMetricsServerClient()
	}
	if err != nil {
		return client, err
	}
	client.watcher = watcher.NewWatcher(client.fetcherClient)
	// 开始抓取指标
	client.watcher.StartWatching()
	return client, nil
}

LoadVariationRiskBalancing 负载感知均衡调度

LoadVariationRiskBalancing 插件的算法是利用节点负载在某段时间内(滑动窗口)的平均值(M)和标准差(V)这两个指标,假设集群所有节点的CPU利用率的M+V是0.3(30%),那么每个节点的cpu利用率的M+V越接近0.3,得分应该越小。

LoadVariationRiskBalancing是分别计算每种资源的得分,再取得分的最小值,例:假设CPU得分0,内存得分10,则节点的最终得分是0。

算法步骤:

  1. 获取待调度的Pod 的request的资源,设为r 。
// pkg/trimaran/resourcestats.go

// GetResourceRequested : calculate the resource requests of a pod (CPU and Memory)
func GetResourceRequested(pod *v1.Pod) *framework.Resource {
	return GetEffectiveResource(pod, func(container *v1.Container) v1.ResourceList {
		return container.Resources.Requests
	})
}
  1. 获取当前节点所有类型的资源(CPU、Memory等)的利用率的百分比(0到1),并根据计算的滑动窗口的平均数(V)和标准差(M),进行打分。
func CreateResourceStats(logger klog.Logger, metrics []watcher.Metric, node *v1.Node, podRequest *framework.Resource,
	resourceName v1.ResourceName, watcherType string) (rs *ResourceStats, isValid bool) {
	// get resource usage statistics
	nodeUtil, nodeStd, metricFound := GetResourceData(metrics, watcherType)
	if !metricFound {
		logger.V(6).Info("Resource usage statistics for node : no valid data", "node", klog.KObj(node))
		return nil, false
	}
	// get resource capacity
	rs = &ResourceStats{}
	allocatableResources := node.Status.Allocatable
	am := allocatableResources[resourceName]

	if resourceName == v1.ResourceCPU {
		rs.Capacity = float64(am.MilliValue())
		rs.Req = float64(podRequest.MilliCPU)
	} else {
		rs.Capacity = float64(am.Value())
		rs.Capacity *= MegaFactor
		rs.Req = float64(podRequest.Memory) * MegaFactor
	}

	// calculate absolute usage statistics
	rs.UsedAvg = nodeUtil * rs.Capacity / 100
	rs.UsedStdev = nodeStd * rs.Capacity / 100

	logger.V(6).Info("Resource usage statistics for node", "node", klog.KObj(node), "resource", resourceName,
		"capacity", rs.Capacity, "required", rs.Req, "usedAvg", rs.UsedAvg, "usedStdev", rs.UsedStdev)
	return rs, true
}
  1. 计算当前节点对各类资源的得分:Si = M + r + V
// computeScore : compute score given usage statistics
// - risk = [ average + margin * stDev^{1/sensitivity} ] / 2
// - score = ( 1 - risk ) * maxScore
func computeScore(logger klog.Logger, rs *trimaran.ResourceStats, margin float64, sensitivity float64) float64 {
	if rs.Capacity <= 0 {
		logger.Error(nil, "Invalid resource capacity", "capacity", rs.Capacity)
		return 0
	}

	// make sure values are within bounds
	rs.Req = math.Max(rs.Req, 0)
	rs.UsedAvg = math.Max(math.Min(rs.UsedAvg, rs.Capacity), 0)
	rs.UsedStdev = math.Max(math.Min(rs.UsedStdev, rs.Capacity), 0)

	// calculate average and deviation factors
	// mu-sigma图的mu指的是均值(μ),sigma指标准差(σ)。
	mu, sigma := trimaran.GetMuSigma(rs)

	// apply root power
	if sensitivity >= 0 {
		sigma = math.Pow(sigma, 1/sensitivity)
	}
	// apply multiplier
	sigma *= margin
	sigma = math.Max(math.Min(sigma, 1), 0)

	// evaluate overall risk factor
	risk := (mu + sigma) / 2
	logger.V(6).Info("Evaluating risk factor", "mu", mu, "sigma", sigma, "margin", margin, "sensitivity", sensitivity, "risk", risk)
	return (1. - risk) * float64(framework.MaxNodeScore)
}
  1. 获取每种类型资源的分数并将其绑定到 [0,1],意思就是最小值为0,最大值为1,小于最小值取最小值,大于最大值取最大值:Si = min(Si,1.0)
  2. 计算当前节点每种资源的优先级得分:Ui = (1-Si) x MaxPriority。
  3. 当前节点最终的得分为:U = min(Ui),意思是cpu、内存的分数,哪个低取哪个:

TargetLoadPacking 负载上限调度

TargetLoadPacking即目标负载调度器,用于控制节点的CPU利用率不超过目标值x%(例如65%),通过打分让所有cpu利用率超过x%的都不被选中。目标负载调度器只支持CPU。

使用此插件结合LoadVariationRiskBalancing插件,可以保证在负载均衡调度的基础上,保证节点不会超负载,确保服务的稳定运行。成本的优化一定是建立在稳定性之上的。

func (pl *TargetLoadPacking) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	logger := klog.FromContext(ctx)
	score := framework.MinNodeScore
	nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
	if err != nil {
		return score, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
	}

	// 获取节点 metrics
	metrics, allMetrics := pl.collector.GetNodeMetrics(logger, nodeName)
    // ...

	// 计算 pod 使用率
	var curPodCPUUsage int64
	for _, container := range pod.Spec.Containers {
		// Pod cpu使用量是优先通过limit取得,不存在才通过request获取,在公司内limit和Pod实际使用率偏差较大
		curPodCPUUsage += PredictUtilisation(&container)
	}
	logger.V(6).Info("Predicted utilization for pod", "podName", pod.Name, "cpuUsage", curPodCPUUsage)
	// 补充runtimeClass 定义的 overhead
	if pod.Spec.Overhead != nil {
		curPodCPUUsage += pod.Spec.Overhead.Cpu().MilliValue()
	}
	// 计算当前节点的 cpu 使用率
	var nodeCPUUtilPercent float64
	var cpuMetricFound bool
	for _, metric := range metrics {
		if metric.Type == watcher.CPU {
			if metric.Operator == watcher.Average || metric.Operator == watcher.Latest {
				nodeCPUUtilPercent = metric.Value
				cpuMetricFound = true
			}
		}
	}

	if !cpuMetricFound {
		logger.Error(nil, "Cpu metric not found in node metrics", "nodeName", nodeName, "nodeMetrics", metrics)
		return score, nil
	}
	nodeCPUCapMillis := float64(nodeInfo.Node().Status.Capacity.Cpu().MilliValue())
	nodeCPUUtilMillis := (nodeCPUUtilPercent / 100) * nodeCPUCapMillis

	logger.V(6).Info("Calculating CPU utilization and capacity", "nodeName", nodeName, "cpuUtilMillis", nodeCPUUtilMillis, "cpuCapMillis", nodeCPUCapMillis)

	var missingCPUUtilMillis int64 = 0
	pl.eventHandler.RLock()
	for _, info := range pl.eventHandler.ScheduledPodsCache[nodeName] {
		// If the time stamp of the scheduled pod is outside fetched metrics window, or it is within metrics reporting interval seconds, we predict util.
		// Note that the second condition doesn't guarantee metrics for that pod are not reported yet as the 0 <= t <= 2*metricsAgentReportingIntervalSeconds
		// t = metricsAgentReportingIntervalSeconds is taken as average case and it doesn't hurt us much if we are
		// counting metrics twice in case actual t is less than metricsAgentReportingIntervalSeconds
		if info.Timestamp.Unix() > allMetrics.Window.End || info.Timestamp.Unix() <= allMetrics.Window.End &&
			(allMetrics.Window.End-info.Timestamp.Unix()) < metricsAgentReportingIntervalSeconds {
			for _, container := range info.Pod.Spec.Containers {
				missingCPUUtilMillis += PredictUtilisation(&container)
			}
			missingCPUUtilMillis += info.Pod.Spec.Overhead.Cpu().MilliValue()
			logger.V(6).Info("Missing utilization for pod", "podName", info.Pod.Name, "missingCPUUtilMillis", missingCPUUtilMillis)
		}
	}
	pl.eventHandler.RUnlock()
	logger.V(6).Info("Missing utilization for node", "nodeName", nodeName, "missingCPUUtilMillis", missingCPUUtilMillis)

	var predictedCPUUsage float64
	if nodeCPUCapMillis != 0 { 
		// 如果 Pod 调度在该节点下,计算预期利用率,即 target_cpu = node_cpu + pod_cpu
		predictedCPUUsage = 100 * (nodeCPUUtilMillis + float64(curPodCPUUsage) + float64(missingCPUUtilMillis)) / nodeCPUCapMillis
	}
	if predictedCPUUsage > float64(hostTargetUtilizationPercent) {
		if predictedCPUUsage > 100 {
			// 超过100%,直接返回 MinNodeScore 0
			return score, framework.NewStatus(framework.Success, "")
		}
		penalisedScore := int64(math.Round(float64(hostTargetUtilizationPercent) * (100 - predictedCPUUsage) / (100 - float64(hostTargetUtilizationPercent))))
		logger.V(6).Info("Penalised score for host", "nodeName", nodeName, "penalisedScore", penalisedScore)
		return penalisedScore, framework.NewStatus(framework.Success, "")
	}

	score = int64(math.Round((100-float64(hostTargetUtilizationPercent))*
		predictedCPUUsage/float64(hostTargetUtilizationPercent) + float64(hostTargetUtilizationPercent)))
	logger.V(6).Info("Score for host", "nodeName", nodeName, "score", score)
	return score, framework.NewStatus(framework.Success, "")
}
func PredictUtilisation(container *v1.Container) int64 {
	if _, ok := container.Resources.Limits[v1.ResourceCPU]; ok {
		return container.Resources.Limits.Cpu().MilliValue()
	} else if _, ok := container.Resources.Requests[v1.ResourceCPU]; ok {
		return int64(math.Round(float64(container.Resources.Requests.Cpu().MilliValue()) * requestsMultiplier))
	}
	return requestsMilliCores
}

计算公式

cluster_cpu = 预设理想值
target_cpu = node_cpu + pod_cpu
if target_cpu <= cluster_cpu:
  score = (100 - cluster_cpu)target_cpu/cluster_cpu+ cluster_cpu 
else if cluster_cpu < target_cpu <= 100:
  score = cluster_cpu(100 - target_cpu)/(100 - cluster_cpu)
else:
  score = 0

LowRiskOverCommitment 资源限制感知调度

让limits也能均衡分布,通过跨节点“分散”或“平衡”Pod 的资源limits来缓解可突发Pod导致的资源过度订阅问题。

// computeRank : rank function for the LowRiskOverCommitment
func (pl *LowRiskOverCommitment) computeRank(logger klog.Logger, metrics []watcher.Metric, nodeInfo *framework.NodeInfo, pod *v1.Pod,
	podRequests *framework.Resource, podLimits *framework.Resource) float64 {
	node := nodeInfo.Node()
	// calculate risk based on requests and limits
	nodeRequestsAndLimits := trimaran.GetNodeRequestsAndLimits(logger, nodeInfo.Pods, node, pod, podRequests, podLimits)
	riskCPU := pl.computeRisk(logger, metrics, v1.ResourceCPU, watcher.CPU, node, nodeRequestsAndLimits)
	riskMemory := pl.computeRisk(logger, metrics, v1.ResourceMemory, watcher.Memory, node, nodeRequestsAndLimits)
	rank := 1 - math.Max(riskCPU, riskMemory)

	logger.V(6).Info("Node rank", "nodeName", node.GetName(), "riskCPU", riskCPU, "riskMemory", riskMemory, "rank", rank)

	return rank
}


func (pl *LowRiskOverCommitment) computeRisk(logger klog.Logger, metrics []watcher.Metric, resourceName v1.ResourceName,
	resourceType string, node *v1.Node, nodeRequestsAndLimits *trimaran.NodeRequestsAndLimits) float64 {
	var riskLimit, riskLoad, totalRisk float64

	defer func() {
		logger.V(6).Info("Calculated risk", "node", klog.KObj(node), "resource", resourceName,
			"riskLimit", riskLimit, "riskLoad", riskLoad, "totalRisk", totalRisk)
	}()

	nodeRequest := nodeRequestsAndLimits.NodeRequest
	nodeLimit := nodeRequestsAndLimits.NodeLimit
	nodeRequestMinusPod := nodeRequestsAndLimits.NodeRequestMinusPod
	nodeLimitMinusPod := nodeRequestsAndLimits.NodeLimitMinusPod
	nodeCapacity := nodeRequestsAndLimits.Nodecapacity

	var request, limit, capacity, requestMinusPod, limitMinusPod int64
	if resourceName == v1.ResourceCPU {
		request = nodeRequest.MilliCPU
		limit = nodeLimit.MilliCPU
		requestMinusPod = nodeRequestMinusPod.MilliCPU
		limitMinusPod = nodeLimitMinusPod.MilliCPU
		capacity = nodeCapacity.MilliCPU
	} else if resourceName == v1.ResourceMemory {
		request = nodeRequest.Memory
		limit = nodeLimit.Memory
		requestMinusPod = nodeRequestMinusPod.Memory
		limitMinusPod = nodeLimitMinusPod.Memory
		capacity = nodeCapacity.Memory
	} else {
		// invalid resource
		logger.V(6).Info("Unexpected resource", "resourceName", resourceName)
		return 0
	}

	// (1) riskLimit : calculate overcommit potential load
	if limit > capacity {
		riskLimit = float64(limit-capacity) / float64(limit-request)
	}
	logger.V(6).Info("RiskLimit", "node", klog.KObj(node), "resource", resourceName, "riskLimit", riskLimit)

	// (2) riskLoad : calculate measured overcommitment
	zeroRequest := &framework.Resource{}
	stats, ok := trimaran.CreateResourceStats(logger, metrics, node, zeroRequest, resourceName, resourceType)
	if ok {
		// fit a beta distribution to the measured load stats
		mu, sigma := trimaran.GetMuSigma(stats)
		// adjust standard deviation due to data smoothing
		// 求出x 的y 次方
		sigma *= math.Pow(float64(pl.args.SmoothingWindowSize), 0.5)
		// limit the standard deviation close to the allowed maximum for the beta distribution
		// math.Sqrt 一个数的平方根
		sigma = math.Min(sigma, math.Sqrt(GetMaxVariance(mu)*MaxVarianceAllowance))

		// calculate area under beta probability curve beyond total allocated, as overuse risk measure
		allocThreshold := float64(requestMinusPod) / float64(capacity)
		allocThreshold = math.Min(math.Max(allocThreshold, 0), 1)
		allocProb, fitDistribution := ComputeProbability(mu, sigma, allocThreshold)
		if fitDistribution != nil {
			klog.V(6).InfoS("FitDistribution", "node", klog.KObj(node), "resource", resourceName, "dist", fitDistribution.Print())
		}
		// condition the probability in case total limit is less than capacity
		if limitMinusPod < capacity && requestMinusPod <= limitMinusPod {
			limitThreshold := float64(limitMinusPod) / float64(capacity)
			if limitThreshold == 0 {
				allocProb = 1 // zero over zero
			} else if fitDistribution != nil {
				limitProb := fitDistribution.DistributionFunction(limitThreshold)
				if limitProb > 0 {
					allocProb /= limitProb
					allocProb = math.Min(math.Max(allocProb, 0), 1)
				}
			}
		}

		// calculate risk
		riskLoad = 1 - allocProb
		logger.V(6).Info("RiskLoad", "node", klog.KObj(node), "resource", resourceName,
			"allocThreshold", allocThreshold, "allocProb", allocProb, "riskLoad", riskLoad)
	}

	// combine two components of risk into a total risk as a weighted sum
	w := pl.riskLimitWeightsMap[resourceName]
	totalRisk = w*riskLimit + (1-w)*riskLoad
	totalRisk = math.Min(math.Max(totalRisk, 0), 1)
	return totalRisk
}

参考