KubeEdge源码分析之Edged
首先是注册,注册是在EdgeCore的启动流程中包含的。
注册
// Register register edged
func Register(e *v1alpha1.Edged) {
edgedconfig.InitConfigure(e)
edged, err := newEdged(e.Enable)
if err != nil {
klog.Errorf("init new edged error, %v", err)
os.Exit(1)
}
core.Register(edged)
}
这里就是初始化了edged对象,其中的newEdged比较重要,下面列出里面的一些操作。
-
初始化backoff
-
初始化podManager
-
初始化ImageGCPolicy,镜像回收策略
-
初始化EventRecorder
-
初始化metaclient
-
初始化runtimeClassManager,这个manager的作用是缓存RuntimeClass API对象,并向kubelet提供访问
-
初始化containerGCPolicy,容器回收策略
-
开启docker服务(startDockerServer方法)
RemoteRuntimeEndpoint必须是unix:///var/run/dockershim.sock或者/var/run/dockershim.sock,默认值是前者。首先是初始化各种配置,然后调用了
dockershim的API初始化了一个CRIService对象ds, err := dockershim.NewDockerService(DockerClientConfig, edgedconfig.Config.PodSandboxImage, streamingConfig, &pluginConfigs, cgroupName, cgroupDriver, DockershimRootDir)然后初始化了DockerServer,这一步创建了dockershim的grpc服务
server := dockerremote.NewDockerServer(edgedconfig.Config.RemoteRuntimeEndpoint, ds) -
初始化kubedns
-
根据配置中的endpoint,初始化runtimeService和imageService
runtimeService, imageService, err := getRuntimeAndImageServices( edgedconfig.Config.RemoteRuntimeEndpoint, edgedconfig.Config.RemoteImageEndpoint, metav1.Duration{ Duration: time.Duration(edgedconfig.Config.RuntimeRequestTimeout) * time.Minute, }) -
初始化ContainerLifecycleManager
-
初始化cadvisor
-
初始化machineInfo
-
初始化logManager
-
初始化kubeGenericRuntimeManager
-
初始化ContainerManager
-
初始化RuntimeCache
-
初始化imageGCManager
-
初始化containerGCManager
-
初始化podKiller
注册完成后就是启动了,启动也包含在EdgeCore的启动流程中。
初始化volumePluginMgr
e.volumePluginMgr = NewInitializedVolumePluginMgr(e, ProbeVolumePlugins(""))
这个地方具体是先生成了一个插件列表,然后调用VolumePluginMgr#InitPlugins初始化列表中的volumn插件。
启动模块
if err := e.initializeModules(); err != nil {
klog.Errorf("initialize module error: %v", err)
os.Exit(1)
}
initializeModules方法的代码如下,分析请看注释
func (e *edged) initializeModules() error {
// 可观测性的逻辑
if edgedconfig.Config.EnableMetrics {
// Start resource analyzer
e.resourceAnalyzer.Start()
if err := e.cadvisor.Start(); err != nil {
// Fail kubelet and rely on the babysitter to retry starting kubelet.
// TODO(random-liu): Add backoff logic in the babysitter
klog.Exitf("Failed to start cAdvisor %v", err)
}
// trigger on-demand stats collection once so that we have capacity information for ephemeral storage.
// ignore any errors, since if stats collection is not successful, the container manager will fail to start below.
e.Provider.GetCgroupStats("/", true)
}
// 下面的代码的核心是e.containerManager.Start
// Start container manager.
node, err := e.initialNode()
if err != nil {
klog.Errorf("Failed to initialNode %v", err)
return err
}
// If the container logs directory does not exist, create it.
if _, err := os.Stat(ContainerLogsDir); err != nil {
if err := e.os.MkdirAll(ContainerLogsDir, 0755); err != nil {
klog.Errorf("Failed to create directory %q: %v", ContainerLogsDir, err)
}
}
// containerManager must start after cAdvisor because it needs filesystem capacity information
err = e.containerManager.Start(node, e.GetActivePods, edgedutil.NewSourcesReady(e.isInitPodReady), e.statusManager, e.runtimeService)
if err != nil {
klog.Errorf("Failed to start container manager, err: %v", err)
return err
}
return nil
}
初始化volumeManager
e.volumeManager = volumemanager.NewVolumeManager(
true,
types.NodeName(e.nodeName),
e.podManager,
e.statusManager,
e.kubeClient,
e.volumePluginMgr,
e.containerRuntime,
e.mounter,
e.hostUtil,
e.getPodsDir(),
record.NewEventRecorder(),
false,
false,
volumepathhandler.NewBlockVolumePathHandler(),
)
go e.volumeManager.Run(edgedutil.NewSourcesReady(e.isInitPodReady), utilwait.NeverStop)
这个地方调用了kubelet的API进行初始化
创建一个goroutine不断查询node状态
go utilwait.Until(e.syncNodeStatus, e.nodeStatusUpdateFrequency, utilwait.NeverStop)
创建一个goroutine根据channel杀掉对应的pod
go utilwait.Until(e.podKiller.PerformPodKillingWork, 5*time.Second, utilwait.NeverStop)
更新节点label
// update node label
node, _ := e.GetNode()
node.Labels = e.labels
if err := e.metaClient.Nodes(e.namespace).Update(node); err != nil {
klog.Errorf("update node failed, error: %v", err)
}
创建pod工作循环
e.podAddWorkerRun(e.concurrentConsumers)
e.podRemoveWorkerRun(e.concurrentConsumers)
这里会根据concurrentConsumers来生成相应个数的goroutine,然后一直循环,从podAdditionQueue和podDeletionQueue中获取新增或删除的pod的信息,然后执行pod的增加和删除操作。
创建pod同步循环
go e.syncLoopIteration(e.pleg.Watch(), housekeepingTicker.C, syncWorkQueueCh.C)
感觉这个地方的syncLoopIteration太重要了,虽然代码很长,还是贴一下吧,分析请看注释。
func (e *edged) syncLoopIteration(plegCh <-chan *pleg.PodLifecycleEvent, housekeepingCh <-chan time.Time, syncWorkQueueCh <-chan time.Time) {
for {
select {
case update := <-e.livenessManager.Updates():
// 如果发现有pod创建错误了,就将pod的信息重新放入podAdditionQueue
if update.Result == proberesults.Failure {
pod, ok := e.podManager.GetPodByUID(update.PodUID)
...
klog.Infof("Will restart pod [%s]", pod.Name)
key := types.NamespacedName{
Namespace: pod.Namespace,
Name: pod.Name,
}
e.podAdditionQueue.Add(key.String())
}
case plegEvent := <-plegCh:
if pod, ok := e.podManager.GetPodByUID(plegEvent.ID); ok {
// 更新pod状态
if err := e.updatePodStatus(pod); err != nil {
klog.Errorf("update pod %s status error", pod.Name)
break
}
// 如果容器die了
if plegEvent.Type == pleg.ContainerDied {
// 如果restart策略为Never,直接忽略
if pod.Spec.RestartPolicy == v1.RestartPolicyNever {
break
}
var containerCompleted bool
// 如果restart策略为OnFailure,检查是否为启动失败
if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure {
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerStatus.State.Terminated != nil && containerStatus.State.Terminated.ExitCode == 0 {
containerCompleted = true
break
}
}
if containerCompleted {
break
}
}
klog.Infof("sync loop get event container died, restart pod [%s]", pod.Name)
key := types.NamespacedName{
Namespace: pod.Namespace,
Name: pod.Name,
}
// 否则添加至podAdditionQueue
e.podAdditionQueue.Add(key.String())
} else {
klog.Infof("sync loop get event [%s], ignore it now.", plegEvent.Type)
}
} else {
klog.Infof("sync loop ignore event: [%s], with pod [%s] not found", plegEvent.Type, plegEvent.ID)
}
case <-housekeepingCh:
// 清理所有pod
err := e.HandlePodCleanups()
if err != nil {
klog.Errorf("Handle Pod Cleanup Failed: %v", err)
}
case <-syncWorkQueueCh:
// 重新同步pod(ready状态的pod和内部模块需要重新同步)
podsToSync := e.getPodsToSync()
if len(podsToSync) == 0 {
break
}
for _, pod := range podsToSync {
if !e.podIsTerminated(pod) {
key := types.NamespacedName{
Namespace: pod.Namespace,
Name: pod.Name,
}
e.podAdditionQueue.Add(key.String())
}
}
}
}
}
启动垃圾回收
e.imageGCManager.Start()
e.StartGarbageCollection()
其中,StartGarbageCollection主要是创建了两个goroutine,不断地调用CRI的api,每2分钟进行一次containerGC,每5分钟进行一次imageGC
初始化并启动pluginManager
e.pluginManager = pluginmanager.NewPluginManager(
e.getPluginsRegistrationDir(), /* sockDir */
nil,
)
// Adding Registration Callback function for CSI Driver
e.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csiplugin.PluginHandler))
// Start the plugin manager
klog.Infof("starting plugin manager")
go e.pluginManager.Run(edgedutil.NewSourcesReady(e.isInitPodReady), utilwait.NeverStop)
这里并不知道为啥要为CSI Driver设置回调函数,先Mark一下🌟
启动各种东西
// start the CPU manager in the clcm
err := e.clcm.StartCPUManager(e.GetActivePods, edgedutil.NewSourcesReady(e.isInitPodReady), e.statusManager, e.runtimeService)
if err != nil {
klog.Errorf("Failed to start container manager, err: %v", err)
return
}
e.logManager.Start()
e.runtimeClassManager.Start(utilwait.NeverStop)
klog.Infof("starting syncPod")
同步pod
在start的最后有这样一行代码:
e.syncPod()
下面来看一下这个syncPod具体做了什么。
-
睡眠10秒
time.Sleep(10 * time.Second) -
向metamanager发送信息,拿到正在退出的pods
//when starting, send msg to metamanager once to get existing pods info := model.NewMessage("").BuildRouter(e.Name(), e.Group(), e.namespace+"/"+model.ResourceTypePod, model.QueryOperation) beehiveContext.Send(metamanager.MetaManagerModuleName, *info)
接着是个大循环(分析请看注释)
for {
select {
case <-beehiveContext.Done():
// 收到全局的结束信号后,停止循环,结束edged进程
klog.Warning("Sync pod stop")
return
default:
}
// 从context中拿到消息
result, err := beehiveContext.Receive(e.Name())
if err != nil {
klog.Errorf("failed to get pod: %v", err)
continue
}
// 解析消息
_, resType, resID, err := util.ParseResourceEdge(result.GetResource(), result.GetOperation())
if err != nil {
klog.Errorf("failed to parse the Resource: %v", err)
continue
}
// 拿到消息中的操作
op := result.GetOperation()
// 拿到消息中的内容
content, err := result.GetContentData()
if err != nil {
klog.Errorf("get message content data failed: %v", err)
continue
}
klog.Infof("result content is %s", result.Content)
switch resType {
case model.ResourceTypePod:
// 如果是关于pod的信息,就进入这个分支
if op == model.ResponseOperation && resID == "" && result.GetSource() == metamanager.MetaManagerModuleName {
// 如果是来自metaManager的消息,就意味着里面是pod列表,于是利用这个列表添加pod,并刷新pod状态
err := e.handlePodListFromMetaManager(content)
if err != nil {
klog.Errorf("handle podList failed: %v", err)
continue
}
e.setInitPodReady(true)
} else if op == model.ResponseOperation && resID == "" && result.GetSource() == EdgeController {
// 如果是来自EdgeController的消息,也意味着里面是pod列表,于是利用这个列表添加pod,但是不会刷新pod状态
err := e.handlePodListFromEdgeController(content)
if err != nil {
klog.Errorf("handle controllerPodList failed: %v", err)
continue
}
e.setInitPodReady(true)
} else {
// 否则,就执行其他操作,包括了pod简单的增删改
err := e.handlePod(op, content)
if err != nil {
klog.Errorf("handle pod failed: %v", err)
continue
}
}
case model.ResourceTypeConfigmap:
// 如果是关于configmap信息,就进行configmap的crud
if op != model.ResponseOperation {
err := e.handleConfigMap(op, content)
if err != nil {
klog.Errorf("handle configMap failed: %v", err)
}
} else {
klog.V(4).Infof("skip to handle configMap with type response")
continue
}
case model.ResourceTypeSecret:
// 如果是关于secret的信息,就进行secret的crud
if op != model.ResponseOperation {
err := e.handleSecret(op, content)
if err != nil {
klog.Errorf("handle secret failed: %v", err)
}
} else {
klog.V(4).Infof("skip to handle secret with type response")
continue
}
case constants.CSIResourceTypeVolume:
// 如果是volume相关的信息,就进行volume的crud操作(利用CSI接口)
klog.Infof("volume operation type: %s", op)
res, err := e.handleVolume(op, content)
if err != nil {
klog.Errorf("handle volume failed: %v", err)
} else {
resp := result.NewRespByMessage(&result, res)
// 并且把结果封装成消息,塞回context
beehiveContext.SendResp(*resp)
}
default:
klog.Errorf("resType is not pod or configmap or secret or volume: resType is %s", resType)
continue
}
}