目录

ShardingSphere-Proxy启动和分布式治理接入机制初探

目录

start.sh

先看看官方提供的启动脚本start.sh

#!/bin/bash

SERVER_NAME=ShardingSphere-Proxy

# Resolve the deployment root: the parent of the directory that holds this script.
# Paths are quoted so an install location containing spaces still works, and the
# script stops instead of continuing from the wrong directory if a cd fails.
cd "$(dirname "$0")" || exit 1
cd .. || exit 1
DEPLOY_DIR=$(pwd)

# Directory for log output; created on first start.
LOGS_DIR=${DEPLOY_DIR}/logs
if [ ! -d "${LOGS_DIR}" ]; then
    mkdir "${LOGS_DIR}"
fi

STDOUT_FILE=${LOGS_DIR}/stdout.log
# Optional directory for extension jars.
EXT_LIB=${DEPLOY_DIR}/ext-lib

# The trailing /* entries are class path wildcards expanded by the JVM, not by the shell.
CLASS_PATH=.:${DEPLOY_DIR}/lib/*:${EXT_LIB}/*

JAVA_OPTS=" -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true "

JAVA_MEM_OPTS=" -server -Xmx2g -Xms2g -Xmn1g -Xss256k -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:LargePageSizeInBytes=128m -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 "

# Main class handed to the java launcher; program arguments are appended to it later.
MAIN_CLASS=org.apache.shardingsphere.proxy.Bootstrap

# Print the command-line help text and terminate the script successfully.
print_usage() {
    printf '%s\n' \
        "usage: start.sh [port] [config_dir]" \
        "  port: proxy listen port, default is 3307" \
        "  config_dir: proxy config directory, default is conf"
    exit 0
}

if [ "$1" == "-h" ] || [ "$1" == "--help" ] ; then
    print_usage
fi

echo "Starting the $SERVER_NAME ..."

# One argument: the listen port. The default "conf" directory is put on the class path.
if [ $# == 1 ]; then
    MAIN_CLASS=${MAIN_CLASS}" "$1
    echo "The port is $1"
    # Plain shell assignment; the previous Windows-batch form (set ...;%CLASS_PATH%)
    # overwrote $1 and never changed CLASS_PATH.
    CLASS_PATH=${DEPLOY_DIR}/conf:${CLASS_PATH}
fi

# Two arguments: the listen port and the configuration directory (relative to the deployment root).
if [ $# == 2 ]; then
    MAIN_CLASS=${MAIN_CLASS}" "$1" "$2
    echo "The port is $1"
    echo "The configuration path is $DEPLOY_DIR/$2"
    # Put the chosen configuration directory on the class path as well.
    CLASS_PATH=${DEPLOY_DIR}/$2:${CLASS_PATH}
fi

echo "The classpath is ${CLASS_PATH}"

# JAVA_OPTS, JAVA_MEM_OPTS and MAIN_CLASS are intentionally unquoted: each holds several
# space-separated words. CLASS_PATH is quoted so the shell neither splits nor globs it.
nohup java ${JAVA_OPTS} ${JAVA_MEM_OPTS} -classpath "${CLASS_PATH}" ${MAIN_CLASS} >> "${STDOUT_FILE}" 2>&1 &
sleep 1
echo "Please check the STDOUT file: $STDOUT_FILE"

这里指定了启动类org.apache.shardingsphere.proxy.Bootstrap

Bootstrap

		public static void main(final String[] args) throws IOException, SQLException {
        // Parse the command-line arguments; they expose the listen port and the configuration path.
        BootstrapArguments arguments = new BootstrapArguments(args);
        // Load the YAML configuration found under the configuration path (see ProxyConfigurationLoader.load).
        String configurationPath = arguments.getConfigurationPath();
        YamlProxyConfiguration proxyYaml = ProxyConfigurationLoader.load(configurationPath);
        // Pick the initializer that matches the configuration, then run the actual start-up in init().
        BootstrapInitializer initializer = createBootstrapInitializer(proxyYaml);
        initializer.init(proxyYaml, arguments.getPort());
    }
    
		// Selects the BootstrapInitializer: the standard one when the server configuration
    // has no governance section, otherwise the governance-aware one.
    private static BootstrapInitializer createBootstrapInitializer(final YamlProxyConfiguration yamlConfig) {
        if (null == yamlConfig.getServerConfiguration().getGovernance()) {
            return new StandardBootstrapInitializer();
        }
        return new GovernanceBootstrapInitializer();
    }

ProxyConfigurationLoader

load()

		public static YamlProxyConfiguration load(final String path) throws IOException {
        // Load the server configuration file (SERVER_CONFIG_FILE) located under the given path.
        File serverYamlFile = getResourceFile(String.join("/", path, SERVER_CONFIG_FILE));
        YamlProxyServerConfiguration serverConfig = loadServerConfiguration(serverYamlFile);
        // Load every rule configuration found in the configuration directory.
        File configPath = getResourceFile(path);
        Collection<YamlProxyRuleConfiguration> ruleConfigurations = loadRuleConfigurations(configPath);
        // Fail only when BOTH are missing: no rule configuration and no governance section.
        Preconditions.checkState(!(ruleConfigurations.isEmpty() && null == serverConfig.getGovernance()),
                "Can not find any valid rule configurations file in path `%s`.", configPath.getPath());
        // Key the rule configurations by schema name, keeping the first entry on duplicates
        // and preserving encounter order.
        return new YamlProxyConfiguration(serverConfig, ruleConfigurations.stream().collect(Collectors.toMap(
                YamlProxyRuleConfiguration::getSchemaName, config -> config, (first, duplicate) -> first, LinkedHashMap::new)));
    }

getResourceFile()

		private static File getResourceFile(final String path) {
        // Prefer a class path resource; otherwise treat the argument as a plain file-system path.
        URL resource = ProxyConfigurationLoader.class.getResource(path);
        if (null != resource) {
            return new File(resource.getFile());
        }
        return new File(path);
    }

loadServerConfiguration()

    private static YamlProxyServerConfiguration loadServerConfiguration(final File yamlFile) throws IOException {
        // Deserialize the YAML file into a YamlProxyServerConfiguration via YamlEngine.
        YamlProxyServerConfiguration serverConfig = YamlEngine.unmarshal(yamlFile, YamlProxyServerConfiguration.class);
        Preconditions.checkNotNull(serverConfig, "Server configuration file `%s` is invalid.", yamlFile.getName());
        // Reject the file only when it has neither users nor a governance section.
        Preconditions.checkState(!(serverConfig.getUsers().isEmpty() && null == serverConfig.getGovernance()), "Authority configuration is invalid.");
        return serverConfig;
    }

下面关注一下init方法

因为任务需要,这里只关注GovernanceBootstrapInitializer

下面的代码来自AbstractBootstrapInitializer

/**
 * Runs the proxy start-up sequence and finally starts the proxy on the given port.
 *
 * @param yamlConfig YAML proxy configuration loaded at bootstrap
 * @param port port the proxy listens on
 * @throws SQLException declared by this method; which step raises it is not visible in this excerpt
 */
@Override
public final void init(final YamlProxyConfiguration yamlConfig, final int port) throws SQLException {
    // getProxyConfiguration is provided by the concrete initializer (e.g. GovernanceBootstrapInitializer).
    ProxyConfiguration proxyConfig = getProxyConfiguration(yamlConfig);
    // Create the metadata contexts from the proxy configuration, then pass them through the decorate hook.
    MetaDataContexts metaDataContexts = decorateMetaDataContexts(createMetaDataContexts(proxyConfig));
    // Give every MetaDataAwareEventSubscriber the contexts and register it on the event bus.
    for (MetaDataAwareEventSubscriber each : ShardingSphereServiceLoader.getSingletonServiceInstances(MetaDataAwareEventSubscriber.class)) {
        each.setMetaDataContexts(metaDataContexts);
        ShardingSphereEventBus.getInstance().register(each);
    }
    // Read the configured XA transaction manager type and build the transaction contexts with it.
    String xaTransactionMangerType = metaDataContexts.getProps().getValue(ConfigurationPropertyKey.XA_TRANSACTION_MANAGER_TYPE);
    TransactionContexts transactionContexts = decorateTransactionContexts(createTransactionContexts(metaDataContexts), xaTransactionMangerType);
    // Publish both contexts through the ProxyContext singleton before the server starts.
    ProxyContext.getInstance().init(metaDataContexts, transactionContexts);
    setDatabaseServerInfo();
    initScalingWorker(yamlConfig);
    shardingSphereProxy.start(port);
}

下面是上面调用的GovernanceBootstrapInitializer.getProxyConfiguration()

// Facade used for all interaction with the governance module from this initializer.
private final GovernanceFacade governanceFacade = new GovernanceFacade();

/**
 * Initializes the governance facade, brings this instance online and returns the proxy configuration.
 *
 * @param yamlConfig YAML proxy configuration loaded at bootstrap
 * @return proxy configuration produced by loadProxyConfiguration()
 */
@Override
protected ProxyConfiguration getProxyConfiguration(final YamlProxyConfiguration yamlConfig) {
    // Initialize the facade with the governance configuration (swapped from its YAML form)
    // and the schema names, i.e. the keys of the rule configuration map.
  	governanceFacade.init(new GovernanceConfigurationYamlSwapper().swapToObject(yamlConfig.getServerConfiguration().getGovernance()), yamlConfig.getRuleConfigurations().keySet());
    // Key step: this is where the proxy starts talking to the governance module.
  	initConfigurations(yamlConfig);
    return loadProxyConfiguration();
}

下面是GovernanceBootstrapInitializer.initConfigurations方法

/**
 * Brings this proxy instance online through the governance facade.
 *
 * @param yamlConfig YAML proxy configuration loaded at bootstrap
 */
private void initConfigurations(final YamlProxyConfiguration yamlConfig) {
    YamlProxyServerConfiguration serverYaml = yamlConfig.getServerConfiguration();
    Map<String, YamlProxyRuleConfiguration> rulesBySchema = yamlConfig.getRuleConfigurations();
    // No usable local configuration (as decided by isEmptyLocalConfiguration): just go online.
    if (isEmptyLocalConfiguration(serverYaml, rulesBySchema)) {
        governanceFacade.onlineInstance();
        return;
    }
    // Otherwise hand the local data sources, rules, users and props to the facade as well.
    governanceFacade.onlineInstance(getDataSourceConfigurationMap(rulesBySchema), getRuleConfigurations(rulesBySchema),
            YamlUsersConfigurationConverter.convertShardingSphereUser(serverYaml.getUsers()), serverYaml.getProps());
}

划重点:GovernanceFacade.onlineInstance

onlineInstance方法有两个重载

/**
 * Persists the local configuration to the registry center, then brings this instance online.
 *
 * @param dataSourceConfigMap data source configurations keyed by schema name
 * @param schemaRuleMap rule configurations keyed by schema name
 * @param users users to persist as global configuration
 * @param props properties to persist as global configuration
 */
public void onlineInstance(final Map<String, Map<String, DataSourceConfiguration>> dataSourceConfigMap,
                           final Map<String, Collection<RuleConfiguration>> schemaRuleMap, final Collection<ShardingSphereUser> users, final Properties props) {
    // Users are written when isOverwrite is set or when the registry has no users yet (see persistUsers).
    registryCenter.persistGlobalConfiguration(users, props, isOverwrite);
    // Persist each schema's data sources together with the rules registered under the same schema name.
    for (Entry<String, Map<String, DataSourceConfiguration>> entry : dataSourceConfigMap.entrySet()) {
        // Use the entry's own value instead of looking the key up in the map a second time.
        registryCenter.persistConfigurations(entry.getKey(), entry.getValue(), schemaRuleMap.get(entry.getKey()), isOverwrite);
    }
    onlineInstance();
}
/**
 * Brings this instance online without persisting any local configuration.
 * Asks the registry center to persist the instance-online state, the data nodes
 * and the primary nodes, then initializes the listener manager.
 */
public void onlineInstance() {
    registryCenter.persistInstanceOnline();
    registryCenter.persistDataNodes();
    registryCenter.persistPrimaryNodes();
    // Last step: start listening for changes (listenerManager.init()).
    listenerManager.init();
}

从这里开始进入了governance模块

RegistryCenter类

persistGlobalConfiguration方法有两个,onlineInstance首先调用的是下面这个。

/**
 * Persists the global configuration (users and properties).
 *
 * @param users users to persist
 * @param props properties to persist
 * @param isOverwrite passed through to persistUsers and persistProperties
 */
public void persistGlobalConfiguration(final Collection<ShardingSphereUser> users, final Properties props, final boolean isOverwrite) {
    persistUsers(users, isOverwrite);
    persistProperties(props, isOverwrite);
}
/**
 * Writes the users to the repository under the users node.
 * Nothing is written when the collection is empty, or when overwriting is disabled
 * and users already exist.
 */
private void persistUsers(final Collection<ShardingSphereUser> users, final boolean isOverwrite) {
    if (users.isEmpty() || (!isOverwrite && hasUsers())) {
        return;
    }
    // Convert to the YAML user representation, marshal it and persist it.
    repository.persist(node.getUsersNode(), YamlEngine.marshal(YamlUsersConfigurationConverter.convertYamlUserConfigurations(users)));
}

persist是由governance-repository-api模块的GovernanceRepository接口定义的一个方法,ss在governance-repository-provider的ZooKeeper和Etcd这两个实现类中对persist进行了实现。

下面来看看ss对zookeeper的persist的具体实现。

/**
 * Stores the value under the key: updates the node when it already exists,
 * otherwise creates a persistent node (with any missing parents) holding the UTF-8 bytes.
 * Any exception is delegated to CuratorZookeeperExceptionHandler.
 */
@Override
public void persist(final String key, final String value) {
    try {
        if (isExisted(key)) {
            update(key, value);
        } else {
            client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8));
        }
        // CHECKSTYLE:OFF
    } catch (final Exception ex) {
        // CHECKSTYLE:ON
        CuratorZookeeperExceptionHandler.handleException(ex);
    }
}

这个地方对于zk的操作用的是CuratorFramework框架

所以如果我们要在ShardingSphere中与注册中心交互的话,只需要去调用RegistryRepository接口的一系列方法即可。

registryListenerManager.initListeners

前面的onlineInstance方法的最后一步是对监听事件进行初始化,主要是通过registryListenerManager.initListeners方法进行的。

/**
 * Initialize all state changed listeners.
 *
 * <p>Each listener is told which event types it should react to; metaDataListener
 * is started with no type arguments.</p>
 */
public void initListeners() {
    terminalStateChangedListener.watch(Type.UPDATED);
    dataSourceStateChangedListener.watch(Type.UPDATED, Type.DELETED, Type.ADDED);
    lockChangedListener.watch(Type.ADDED, Type.DELETED);
    metaDataListener.watch();
    propertiesChangedListener.watch(Type.UPDATED);
    authenticationChangedListener.watch(Type.UPDATED);
    privilegeNodeChangedListener.watch(Type.UPDATED);
}

下面是PostGovernanceRepositoryEventListener抽象类的两个watch方法。

/**
 * Starts watching every configured watch key for the given event types.
 *
 * @param types event types that should be forwarded
 */
@Override
public final void watch(final Type... types) {
    Collection<Type> acceptedTypes = Arrays.asList(types);
    for (String each : watchKeys) {
        watch(each, acceptedTypes);
    }
}

/**
 * Watches a single key and posts an event to the event bus for every accepted change.
 *
 * @param watchKey key to watch in the governance repository
 * @param types event types that should be forwarded; all others are dropped
 */
private void watch(final String watchKey, final Collection<Type> types) {
    governanceRepository.watch(watchKey, changed -> {
        if (!types.contains(changed.getType())) {
            return;
        }
        // createEvent may yield nothing; only a present event is posted.
        createEvent(changed).ifPresent(ShardingSphereEventBus.getInstance()::post);
    });
}

这里的governanceRepository.watch是由具体的xxxRepository实现的,下面看一看CuratorZookeeperRepository是怎么实现的。

/**
 * Registers the listener for changes under the key, using a CuratorCache.
 * A cache and its listener are set up only the first time a key is watched;
 * when a cache for the path already exists this call does nothing.
 */
@Override
public void watch(final String key, final DataChangedEventListener listener) {
    String path = key + PATH_SEPARATOR;
    if (caches.containsKey(path)) {
        return;
    }
    addCacheData(key);
    // Cache-based event listening.
    CuratorCache cache = caches.get(path);
    cache.listenable().addListener((type, oldData, data) -> {
        // For a deletion the path and payload come from the old node data, otherwise from the new data.
        boolean deleted = CuratorCacheListener.Type.NODE_DELETED == type;
        String eventPath = deleted ? oldData.getPath() : data.getPath();
        byte[] eventDataByte = deleted ? oldData.getData() : data.getData();
        Type changedType = getChangedType(type);
        if (Type.IGNORED == changedType) {
            return;
        }
        // Invoke the callback when the event happens; a missing payload is passed on as null.
        String eventValue = null == eventDataByte ? null : new String(eventDataByte, StandardCharsets.UTF_8);
        listener.onChange(new DataChangedEvent(eventPath, eventValue, changedType));
    });
}
/**
 * Listener for data changed.
 *
 * <p>Has a single abstract method and is implemented with lambdas by callers,
 * so it is declared as a functional interface.</p>
 */
@FunctionalInterface
public interface DataChangedEventListener {
    
    /**
     * Fire when data changed.
     * 
     * @param event data changed event
     */
    void onChange(DataChangedEvent event);
}

而初始化时,对事件的处理如下(上面贴过了)

/**
 * Watches a single key; accepted changes are converted to events and posted to the event bus.
 *
 * @param watchKey key to watch in the governance repository
 * @param types event types that should be forwarded
 */
private void watch(final String watchKey, final Collection<Type> types) {
    // The lambda is the DataChangedEventListener#onChange implementation.
    governanceRepository.watch(watchKey, changed -> {
        boolean accepted = types.contains(changed.getType());
        if (accepted) {
            // Obtain the ShardingSphereEventBus instance and push the event to its subscribers.
            createEvent(changed).ifPresent(ShardingSphereEventBus.getInstance()::post);
        }
    });
}

至此,接入服务治理的proxy的初始化完成。那么,ShardingSphereEventBus是在哪里处理的呢?

proxy对注册中心进行了监听,若有数据发生变化,则包装成相应的event类,然后post进ShardingSphereEventBus。这时,带@Subscribe注解的相应方法会被激活。RegistryCenter类中有很多被@Subscribe标记的方法,但都没有对DataChangedEvent进行处理。