Kruise

Aug 18, 2025 · 7 min read

基本知识

v1.27.1新特性 InPlacePodVerticalScaling(就地垂直伸缩)

InPlacePodVerticalScaling(就地垂直伸缩)是 Kubernetes 中v1.27.1的一个特性,它允许在不重启 Pod 的情况下动态调整 Pod 中容器的资源限制(Resource Limits)

传统上,在 Kubernetes 中更新 Pod 的资源限制需要重新创建 Pod,这会导致应用程序中断和服务不可用的情况。但是,通过使用 InPlacePodVerticalScaling,可以实现对 Pod 进行资源限制的动态更新,而无需重新创建 Pod。

原地升级

需求

当我们升级 Pod 中一些 sidecar 容器(如采集日志、监控等)时,其实并不希望干扰到业务容器的运行。 但面对这种场景,Deployment 或 StatefulSet 的升级都会将整个 Pod 重建,势必会对业务造成一定的影响。而容器级别的原地升级变动的范围非常可控,只会将需要升级的容器做重建,其余容器包括网络、挂载盘都不会受到影响。

优点

  • 节省了调度的耗时,Pod 的位置、资源都不发生变化; 节省了分配网络的耗时,Pod 还使用原有的 IP; 节省了分配、挂载远程盘的耗时,Pod 还使用原有的 PV(且都是已经在 Node 上挂载好的);
  • 节省了大部分拉取镜像的耗时,因为 Node 上已经存在了应用的旧镜像,当拉取新版本镜像时只需要下载很少的几层 layer。
  • pod 的其他容器可以不受影响

原理

每个 Node 上的 Kubelet,会针对本机上所有 Pod.spec.containers 中的每个 container 计算一个 hash 值,并记录到实际创建的容器中。

如果我们修改了 Pod 中某个 container 的 image 字段,kubelet 会发现 container 的 hash 发生了变化、与机器上过去创建的容器 hash 不一致,而后 kubelet 就会把旧容器停掉,然后根据最新 Pod spec 中的 container 来创建新的容器。

func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions {
	klog.V(5).InfoS("Syncing Pod", "pod", klog.KObj(pod))

	createPodSandbox, attempt, sandboxID := runtimeutil.PodSandboxChanged(pod, podStatus)
	changes := podActions{
		KillPod:           createPodSandbox,
		CreateSandbox:     createPodSandbox,
		SandboxID:         sandboxID,
		Attempt:           attempt,
		ContainersToStart: []int{},
		ContainersToKill:  make(map[kubecontainer.ContainerID]containerToKillInfo),
	}

    // ..
	// check the status of containers.
	for idx, container := range pod.Spec.Containers {
		containerStatus := podStatus.FindContainerStatusByName(container.Name)

        // ...
		
		// The container is running, but kill the container if any of the following condition is met.
		var message string
		var reason containerKillReason
		restart := shouldRestartOnFailure(pod)
		// Do not restart if only the Resources field has changed with InPlacePodVerticalScaling enabled
		if _, _, changed := containerChanged(&container, containerStatus); changed &&
			(!isInPlacePodVerticalScalingAllowed(pod) ||
				kubecontainer.HashContainerWithoutResources(&container) != containerStatus.HashWithoutResources) {
			// 没有开启原地升级,且除了资源后的 hash 更改
			message = fmt.Sprintf("Container %s definition changed", container.Name)
			// Restart regardless of the restart policy because the container
			// spec changed.
			restart = true
		} else if liveness, found := m.livenessManager.Get(containerStatus.ID); found && liveness == proberesults.Failure {
			// If the container failed the liveness probe, we should kill it.
			message = fmt.Sprintf("Container %s failed liveness probe", container.Name)
			reason = reasonLivenessProbe
		} else if startup, found := m.startupManager.Get(containerStatus.ID); found && startup == proberesults.Failure {
			// If the container failed the startup probe, we should kill it.
			message = fmt.Sprintf("Container %s failed startup probe", container.Name)
			reason = reasonStartupProbe
		} else if isInPlacePodVerticalScalingAllowed(pod) && !m.computePodResizeAction(pod, idx, containerStatus, &changes) {
			// computePodResizeAction updates 'changes' if resize policy requires restarting this container
			continue
		} else {
			// Keep the container.
			keepCount++
			continue
		}

		// We need to kill the container, but if we also want to restart the
		// container afterwards, make the intent clear in the message. Also do
		// not kill the entire pod since we expect container to be running eventually.
		if restart {
			message = fmt.Sprintf("%s, will be restarted", message)
			changes.ContainersToStart = append(changes.ContainersToStart, idx)
		}

		changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{
			name:      containerStatus.Name,
			container: &pod.Spec.Containers[idx],
			message:   message,
			reason:    reason,
		}
		klog.V(2).InfoS("Message for Container of pod", "containerName", container.Name, "containerStatusID", containerStatus.ID, "pod", klog.KObj(pod), "containerMessage", message)
	}

	if keepCount == 0 && len(changes.ContainersToStart) == 0 {
		changes.KillPod = true
	}

	return changes
}
// https://github.com/kubernetes/kubernetes/blob/9ddf1a02bd436e8ce16fb71ab832f4e0eca57a3a/pkg/kubelet/kuberuntime/kuberuntime_manager.go

// container 判断
func containerChanged(container *v1.Container, containerStatus *kubecontainer.Status) (uint64, uint64, bool) {
	// 计算 hash 
	expectedHash := kubecontainer.HashContainer(container)
	return expectedHash, containerStatus.Hash, containerStatus.Hash != expectedHash
}
func HashContainer(container *v1.Container) uint64 {
	hash := fnv.New32a()
	// Omit nil or empty field when calculating hash value
	// Please see https://github.com/kubernetes/kubernetes/issues/53644
	containerJSON, _ := json.Marshal(container)
	hashutil.DeepHashObject(hash, containerJSON)
	return uint64(hash.Sum32())
}
// 不带资源计算container的hash
func HashContainerWithoutResources(container *v1.Container) uint64 {
	// InPlacePodVerticalScaling enables mutable Resources field.
	// Changes to this field may not require container restart depending on policy.
	// Compute hash over fields besides the Resources field
	// NOTE: This is needed during alpha and beta so that containers using Resources but
	//       not subject to In-place resize are not unexpectedly restarted when
	//       InPlacePodVerticalScaling feature-gate is toggled.
	//TODO(vinaykul,InPlacePodVerticalScaling): Remove this in GA+1 and make HashContainerWithoutResources to become Hash.
	hashWithoutResources := fnv.New32a()
	containerCopy := container.DeepCopy()
	containerCopy.Resources = v1.ResourceRequirements{}
	containerJSON, _ := json.Marshal(containerCopy)
	hashutil.DeepHashObject(hashWithoutResources, containerJSON)
	return uint64(hashWithoutResources.Sum32())
}

kruise 原地升级原理

ContainerRecreateRequest

https://openkruise.io/docs/user-manuals/containerrecreaterequest

func (c *Controller) manage(crr *appsv1alpha1.ContainerRecreateRequest) error {
	runtimeManager, err := c.newRuntimeManager(c.runtimeFactory, crr)
	if err != nil {
		klog.ErrorS(err, "Failed to find runtime service", "namespace", crr.Namespace, "name", crr.Name)
		return c.completeCRRStatus(crr, fmt.Sprintf("failed to find runtime service: %v", err))
	}

	pod := convertCRRToPod(crr)

	// 获取 pod 状态
	podStatus, err := runtimeManager.GetPodStatus(context.TODO(), pod.UID, pod.Name, pod.Namespace)
	if err != nil {
		return fmt.Errorf("failed to GetPodStatus %s/%s with uid %s: %v", pod.Namespace, pod.Name, pod.UID, err)
	}
	klog.V(5).InfoS("CRR for Pod GetPodStatus", "namespace", crr.Namespace, "name", crr.Name, "podName", pod.Name, "podStatus", util.DumpJSON(podStatus))

	newCRRContainerRecreateStates := getCurrentCRRContainersRecreateStates(crr, podStatus)
	if !reflect.DeepEqual(crr.Status.ContainerRecreateStates, newCRRContainerRecreateStates) {
		return c.patchCRRContainerRecreateStates(crr, newCRRContainerRecreateStates)
	}

	var completedCount int
	for i := range newCRRContainerRecreateStates {
		state := &newCRRContainerRecreateStates[i]
		switch state.Phase {
		case appsv1alpha1.ContainerRecreateRequestSucceeded:
			completedCount++
			continue
		case appsv1alpha1.ContainerRecreateRequestFailed:
			completedCount++
			if crr.Spec.Strategy.FailurePolicy == appsv1alpha1.ContainerRecreateRequestFailurePolicyIgnore {
				continue
			}
			return c.completeCRRStatus(crr, "")
		case appsv1alpha1.ContainerRecreateRequestPending, appsv1alpha1.ContainerRecreateRequestRecreating:
		}

		if state.Phase == appsv1alpha1.ContainerRecreateRequestRecreating {
			state.IsKilled = true
			if crr.Spec.Strategy.OrderedRecreate {
				break
			}
			continue
		}

		kubeContainerStatus := podStatus.FindContainerStatusByName(state.Name)
		if kubeContainerStatus == nil {
			break
		}
        
		// 从pod状态中获取容器id,调用cri停止对应容器
		msg := fmt.Sprintf("Stopping container %s by ContainerRecreateRequest %s", state.Name, crr.Name)
		err := runtimeManager.KillContainer(pod, kubeContainerStatus.ID, state.Name, msg, nil)
		if err != nil {
			klog.ErrorS(err, "Failed to kill container in Pod for CRR", "containerName", state.Name, "podNamespace", pod.Namespace, "podName", pod.Name, "crrNamespace", crr.Namespace, "crrName", crr.Name)
			state.Phase = appsv1alpha1.ContainerRecreateRequestFailed
			state.Message = fmt.Sprintf("kill container error: %v", err)
			if crr.Spec.Strategy.FailurePolicy == appsv1alpha1.ContainerRecreateRequestFailurePolicyIgnore {
				continue
			}
			return c.patchCRRContainerRecreateStates(crr, newCRRContainerRecreateStates)
		}
		state.IsKilled = true
		state.Phase = appsv1alpha1.ContainerRecreateRequestRecreating
		break
	}
	
    //  更新CCR状态
	if !reflect.DeepEqual(crr.Status.ContainerRecreateStates, newCRRContainerRecreateStates) {
		return c.patchCRRContainerRecreateStates(crr, newCRRContainerRecreateStates)
	}

	// check if all containers have completed
	if completedCount == len(newCRRContainerRecreateStates) {
		return c.completeCRRStatus(crr, "")
	}

	if crr.Spec.Strategy != nil && crr.Spec.Strategy.MinStartedSeconds > 0 {
		c.queue.AddAfter(objectKey(crr), time.Duration(crr.Spec.Strategy.MinStartedSeconds)*time.Second)
	}
	return nil
}

SidecarSet

SidecarSet 是 OpenKruise 的一个 CRD,它支持通过 admission webhook 来自动为集群中创建的符合条件的 Pod 注入 sidecar 容器。SidecarSet 将 sidecar 容器的定义和生命周期与业务容器解耦。 它主要用于管理无状态的 sidecar 容器,比如监控、日志等 agent。

// https://github.com/openkruise/kruise/blob/6968bd8972ea176a584b676f4cd25379169e9389/pkg/util/inplaceupdate/inplace_update.go

// 原地升级接口
type Interface interface {
	CanUpdateInPlace(oldRevision, newRevision *apps.ControllerRevision, opts *UpdateOptions) bool
	Update(pod *v1.Pod, oldRevision, newRevision *apps.ControllerRevision, opts *UpdateOptions) UpdateResult
	Refresh(pod *v1.Pod, opts *UpdateOptions) RefreshResult
}

具体实现

func (c *realControl) CanUpdateInPlace(oldRevision, newRevision *apps.ControllerRevision, opts *UpdateOptions) bool {
	// 默认选项
	opts = SetOptionsDefaults(opts)
	// 如果不为nil 则可原地升级
	return opts.CalculateSpec(oldRevision, newRevision, opts) != nil
}


func SetOptionsDefaults(opts *UpdateOptions) *UpdateOptions {
	if opts == nil {
		opts = &UpdateOptions{}
	}

	if opts.CalculateSpec == nil {   // 计算更新的字段, 也用于判断是否可以原地升级
		opts.CalculateSpec = defaultCalculateInPlaceUpdateSpec
	}

	if opts.PatchSpecToPod == nil {  // 更新字段
		opts.PatchSpecToPod = defaultPatchUpdateSpecToPod
	}

	if opts.CheckPodUpdateCompleted == nil {  // 检查更新状态
		opts.CheckPodUpdateCompleted = DefaultCheckInPlaceUpdateCompleted
	}

	if opts.CheckContainersUpdateCompleted == nil {   // 检查容器更新状态
		opts.CheckContainersUpdateCompleted = defaultCheckContainersInPlaceUpdateCompleted
	}

	if opts.CheckPodNeedsBeUnready == nil {
		opts.CheckPodNeedsBeUnready = defaultCheckPodNeedsBeUnready
	}

	return opts
}
// 默认CalculateSpec函数, 这里体现出只支持label、annotation、镜像的更新的原地升级
func defaultCalculateInPlaceUpdateSpec(oldRevision, newRevision *apps.ControllerRevision, opts *UpdateOptions) *UpdateSpec {
	if oldRevision == nil || newRevision == nil {
		return nil
	}
	opts = SetOptionsDefaults(opts)

	patches, err := jsonpatch.CreatePatch(oldRevision.Data.Raw, newRevision.Data.Raw)
	if err != nil {
		return nil
	}

	// RecreatePodWhenChangeVCTInCloneSetGate enabled
	if utilfeature.DefaultFeatureGate.Enabled(features.RecreatePodWhenChangeVCTInCloneSetGate) {
		if !opts.IgnoreVolumeClaimTemplatesHashDiff {
			canInPlace := volumeclaimtemplate.CanVCTemplateInplaceUpdate(oldRevision, newRevision)
			if !canInPlace {
				return nil
			}
		}
	}

	oldTemp, err := GetTemplateFromRevision(oldRevision)
	if err != nil {
		return nil
	}
	newTemp, err := GetTemplateFromRevision(newRevision)
	if err != nil {
		return nil
	}

	updateSpec := &UpdateSpec{
		Revision:             newRevision.Name,
		ContainerImages:      make(map[string]string),
		ContainerResources:   make(map[string]v1.ResourceRequirements),
		ContainerRefMetadata: make(map[string]metav1.ObjectMeta),
		GraceSeconds:         opts.GracePeriodSeconds,
	}
	if opts.GetRevision != nil {
		updateSpec.Revision = opts.GetRevision(newRevision)
	}

	// all patches for podSpec can just update images in pod spec
	var metadataPatches []jsonpatch.Operation
	for _, op := range patches {
		op.Path = strings.Replace(op.Path, "/spec/template", "", 1)

		if !strings.HasPrefix(op.Path, "/spec/") {
			if strings.HasPrefix(op.Path, "/metadata/") {
				metadataPatches = append(metadataPatches, op)
				continue
			}
			return nil
		}

		if op.Operation != "replace" {
			return nil
		}
		if containerImagePatchRexp.MatchString(op.Path) {
			// for example: /spec/containers/0/image
			words := strings.Split(op.Path, "/")
			idx, _ := strconv.Atoi(words[3])
			if len(oldTemp.Spec.Containers) <= idx {
				return nil
			}
			updateSpec.ContainerImages[oldTemp.Spec.Containers[idx].Name] = op.Value.(string)
			continue
		}

		if utilfeature.DefaultFeatureGate.Enabled(features.InPlaceWorkloadVerticalScaling) &&
			containerResourcesPatchRexp.MatchString(op.Path) {
			err = verticalUpdateImpl.UpdateInplaceUpdateMetadata(&op, oldTemp, updateSpec)
			if err != nil {
				klog.InfoS("UpdateInplaceUpdateMetadata error", "err", err)
				return nil
			}
			continue
		}
		return nil
	}
	if utilfeature.DefaultFeatureGate.Enabled(features.InPlaceWorkloadVerticalScaling) &&
		len(updateSpec.ContainerResources) != 0 {
		// when container resources changes exist, we should check pod qos
		if changed := verticalUpdateImpl.IsPodQoSChanged(oldTemp, newTemp); changed {
			klog.InfoS("can not inplace update when qos changed")
			return nil
		}
	}

	if len(metadataPatches) > 0 {
		if utilfeature.DefaultFeatureGate.Enabled(features.InPlaceUpdateEnvFromMetadata) {
			// for example: /metadata/labels/my-label-key
			for _, op := range metadataPatches {
				if op.Operation != "replace" && op.Operation != "add" {
					continue
				}
				words := strings.SplitN(op.Path, "/", 4)
				if len(words) != 4 || (words[2] != "labels" && words[2] != "annotations") {
					continue
				}
				key := rfc6901Decoder.Replace(words[3])

				for i := range newTemp.Spec.Containers {
					c := &newTemp.Spec.Containers[i]
					objMeta := updateSpec.ContainerRefMetadata[c.Name]
					switch words[2] {
					case "labels":
						if !utilcontainermeta.IsContainerReferenceToMeta(c, "metadata.labels", key) {
							continue
						}
						if objMeta.Labels == nil {
							objMeta.Labels = make(map[string]string)
						}
						objMeta.Labels[key] = op.Value.(string)
						delete(oldTemp.ObjectMeta.Labels, key)
						delete(newTemp.ObjectMeta.Labels, key)

					case "annotations":
						if !utilcontainermeta.IsContainerReferenceToMeta(c, "metadata.annotations", key) {
							continue
						}
						if objMeta.Annotations == nil {
							objMeta.Annotations = make(map[string]string)
						}
						objMeta.Annotations[key] = op.Value.(string)
						delete(oldTemp.ObjectMeta.Annotations, key)
						delete(newTemp.ObjectMeta.Annotations, key)
					}

					updateSpec.ContainerRefMetadata[c.Name] = objMeta
					updateSpec.UpdateEnvFromMetadata = true
				}
			}
		}

		oldBytes, _ := json.Marshal(v1.Pod{ObjectMeta: oldTemp.ObjectMeta})
		newBytes, _ := json.Marshal(v1.Pod{ObjectMeta: newTemp.ObjectMeta})
		patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldBytes, newBytes, &v1.Pod{})
		if err != nil {
			return nil
		}
		updateSpec.MetaDataPatch = patchBytes
	}

	return updateSpec
}

参考

Danny
Authors
Devops
Life is short 人生苦短,及时行乐.