KubeVela源码分析——core启动流程
按照之前的习惯,从启动的地方开始看源码,所以这篇文章将分析kubevela-core的启动流程。
定义变量
var metricsAddr, logFilePath, leaderElectionNamespace string
var enableLeaderElection, logDebug bool
...
var retryPeriod time.Duration
var enableClusterGateway bool
定义flag
flag.BoolVar(&useWebhook, "use-webhook", false, "Enable Admission Webhook")
flag.StringVar(&certDir, "webhook-cert-dir", "/k8s-webhook-server/serving-certs", "Admission webhook cert/key dir.")
...
flag.BoolVar(&controllerArgs.EnableCompatibility, "enable-asi-compatibility", false, "enable compatibility for asi")
standardcontroller.AddOptimizeFlags()
flag.Parse()
// setup logging
klog.InitFlags(nil)
if logDebug {
_ = flag.Set("v", strconv.Itoa(int(commonconfig.LogDebug)))
}
这个地方flag的作用可以参考:https://www.jianshu.com/p/a5c11a590567
开启pprof服务
if pprofAddr != "" {
// Start pprof server if enabled
mux := http.NewServeMux()
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
pprofServer := http.Server{
Addr: pprofAddr,
Handler: mux,
}
klog.InfoS("Starting debug HTTP server", "addr", pprofServer.Addr)
go func() {
go func() {
ctx := context.Background()
<-ctx.Done()
ctx, cancelFunc := context.WithTimeout(context.Background(), 60*time.Minute)
defer cancelFunc()
if err := pprofServer.Shutdown(ctx); err != nil {
klog.Error(err, "Failed to shutdown debug HTTP server")
}
}()
if err := pprofServer.ListenAndServe(); !errors.Is(http.ErrServerClosed, err) {
klog.Error(err, "Failed to start debug HTTP server")
panic(err)
}
}()
}
pprof的作用就是性能分析,具体使用请看:https://www.jianshu.com/p/f4690622930d
输出kubevela的信息
klog.InfoS("KubeVela information", "version", version.VelaVersion, "revision", version.GitRevision)
klog.InfoS("Disable capabilities", "name", disableCaps)
klog.InfoS("Vela-Core init", "definition namespace", oam.SystemDefinitonNamespace)
初始化multicluster
// wrapper the round tripper by multi cluster rewriter
if enableClusterGateway {
if _, err := multicluster.Initialize(restConfig, true); err != nil {
klog.ErrorS(err, "failed to enable multicluster")
os.Exit(1)
}
}
这个地方涉及到了ClusterGateway,我们下一篇文章再深入分析。
生成一个controller-runtime-manager
mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
LeaderElection: enableLeaderElection,
LeaderElectionNamespace: leaderElectionNamespace,
LeaderElectionID: kubevelaName,
Port: webhookPort,
CertDir: certDir,
HealthProbeBindAddress: healthAddr,
LeaderElectionResourceLock: leaderElectionResourceLock,
LeaseDuration: &leaseDuration,
RenewDeadline: &renewDeadline,
RetryPeriod: &retryPeriod,
// SyncPeriod is configured with default value, aka. 10h. First, controller-runtime does not
// recommend use it as a time trigger, instead, it is expected to work for failure tolerance
// of controller-runtime. Additionally, set this value will affect not only application
// controller but also all other controllers like definition controller. Therefore, for
// functionalities like state-keep, they should be invented in other ways.
NewClient: ctrlClient.DefaultNewControllerClient,
})
其中,ctrl#NewManager这个方法的注释写道:
// NewManager returns a new Manager for creating Controllers.
因此,我们猜测这个manager可以用来创建控制器~
健康检查
if err := registerHealthChecks(mgr); err != nil {
klog.ErrorS(err, "Unable to register ready/health checks")
os.Exit(1)
}
我们来看一下registerHealthChecks这个函数
// registerHealthChecks is used to create readiness&liveness probes
func registerHealthChecks(mgr ctrl.Manager) error {
klog.Info("Create readiness/health check")
if err := mgr.AddReadyzCheck("ping", healthz.Ping); err != nil {
return err
}
// TODO: change the health check to be different from readiness check
if err := mgr.AddHealthzCheck("ping", healthz.Ping); err != nil {
return err
}
return nil
}
可以看到,这个地方分别对readiness和health做了ping检查。
这里有个TODO,意思是需要health的检查还需要完善🤔。
检查DisabledCapabilities
if err := utils.CheckDisabledCapabilities(disableCaps); err != nil {
klog.ErrorS(err, "Unable to get enabled capabilities")
os.Exit(1)
}
进入CheckDisabledCapabilities,
// CheckDisabledCapabilities checks whether the disabled capability controllers are valid
func CheckDisabledCapabilities(disableCaps string) error {
switch disableCaps {
case common.DisableNoneCaps:
return nil
case common.DisableAllCaps:
return nil
default:
for _, c := range strings.Split(disableCaps, ",") {
if !allBuiltinCapabilities.Contains(c) {
return fmt.Errorf("%s in disable caps list is not built-in", c)
}
}
return nil
}
}
从上面的代码可以发现,disableCaps的值只能是 ""
或者 "all"
。
至于这个disableCaps是干啥用的🤔,现在从注释能看出这个跟disabled capability controllers有关。不过可以从git的记录看出来,这个是 #687 这个pr加的,我们后面有需要再看吧。
设置ApplyMode
switch strings.ToLower(applyOnceOnly) {
case "", "false", string(oamcontroller.ApplyOnceOnlyOff):
controllerArgs.ApplyMode = oamcontroller.ApplyOnceOnlyOff
klog.Info("ApplyOnceOnly is disabled")
case "true", string(oamcontroller.ApplyOnceOnlyOn):
controllerArgs.ApplyMode = oamcontroller.ApplyOnceOnlyOn
klog.Info("ApplyOnceOnly is enabled, that means workload or trait only apply once if no spec change even they are changed by others")
case string(oamcontroller.ApplyOnceOnlyForce):
controllerArgs.ApplyMode = oamcontroller.ApplyOnceOnlyForce
klog.Info("ApplyOnceOnlyForce is enabled, that means workload or trait only apply once if no spec change even they are changed or deleted by others")
default:
klog.ErrorS(fmt.Errorf("invalid apply-once-only value: %s", applyOnceOnly),
"Unable to setup the vela core controller",
"apply-once-only", "on/off/force, by default it's off")
os.Exit(1)
}
从日志里面可以看出来ApplyMode的作用。
设置DiscoveryMapper
dm, err := discoverymapper.New(mgr.GetConfig())
if err != nil {
klog.ErrorS(err, "Failed to create CRD discovery client")
os.Exit(1)
}
controllerArgs.DiscoveryMapper = dm
那么,Discovery是啥🤔
这玩意儿跟k8s的服务发现有关,DiscoveryClient可以用来拿到k8s里的资源信息,更多可以参考:https://www.jianshu.com/p/d5bc743a48a5
设置PackageDiscover
pd, err := packages.NewPackageDiscover(mgr.GetConfig())
if err != nil {
klog.Error(err, "Failed to create CRD discovery for CUE package client")
if !packages.IsCUEParseErr(err) {
os.Exit(1)
}
}
controllerArgs.PackageDiscover = pd
这个PackageDiscover应该是用来从k8s集群中加载CUE包的。
安装WebHook
if useWebhook {
klog.InfoS("Enable webhook", "server port", strconv.Itoa(webhookPort))
oamwebhook.Register(mgr, controllerArgs)
if err := waitWebhookSecretVolume(certDir, waitSecretTimeout, waitSecretInterval); err != nil {
klog.ErrorS(err, "Unable to get webhook secret")
os.Exit(1)
}
}
安装controller
if err = oamv1alpha2.Setup(mgr, controllerArgs); err != nil {
klog.ErrorS(err, "Unable to setup the oam controller")
os.Exit(1)
}
if err = standardcontroller.Setup(mgr, disableCaps, controllerArgs); err != nil {
klog.ErrorS(err, "Unable to setup the vela core controller")
os.Exit(1)
}
这里controller的安装写的好妙啊,比如下面这段代码:
for _, setup := range []func(ctrl.Manager, controller.Args) error{
application.Setup, traitdefinition.Setup, componentdefinition.Setup, policydefinition.Setup, workflowstepdefinition.Setup,
applicationconfiguration.Setup,
} {
if err := setup(mgr, args); err != nil {
return err
}
}
还能这样@_@!
设置环境变量STORAGE_DRIVER
if driver := os.Getenv(system.StorageDriverEnv); len(driver) == 0 {
// first use system environment,
err := os.Setenv(system.StorageDriverEnv, storageDriver)
if err != nil {
klog.ErrorS(err, "Unable to setup the vela core controller")
os.Exit(1)
}
}
klog.InfoS("Use storage driver", "storageDriver", os.Getenv(system.StorageDriverEnv))
启动controller manager
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
klog.ErrorS(err, "Failed to run manager")
os.Exit(1)
}
总结
KubeVela注释、日志很多,源码看起来很舒服。