KubeEdge源码分析之CloudHub
目录
注册
-
初始化配置
hubconfig.InitConfigure(hub)
里面包含了读取认证证书。
-
初始化cloudHub对象
newCloudHub(hub.Enable)
newCloudHub方法中,声明了使用的informer,初始化了内置的消息队列,并封装进cloudHub对象中。
func newCloudHub(enable bool) *cloudHub { crdFactory := informers.GetInformersManager().GetCRDInformerFactory() // declare used informer clusterObjectSyncInformer := crdFactory.Reliablesyncs().V1alpha1().ClusterObjectSyncs() objectSyncInformer := crdFactory.Reliablesyncs().V1alpha1().ObjectSyncs() messageq := channelq.NewChannelMessageQueue(objectSyncInformer.Lister(), clusterObjectSyncInformer.Lister()) ch := &cloudHub{ enable: enable, messageq: messageq, } ch.informersSyncedFuncs = append(ch.informersSyncedFuncs, clusterObjectSyncInformer.Informer().HasSynced) ch.informersSyncedFuncs = append(ch.informersSyncedFuncs, objectSyncInformer.Informer().HasSynced) return ch }
接下来就是启动cloudHub了
等待同步完成
if !cache.WaitForCacheSync(beehiveContext.Done(), a.informersSyncedFuncs...) {
klog.Errorf("unable to sync caches for objectSyncController")
os.Exit(1)
}
分发消息到边端
// start dispatch message from the cloud to edge node
go a.messageq.DispatchMessage()
DispatchMessage函数内部是个大for循环。
然后判断是否结束。
select {
case <-beehiveContext.Done():
klog.Warning("Cloudhub channel eventqueue dispatch message loop stopped")
return
default:
}
从beehiveContext中拿消息,然后再发出去。
msg, err := beehiveContext.Receive(model.SrcCloudHub)
klog.V(4).Infof("[cloudhub] dispatchMessage to edge: %+v", msg)
if err != nil {
klog.Info("receive not Message format message")
continue
}
nodeID, err := GetNodeID(&msg)
if nodeID == "" || err != nil {
klog.Warning("node id is not found in the message")
continue
}
if isListResource(&msg) {
q.addListMessageToQueue(nodeID, &msg)
} else {
q.addMessageToQueue(nodeID, &msg)
}
那么,为什么调用addListMessageToQueue或者addMessageToQueue就会下发消息到边端呢🤔(请看下面的 启动cloudHub
)
下面,我们详细剖析一下addMessageToQueue。
func (q *ChannelMessageQueue) addMessageToQueue(nodeID string, msg *beehiveModel.Message) {
if msg.GetResourceVersion() == "" && !isDeleteMessage(msg) {
return
}
// 拿到nodeID对应的queue,从queuePool中取,如果没有就新建
nodeQueue := q.GetNodeQueue(nodeID)
// 拿到nodeID对应的store,从storePool中取,如果没有就新建
nodeStore := q.GetNodeStore(nodeID)
messageKey, err := getMsgKey(msg)
if err != nil {
klog.Errorf("fail to get message key for message: %s", msg.Header.ID)
return
}
// 如果是删除或回应操作,不进入if
if !isDeleteMessage(msg) && msg.GetOperation() != beehiveModel.ResponseOperation {
item, exist, _ := nodeStore.GetByKey(messageKey)
// 如果nodeStore中不存在这个messageKey,则将它跟数据库中的版本进行对比,如果数据库中的版本>=消息的版本,就忽略,否则下发消息。
if !exist {
resourceNamespace, _ := edgemessagelayer.GetNamespace(*msg)
resourceUID, err := GetMessageUID(*msg)
if err != nil {
klog.Errorf("fail to get message UID for message: %s", msg.Header.ID)
return
}
objectSync, err := q.objectSyncLister.ObjectSyncs(resourceNamespace).Get(synccontroller.BuildObjectSyncName(nodeID, resourceUID))
if err == nil && objectSync.Status.ObjectResourceVersion != "" && synccontroller.CompareResourceVersion(msg.GetResourceVersion(), objectSync.Status.ObjectResourceVersion) <= 0 {
return
}
}
// 如果存在,就比较一下版本,判断是否需要下发。
if exist {
msgInStore := item.(*beehiveModel.Message)
if isDeleteMessage(msgInStore) || synccontroller.CompareResourceVersion(msg.GetResourceVersion(), msgInStore.GetResourceVersion()) <= 0 {
return
}
}
}
if err := nodeStore.Add(msg); err != nil {
klog.Errorf("fail to add message %v nodeStore, err: %v", msg, err)
return
}
nodeQueue.Add(messageKey)
}
准备证书
// check whether the certificates exist in the local directory,
// and then check whether certificates exist in the secret, generate if they don't exist
if err := httpserver.PrepareAllCerts(); err != nil {
klog.Exit(err)
}
// TODO: Will improve in the future
DoneTLSTunnelCerts <- true
close(DoneTLSTunnelCerts)
检查ca是否存在,如果不存在则创建,然后生成token
// generate Token
if err := httpserver.GenerateToken(); err != nil {
klog.Exit(err)
}
启动HTTP服务器
// HttpServer mainly used to issue certificates for the edge
go httpserver.StartHTTPServer()
这里http服务器主要用来解决边缘ca验证的问题。
启动cloudHub
servers.StartCloudHub(a.messageq)
这里的StartCloudHub非常关键。
// StartCloudHub starts the cloud hub service
func StartCloudHub(messageq *channelq.ChannelMessageQueue) {
handler.InitHandler(messageq)
// start websocket server
if hubconfig.Config.WebSocket.Enable {
go startWebsocketServer()
}
// start quic server
if hubconfig.Config.Quic.Enable {
go startQuicServer()
}
}
首先是初始化了消息队列的handler。
// InitHandler create a handler for websocket and quic servers
func InitHandler(eventq *channelq.ChannelMessageQueue) {
once.Do(func() {
CloudhubHandler = &MessageHandle{
KeepaliveInterval: int(hubconfig.Config.KeepaliveInterval),
WriteTimeout: int(hubconfig.Config.WriteTimeout),
MessageQueue: eventq,
NodeLimit: hubconfig.Config.NodeLimit,
crdClient: client.GetCRDClient(),
}
CloudhubHandler.Handlers = []HandleFunc{
CloudhubHandler.KeepaliveCheckLoop,
CloudhubHandler.MessageWriteLoop,
CloudhubHandler.ListMessageWriteLoop,
}
CloudhubHandler.initServerEntries()
})
}
这里创建了三个loop,分别是
-
KeepaliveCheckLoop
用于检查边缘节点是否存活
// KeepaliveCheckLoop checks whether the edge node is still alive func (mh *MessageHandle) KeepaliveCheckLoop(info *model.HubInfo, stopServe chan ExitCode) { keepaliveTicker := time.NewTimer(time.Duration(mh.KeepaliveInterval) * time.Second) nodeKeepaliveChannel, ok := mh.KeepaliveChannel.Load(info.NodeID) if !ok { klog.Errorf("fail to load node %s", info.NodeID) return } for { select { // 如果收到keepalive信号,就进入第一个分支,然后重置计时器;否则,如果在一个keepalive时间段中没有收到,就会进入第二个分支,被任务节点已经挂了。 case _, ok := <-nodeKeepaliveChannel.(chan struct{}): if !ok { klog.Warningf("Stop keepalive check for node: %s", info.NodeID) return } // Reset is called after Stop or expired timer if !keepaliveTicker.Stop() { select { case <-keepaliveTicker.C: default: } } klog.V(4).Infof("Node %s is still alive", info.NodeID) keepaliveTicker.Reset(time.Duration(mh.KeepaliveInterval) * time.Second) case <-keepaliveTicker.C: if conn, ok := mh.nodeConns.Load(info.NodeID); ok { klog.Warningf("Timeout to receive heart beat from edge node %s for project %s", info.NodeID, info.ProjectID) if err := conn.(hubio.CloudHubIO).Close(); err != nil { klog.Errorf("failed to close connection %v, err is %v", conn, err) } mh.nodeConns.Delete(info.NodeID) } } } }
-
MessageWriteLoop
用于发消息,loop会先从nodeQueue里面拿到要发送的消息的key,然后根据key从nodeStore里面拿到具体的消息,然后发出去。
这样一来,其他地方只要把消息放入nodeQueue和nodeStore中即可,且是异步的。
-
ListMessageWriteLoop
MessageWriteLoop的批量下发版
然后开启了消息应答服务
CloudhubHandler.initServerEntries()
// initServerEntries register handler func
func (mh *MessageHandle) initServerEntries() {
mux.Entry(mux.NewPattern("*").Op("*"), mh.HandleServer)
}
这里会注册一个handler,用于响应所有url
其中包含了
- keepalive消息,收到后将其放入nodeKeepalive channel中,交给前面的loop
- volume消息,收到后将其塞进beehiveContext,交给其他模块处理
- 。。。
最后,开启websocket或者quic服务
// start websocket server
if hubconfig.Config.WebSocket.Enable {
go startWebsocketServer()
}
// start quic server
if hubconfig.Config.Quic.Enable {
go startQuicServer()
}