目录

ShardingSphere-Agent插件机制梳理

首先fork ss,然后git clone,然后git remote add upstream xxxx,然后git pull upstream master保持本地代码与主仓库一致,然后在项目的根目录下执行mvn clean install,然后添加虚拟机参数-javaagent:xxx,即可正常使用ss支持的可观察性相关插件。

ShardingSphereAgent类是javaagent参数指定的入口,里面写了premain方法,也就是在proxy启动前运行的。

public static void premain(final String arguments, final Instrumentation instrumentation) throws IOException {
    // 1.加载配置文件,默认是在/conf/agent.yaml
    AgentConfiguration agentConfiguration = AgentConfigurationLoader.load();
    // 2.将agent配置放入注册中心
    AgentConfigurationRegistry.INSTANCE.put(agentConfiguration);
    // 3.创建PluginLoader并加载所有插件
    PluginLoader pluginLoader = createPluginLoader();
    // 4.安装AgentBuilder
    setUpAgentBuilder(instrumentation, pluginLoader);
    // 5.安装PluginBootService
    setupPluginBootService(agentConfiguration.getPlugins());
}

大致流程如下图(大图可见:https://img.jooks.cn/img/20210515110232.jpg):

https://img.jooks.cn/img/20210515110232.jpg

1.加载配置文件

这里跟上一篇文章加载配置文件类似:https://www.jooks.cn/article/55

2.将agent配置放入注册中心

这里的AgentConfigurationRegistry使用了单例模式(这种实现第一次见。。。喵啊),里面其实是一个ConcurrentHashMap,然后put把agent配置放入了map。

3.创建PluginLoader并加载所有插件

private static PluginLoader createPluginLoader() throws IOException {
    PluginLoader result = PluginLoader.getInstance();
    result.loadAllPlugins();
    return result;
}

可以发现这里的PluginLoader也是单例模式(但是实现跟AgentConfigurationRegistry不一样)。

且PluginLoader继承自ClassLoader类。

然后重点看一看result.loadAllPlugins();

public void loadAllPlugins() throws IOException {
    // 3.1 从/plugins目录下读取jar文件,包装成File类
    File[] jarFiles = AgentPathBuilder.getPluginPath().listFiles(file -> file.getName().endsWith(".jar"));
    if (null == jarFiles) {
        return;
    }
    // 3.2 创建pointMap
    Map<String, PluginInterceptorPoint> pointMap = Maps.newHashMap();
    // 3.3 将配置文件中指定不加载的插件放入一个集合。
    Set<String> ignoredPluginNames = AgentConfigurationRegistry.INSTANCE.get(AgentConfiguration.class).getIgnoredPluginNames();
    // 3.4 将File逐个包装成PluginJar,然后添加到jars列表中。
    try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
        for (File each : jarFiles) {
            outputStream.reset();
            JarFile jar = new JarFile(each, true);
            jars.add(new PluginJar(jar, each));
            log.info("Loaded jar {}.", each.getName());
        }
    }
    // 3.5 通过SPI机制加载各个PluginDefinitionService,并把PluginInterceptorPoint放入pointMap
    loadPluginDefinitionServices(ignoredPluginNames, pointMap);
    // 这里build出来的map是真正immutable的,往里面put东西会报错
    interceptorPointMap = ImmutableMap.<String, PluginInterceptorPoint>builder().putAll(pointMap).build();
}
  1. 从/plugins目录下获取插件的jar文件,包装成File。当前plugins目录内容如下。

    这个目录是shardingsphere-agent-distribution模块编译后产生的,为什么会在这里出现这些jar包呢?

    因为agent-distribution模块下有一个shardingsphere-agent-binary-distribution.xml文件,这里面把agent-plugin模块下各个子模块编译好的jar包引入到这里。

https://img.jooks.cn/img/20210514130658.png

  1. 创建pointMap

    这里PluginInterceptorPoint类,其内部类Builder可以通过agent机制替换构造方法、普通方法、静态方法。

  2. 将配置文件中指定不加载的插件放入一个集合。

  3. 将File逐个包装成PluginJar,然后添加到jars列表中。

  4. 通过SPI机制加载各个PluginDefinitionService

private void loadPluginDefinitionServices(final Set<String> ignoredPluginNames, final Map<String, PluginInterceptorPoint> pointMap) {
    PluginServiceLoader.newServiceInstances(PluginDefinitionService.class)
            .stream()
            .filter(each -> ignoredPluginNames.isEmpty() || !ignoredPluginNames.contains(each.getType()))
            .forEach(each -> buildPluginInterceptorPointMap(each, pointMap));
}

这里的PluginServiceLoader.newServiceInstances(PluginDefinitionService.class)用到了Java的SPI机制,会去resources/META-INF/services目录下找到接口的全限定名org.apache.shardingsphere.agent.spi.definition.PluginDefinitionService命名的文件,这里以tracing模块的jaeger插件为例,文件内容如下图,里面指出了Service接口的具体实现类。

https://img.jooks.cn/img/20210514161851.png

这样就会把PluginDefinitionService接口的具体实现类JaegerPluginDefinitionService实例化返回。

https://img.jooks.cn/img/20210514162116.png

然后再通过filter方法把配置文件中指定ignore的PluginDefinitionService过滤掉,最后将剩下的服务安装并加入到pointMap中,详细过程如下。

private void buildPluginInterceptorPointMap(final PluginDefinitionService pluginDefinitionService, final Map<String, PluginInterceptorPoint> pointMap) {
    AbstractPluginDefinitionService definitionService = (AbstractPluginDefinitionService) pluginDefinitionService;
    // install详细见下面,返回一个PluginInterceptorPoint集合
    definitionService.install().forEach(each -> {
        String target = each.getClassNameOfTarget();
        // 如果对一个目标类定义了多个interceptor,那就把这个类的points都汇总到一起
        if (pointMap.containsKey(target)) {
            PluginInterceptorPoint pluginInterceptorPoint = pointMap.get(target);
            pluginInterceptorPoint.getConstructorPoints().addAll(each.getConstructorPoints());
            pluginInterceptorPoint.getInstanceMethodPoints().addAll(each.getInstanceMethodPoints());
            pluginInterceptorPoint.getClassStaticMethodPoints().addAll(each.getClassStaticMethodPoints());
        } else {
            pointMap.put(target, each);
        }
    });
}

这里的install方法代码如下,

public final Collection<PluginInterceptorPoint> install() {
    defineInterceptors();
    return interceptorPointMap.values().stream().map(PluginInterceptorPoint.Builder::install).collect(Collectors.toList());
}

return 语句是把interceptorPointMap给转化为一个List。

上面的defineInterceptors()方法则是一个抽象方法,由具体的PluginDefinitionService类实现,下面以JaegerPluginDefinitionService为例。

@Override
public void defineInterceptors() {
    defineInterceptor(COMMAND_EXECUTOR_TASK_ENHANCE_CLASS)
            .aroundInstanceMethod(ElementMatchers.named(COMMAND_EXECUTOR_METHOD_NAME))
            .implement(COMMAND_EXECUTOR_TASK_ADVICE_CLASS)
            .build();
    defineInterceptor(SQL_PARSER_ENGINE_ENHANCE_CLASS)
            .aroundInstanceMethod(ElementMatchers.named(SQL_PARSER_ENGINE_METHOD_NAME))
            .implement(SQL_PARSER_ENGINE_ADVICE_CLASS)
            .build();
    defineInterceptor(JDBC_EXECUTOR_CALLBACK_ENGINE_ENHANCE_CLASS)
            .aroundInstanceMethod(
                    ElementMatchers.named(JDBC_EXECUTOR_METHOD_NAME)
                            .and(ElementMatchers.takesArgument(0, ElementMatchers.named(JDBC_EXECUTOR_UNIT_ENGINE_ENHANCE_CLASS)))
            )
            .implement(JDBC_EXECUTOR_CALLBACK_ADVICE_CLASS)
            .build();
}

这个地方给三个原始方法添加了环绕处理(对方法执行前后进行了具体操作,这里还不懂Jaeger的API,暂时放一放),并把目标类名加入到了interceptorPointMap。

到了这里,所有应该加载的插件的PluginInterceptorPoint都放进了pluginLoader实例的interceptorPointMap中。

4.setUpAgentBuilder

private static void setUpAgentBuilder(final Instrumentation instrumentation, final PluginLoader pluginLoader) {
    AgentBuilder agentBuilder = new AgentBuilder.Default().with(new ByteBuddy().with(TypeValidation.ENABLED))
            .ignore(ElementMatchers.isSynthetic()).or(ElementMatchers.nameStartsWith("org.apache.shardingsphere.agent."));
    agentBuilder.type(pluginLoader.typeMatcher())
            .transform(new ShardingSphereTransformer(pluginLoader)).with(AgentBuilder.RedefinitionStrategy.RETRANSFORMATION).with(new LoggingListener()).installOn(instrumentation);
}

这里就是先创建一个忽略 Synthetic以org.apache.shardingsphere.agent.开头的类 的AgentBuilder实例。

然后把agentBuilder实例作用到之前pluginLoader实例里的interceptorPointMap中包含的类名,然后绑定Transform实例,选择RETRANSFORMATION策略,绑定监听器,绑定instrumentation。

可以发现这里最重要的是new ShardingSphereTransformer(pluginLoader),等下再看。

5.setupPluginBootService

private static void setupPluginBootService(final Map<String, PluginConfiguration> pluginConfigurationMap) {
    // 1. 启动所有插件的service
    PluginBootServiceManager.startAllServices(pluginConfigurationMap);
    // 2. 添加jvm关闭钩子,让jvm在关闭前先把service正常关闭
    Runtime.getRuntime().addShutdownHook(new Thread(PluginBootServiceManager::closeAllServices));
}
  1. 先是启动所有插件的service,代码如下,

    public static void startAllServices(final Map<String, PluginConfiguration> pluginConfigurationMap) {
        Set<String> ignoredPluginNames = AgentConfigurationRegistry.INSTANCE.get(AgentConfiguration.class).getIgnoredPluginNames();
        for (Map.Entry<String, PluginConfiguration> entry: pluginConfigurationMap.entrySet()) {
            AgentTypedSPIRegistry.getRegisteredServiceOptional(PluginBootService.class, entry.getKey()).ifPresent(pluginBootService -> {
                try {
                    if (!ignoredPluginNames.isEmpty() && ignoredPluginNames.contains(pluginBootService.getType())) {
                        return;
                    }
                    pluginBootService.start(entry.getValue());
                    // CHECKSTYLE:OFF
                } catch (final Throwable ex) {
                    // CHECKSTYLE:ON
                    log.error("Failed to start service.", ex);
                }
            });
        }
    }
    

    这里会调用没有被ignore的插件的pluginBootService的start方法。

  2. 添加jvm关闭钩子,让jvm在关闭前先把service正常关闭,代码如下,

    public static void closeAllServices() {
        AgentTypedSPIRegistry.getAllRegisteredService(PluginBootService.class).forEach(each -> {
            try {
                each.close();
                // CHECKSTYLE:OFF
            } catch (final Throwable ex) {
                // CHECKSTYLE:ON
                log.error("Failed to close service.", ex);
            }
        });
    }
    

1和2中的start和close都由具体的XxPluginBootService实现。

ShardingSphereTransformer

在setUpAgentBuilder方法里面设置了agent的transformer和listener,这里来详细看看。

ShardingSphereTransformer

@Override
public Builder<?> transform(final Builder<?> builder, final TypeDescription typeDescription, final ClassLoader classLoader, final JavaModule module) {
    if (pluginLoader.containsType(typeDescription)) {
        Builder<?> result = builder;
        result = result.defineField(EXTRA_DATA, Object.class, Opcodes.ACC_PRIVATE | Opcodes.ACC_VOLATILE)
                .implement(AdviceTargetObject.class)
                .intercept(FieldAccessor.ofField(EXTRA_DATA));
        PluginInterceptorPoint pluginInterceptorPoint = pluginLoader.loadPluginInterceptorPoint(typeDescription);
        result = interceptorConstructorPoint(typeDescription, pluginInterceptorPoint.getConstructorPoints(), result);
        result = interceptorClassStaticMethodPoint(typeDescription, pluginInterceptorPoint.getClassStaticMethodPoints(), result);
        result = interceptorInstanceMethodPoint(typeDescription, pluginInterceptorPoint.getInstanceMethodPoints(), result);
        return result;
    }
    return builder;
}

这里就是设置拦截点。