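Use case
When implementing a RocketMQ consumer, you typically use the @RocketMQMessageListener annotation to define the consumer group, the topic, and the selectorExpression (the rule used to filter and select messages). To support dynamic filtering, these values are usually written as property placeholders and then switched at runtime through Apollo or Spring Cloud Config.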
Adding the dependency

    <!-- RocketMQ Spring Boot Starter -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.4</version>
    </dependency>

Consumer code
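
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;

    @Component // the listener must be a Spring bean to be discovered
    @RocketMQMessageListener(consumerGroup = "${rocketmq.group}", topic = "${rocketmq.topic}",
            selectorExpression = "${rocketmq.selectorExpression}")
    public class Consumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String s) {
            System.out.println("Received message: " + s);
        }
    }
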
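Troubleshooting
The selectorExpression attribute of @RocketMQMessageListener defaults to *, which means all messages under the current topic are received. If we want to configure tags dynamically and use the ${rocketmq.selectorExpression} placeholder, we find that every message gets filtered out. Tracing the source (ListenerContainerConfiguration.java) shows that, when the listener container is created, the selectorExpression value is first resolved from the environment and then overwritten, so the filter condition ends up being the literal placeholder string rather than the configured tags.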

    @Override
    public void afterSingletonsInstantiated() {
        // Collect all beans annotated with @RocketMQMessageListener
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
        if (Objects.nonNull(beans)) {
            // Register a container for each of them
            beans.forEach(this::registerContainer);
        }
    }

    private void registerContainer(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
        // Check that the bean implements the RocketMQListener interface
        if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
        }
        // Read the annotation declared on the bean
        RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
        // Resolve group and topic; placeholders are supported here
        String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
        String topic = this.environment.resolvePlaceholders(annotation.topic());
        boolean listenerEnabled =
            (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                .getOrDefault(topic, true);
        if (!listenerEnabled) {
            log.debug(
                "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
                consumerGroup, topic);
            return;
        }
        validate(annotation);
        String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
            counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
        // Register the container bean; the supplier calls createRocketMQListenerContainer
        genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
        DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
            DefaultRocketMQListenerContainer.class);
        if (!container.isRunning()) {
            try {
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }
        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName,
            containerBeanName);
    }

    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
        RocketMQMessageListener annotation) {
        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
        container.setRocketMQMessageListener(annotation);
        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
        String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
        container.setNameServer(nameServer);
        if (!StringUtils.isEmpty(accessChannel)) {
            container.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }
        container.setTopic(environment.resolvePlaceholders(annotation.topic()));
        // At this point the placeholder has already been resolved to the configured value
        String tags = environment.resolvePlaceholders(annotation.selectorExpression());
        if (!StringUtils.isEmpty(tags)) {
            container.setSelectorExpression(tags);
        }
        container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
        // This call overwrites selectorExpression with the raw, unresolved placeholder
        container.setRocketMQMessageListener(annotation);
        container.setRocketMQListener((RocketMQListener) bean);
        container.setObjectMapper(objectMapper);
        container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
        container.setName(name); // REVIEW ME, use the same clientId or multiple?
        return container;
    }

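The overwrite described above can be reproduced in isolation. The following is a minimal sketch, not the library's code: `Container`, `setListenerAnnotation`, and the regex-based `resolve` helper are hypothetical stand-ins for DefaultRocketMQListenerContainer, setRocketMQMessageListener, and Environment.resolvePlaceholders, and the property value `tagA || tagB` is made up for illustration. It only assumes, as the article reports, that the annotation setter copies the raw selectorExpression into the container.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Simplified model of the call order; every name in this class is hypothetical. */
final class SelectorOverwriteDemo {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Toy stand-in for Environment.resolvePlaceholders: unknown keys are left unchanged. */
    static String resolve(String text, Map<String, String> props) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            String value = props.getOrDefault(m.group(1), m.group(0));
            m.appendReplacement(sb, Matcher.quoteReplacement(value));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    /** Mimics the container: setting the annotation also copies its raw selector. */
    static final class Container {
        String selectorExpression;

        void setSelectorExpression(String resolved) {
            this.selectorExpression = resolved;
        }

        void setListenerAnnotation(String rawSelectorFromAnnotation) {
            this.selectorExpression = rawSelectorFromAnnotation;
        }
    }

    /** Same order as in the quoted source: resolve first, then set the annotation again. */
    static String buggyOrder(String raw, Map<String, String> props) {
        Container c = new Container();
        c.setSelectorExpression(resolve(raw, props));
        c.setListenerAnnotation(raw); // the resolved value is lost here
        return c.selectorExpression;
    }

    /** One possible ordering that keeps the resolved value (an assumption, not the upstream fix). */
    static String resolvedLast(String raw, Map<String, String> props) {
        Container c = new Container();
        c.setListenerAnnotation(raw);
        c.setSelectorExpression(resolve(raw, props));
        return c.selectorExpression;
    }

    public static void main(String[] args) {
        Map<String, String> props = Collections.singletonMap("rocketmq.selectorExpression", "tagA || tagB");
        String raw = "${rocketmq.selectorExpression}";
        System.out.println(buggyOrder(raw, props));   // prints ${rocketmq.selectorExpression}
        System.out.println(resolvedLast(raw, props)); // prints tagA || tagB
    }
}
```

Since no message carries a tag literally named `${rocketmq.selectorExpression}`, a consumer subscribed with the unresolved string receives nothing, which matches the symptom described above.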
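Solution
ListenerContainerConfiguration implements the afterSingletonsInstantiated method of the SmartInitializingSingleton interface, which runs only after all singleton beans have been instantiated. We can therefore use reflection to resolve the selectorExpression value and write it back into the annotation before ListenerContainerConfiguration performs its initialization.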
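
    import java.lang.reflect.Field;
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Proxy;
    import java.util.Map;

    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.aop.framework.AopProxyUtils;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.env.StandardEnvironment;

    /**
     * After Spring Boot has initialized and before the RocketMQ containers are
     * initialized, use reflection to rewrite the annotation values in place.
     */
    @Configuration
    public class ChangeSelectorExpressionBeforeMQInit implements InitializingBean {

        @Autowired
        private ApplicationContext applicationContext;

        @Autowired
        private StandardEnvironment environment;

        @Override
        @SuppressWarnings("unchecked")
        public void afterPropertiesSet() throws Exception {
            Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
            for (Object bean : beans.values()) {
                Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
                if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
                    continue;
                }
                RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
                // The annotation instance is a JDK proxy; its values live in the handler's "memberValues" map.
                // On JDK 16+ this needs --add-opens java.base/sun.reflect.annotation=ALL-UNNAMED.
                InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation);
                Field field = invocationHandler.getClass().getDeclaredField("memberValues");
                field.setAccessible(true);
                Map<String, Object> memberValues = (Map<String, Object>) field.get(invocationHandler);
                for (Map.Entry<String, Object> entry : memberValues.entrySet()) {
                    // Resolve String members only; converting enum or int members to String would break them
                    if (entry.getValue() instanceof String) {
                        entry.setValue(environment.resolvePlaceholders((String) entry.getValue()));
                    }
                }
            }
        }
    }
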
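When rewriting annotation values this way, it seems safer to touch only the String members, because the annotation also has enum and numeric members whose types must be preserved. The sketch below illustrates that rule on a plain map; `MemberValueResolverDemo`, `resolveStringMembers`, and the lambda resolver are hypothetical names used only for illustration, and the map merely imitates the shape of the annotation's member values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Illustrative helper; all names here are hypothetical. */
final class MemberValueResolverDemo {

    /** Applies the resolver to String-valued entries and leaves every other type untouched. */
    static void resolveStringMembers(Map<String, Object> memberValues, UnaryOperator<String> resolver) {
        for (Map.Entry<String, Object> entry : memberValues.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof String) {
                // setValue replaces the value in place, so iteration stays valid
                entry.setValue(resolver.apply((String) value));
            }
        }
    }

    public static void main(String[] args) {
        // Imitates a few annotation members: one placeholder String and one int
        Map<String, Object> memberValues = new LinkedHashMap<>();
        memberValues.put("selectorExpression", "${rocketmq.selectorExpression}");
        memberValues.put("consumeThreadMax", 64);

        // Stand-in for environment::resolvePlaceholders with a made-up value
        UnaryOperator<String> resolver = s -> s.replace("${rocketmq.selectorExpression}", "tagA");

        resolveStringMembers(memberValues, resolver);
        System.out.println(memberValues); // prints {selectorExpression=tagA, consumeThreadMax=64}
    }
}
```

In the Spring configuration class, the resolver argument would correspond to `environment::resolvePlaceholders`.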
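In addition, this bug has been fixed in version 2.1.0 of the dependency. As long as it does not cause dependency conflicts, using version 2.1.0 or later is recommended.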
Original article: https://blog.csdn.net/xiaojun081004/article/details/104954802