这篇写的就是创建、更新、调谐、同步时触发的函数变化。
一、调谐(Reconcile)
写过CRD控制器的应该对这个函数很熟悉,当CRD对象有变化的时候,或者每过2s,都会触发这个函数进行比对,调谐pod的状态
但是这个调谐和CRD的不一样,这里只是调谐READY和被驱逐或者失败的pod
kubectl apply -f demo.yaml
1.类似于kubelet删除pod中的流程2
case kubetypes.RECONCILE:
	// Log the pods carried by this RECONCILE update at verbosity 4, then hand
	// them to the reconcile handler.
	klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjs(u.Pods))
	handler.HandlePodReconcile(u.Pods)
这里触发了reconcile函数
2.这个函数主要三个流程步骤
// HandlePodReconcile walks the given pods: each pod is updated in the pod
// manager, and a SyncPodSync is dispatched for every pod whose readiness
// needs reconciling.
func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
	// A single timestamp is taken up front and passed to every dispatched sync.
	start := kl.clock.Now()
	for _, pod := range pods {
		kl.podManager.UpdatePod(pod)
		if status.NeedToReconcilePodReadiness(pod) {
			// The second return value of the mirror-pod lookup is discarded.
			mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
			kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
		}
		// **** (code elided in this excerpt) -- step 8
	}
}
3.验证是否需要调谐
func NeedToReconcilePodReadiness(pod *v1.Pod) bool {if len(pod.Spec.ReadinessGates) == 0 {return false}podReadyCondition := GeneratePodReadyCondition(&pod.Spec, pod.Status.Conditions, pod.Status.ContainerStatuses, pod.Status.Phase)i, curCondition := podutil.GetPodConditionFromList(pod.Status.Conditions, v1.PodReady)if i >= 0 && (curCondition.Status != podReadyCondition.Status || curCondition.Message != podReadyCondition.Message) {return true}return false
}
4.验证一下实际状态
func GeneratePodReadyCondition(spec *v1.PodSpec, conditions []v1.PodCondition, containerStatuses []v1.ContainerStatus, podPhase v1.PodPhase) v1.PodCondition {containersReady := GenerateContainersReadyCondition(spec, containerStatuses, podPhase)*******
}func GenerateContainersReadyCondition(spec *v1.PodSpec, containerStatuses []v1.ContainerStatus, podPhase v1.PodPhase) v1.PodCondition {if containerStatuses == nil {return v1.PodCondition{Type: v1.ContainersReady,Status: v1.ConditionFalse,Reason: UnknownContainerStatuses,}}unknownContainers := []string{}unreadyContainers := []string{}for _, container := range spec.Containers {if containerStatus, ok := podutil.GetContainerStatus(containerStatuses, container.Name); ok {if !containerStatus.Ready {unreadyContainers = append(unreadyContainers, container.Name)}} else {unknownContainers = append(unknownContainers, container.Name)}}if podPhase == v1.PodSucceeded && len(unknownContainers) == 0 {return v1.PodCondition{Type: v1.ContainersReady,Status: v1.ConditionFalse,Reason: PodCompleted,}}unreadyMessages := []string{}if len(unknownContainers) > 0 {unreadyMessages = append(unreadyMessages, fmt.Sprintf("containers with unknown status: %s", unknownContainers))}if len(unreadyContainers) > 0 {unreadyMessages = append(unreadyMessages, fmt.Sprintf("containers with unready status: %s", unreadyContainers))}unreadyMessage := strings.Join(unreadyMessages, ", ")if unreadyMessage != "" {return v1.PodCondition{Type: v1.ContainersReady,Status: v1.ConditionFalse,Reason: ContainersNotReady,Message: unreadyMessage,}}return v1.PodCondition{Type: v1.ContainersReady,Status: v1.ConditionTrue,}
}
5.重新回到上面的函数,获取了容器实际状态后,要做返回值判断了
func GeneratePodReadyCondition(spec *v1.PodSpec, conditions []v1.PodCondition, containerStatuses []v1.ContainerStatus, podPhase v1.PodPhase) v1.PodCondition {containersReady := GenerateContainersReadyCondition(spec, containerStatuses, podPhase)if containersReady.Status != v1.ConditionTrue {return v1.PodCondition{Type: v1.PodReady,Status: containersReady.Status,Reason: containersReady.Reason,Message: containersReady.Message,}}unreadyMessages := []string{}for _, rg := range spec.ReadinessGates {_, c := podutil.GetPodConditionFromList(conditions, rg.ConditionType)if c == nil {unreadyMessages = append(unreadyMessages, fmt.Sprintf("corresponding condition of pod readiness gate %q does not exist.", string(rg.ConditionType)))} else if c.Status != v1.ConditionTrue {unreadyMessages = append(unreadyMessages, fmt.Sprintf("the status of pod readiness gate %q is not \"True\", but %v", string(rg.ConditionType), c.Status))}}if len(unreadyMessages) != 0 {unreadyMessage := strings.Join(unreadyMessages, ", ")return v1.PodCondition{Type: v1.PodReady,Status: v1.ConditionFalse,Reason: ReadinessGatesNotReady,Message: unreadyMessage,}}return v1.PodCondition{Type: v1.PodReady,Status: v1.ConditionTrue,}
}
6.这里比较简单,遍历conditions,找到类型与传入的conditionType相同(这里传入的是ready类型)的那一项,返回它的下标和指针;找不到则返回-1和nil。然后回到流程3的最后一步
func GetPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition) {if conditions == nil {return -1, nil}for i := range conditions {if conditions[i].Type == conditionType {return i, &conditions[i]}}return -1, nil
}
7.回到流程2中的这段代码:如果流程3判断需要调谐,则进入到dispatchWork函数,然后触发了podWorkers.UpdatePod。之后的就可以参考kubelet源码 删除pod(二)
// Excerpt from HandlePodReconcile: when the pod's readiness needs reconciling,
// a SyncPodSync is dispatched for it.
if status.NeedToReconcilePodReadiness(pod) {
	mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
	kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
}

// dispatchWork forwards the pod, its mirror pod, the sync type and the start
// time to the pod workers through UpdatePod.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		StartTime:  start,
	})
	// ***** (remainder of the function elided in this excerpt)
}
8.接下来要看pod是不是被驱逐(失败)的状态,如果是,就从缓存(podCache)中取到pod的状态信息,删除这个pod里的容器
// When the pod status is reported as evicted, look the pod up in the pod
// cache by UID and, if that lookup succeeds, delete the containers in the pod.
if eviction.PodIsEvicted(pod.Status) {
	// A failed cache lookup is not reported; the deletion is simply skipped.
	if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
		kl.containerDeletor.deleteContainersInPod("", podStatus, true)
	}
}
1.触发HandlePodAdditions函数进行创建
case kubetypes.ADD:
	// Log the pods carried by this ADD update at verbosity 2, then hand them
	// to the additions handler.
	klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjs(u.Pods))
	handler.HandlePodAdditions(u.Pods)

// HandlePodAdditions registers each given pod with the pod manager and, unless
// the pod is a mirror pod or is rejected by admission, dispatches a
// SyncPodCreate for it and adds it to the probe manager.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	// A single timestamp is taken up front and passed to every dispatched sync.
	start := kl.clock.Now()
	// Sorts the pods via sliceutils.PodsByCreationTime
	// (presumably oldest first -- confirm against that type's Less).
	sort.Sort(sliceutils.PodsByCreationTime(pods))
	for _, pod := range pods {
		// The existing pods are read before this pod is added to the pod manager.
		existingPods := kl.podManager.GetPods()
		kl.podManager.AddPod(pod)
		// Mirror pods take their own path and skip admission and dispatch below.
		if kubetypes.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}
		// Admission is only checked when termination has not been requested for this pod.
		if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
			// NOTE(review): this excerpt calls filterOutTerminatedPods, while the
			// filter helper shown later in this write-up is named
			// filterOutInactivePods -- confirm which kubelet version is described.
			activePods := kl.filterOutTerminatedPods(existingPods)
			if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
				kl.rejectPod(pod, reason, message)
				continue
			}
		}
		// The second return value of the mirror-pod lookup is discarded.
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
		kl.probeManager.AddPod(pod)
	}
}
2.把已有的pod过滤一遍,去掉已经终止的pod,剩下的pod用来验证新创建的pod是否允许创建。
func (kl *Kubelet) filterOutInactivePods(pods []*v1.Pod) []*v1.Pod {filteredPods := make([]*v1.Pod, 0, len(pods))for _, p := range pods {if kl.podWorkers.IsPodKnownTerminated(p.UID) {continue}if kl.isAdmittedPodTerminal(p) && !kl.podWorkers.IsPodTerminationRequested(p.UID) {continue}filteredPods = append(filteredPods, p)}return filteredPods
}
3.校验是否能在node上创建
第四行为函数校验,这里情况比较多,不全部贴代码,大概是验证一下pod的可创建性:依次调用admitHandlers里的每个校验函数,只要有一个不通过就拒绝创建,入口函数如下
func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}for _, podAdmitHandler := range kl.admitHandlers {if result := podAdmitHandler.Admit(attrs); !result.Admit {return false, result.Reason, result.Message}}return true, "", ""
}
上面的验证都通过,就会创建pod了
1.同上原理
case kubetypes.UPDATE:
	// Log the pods carried by this UPDATE at verbosity 2, then hand them to
	// the updates handler.
	klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjs(u.Pods))
	handler.HandlePodUpdates(u.Pods)
2.因为触发的是updates函数,所以和删除是一个道理,参考kubelet源码 删除pod(二)