聊聊rocketmq的ExtProducerResetConfiguration

编程

本文主要研究一下rocketmq的ExtProducerResetConfiguration

ExtProducerResetConfiguration

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java

@Configuration

public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton {

private final static Logger log = LoggerFactory.getLogger(ExtProducerResetConfiguration.class);

private ConfigurableApplicationContext applicationContext;

private StandardEnvironment environment;

private RocketMQProperties rocketMQProperties;

private ObjectMapper objectMapper;

public ExtProducerResetConfiguration(ObjectMapper rocketMQMessageObjectMapper,

StandardEnvironment environment,

RocketMQProperties rocketMQProperties) {

this.objectMapper = rocketMQMessageObjectMapper;

this.environment = environment;

this.rocketMQProperties = rocketMQProperties;

}

@Override

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

this.applicationContext = (ConfigurableApplicationContext) applicationContext;

}

@Override

public void afterSingletonsInstantiated() {

Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class);

if (Objects.nonNull(beans)) {

beans.forEach(this::registerTemplate);

}

}

private void registerTemplate(String beanName, Object bean) {

Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

if (!RocketMQTemplate.class.isAssignableFrom(bean.getClass())) {

throw new IllegalStateException(clazz + " is not instance of " + RocketMQTemplate.class.getName());

}

ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);

GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;

validate(annotation, genericApplicationContext);

DefaultMQProducer mqProducer = createProducer(annotation);

// Set instanceName same as the beanName

mqProducer.setInstanceName(beanName);

try {

mqProducer.start();

} catch (MQClientException e) {

throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}",

beanName), e);

}

RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;

rocketMQTemplate.setProducer(mqProducer);

rocketMQTemplate.setObjectMapper(objectMapper);

log.info("Set real producer to :{} {}", beanName, annotation.value());

}

private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annotation) {

DefaultMQProducer producer = null;

RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();

if (producerConfig == null) {

producerConfig = new RocketMQProperties.Producer();

}

String nameServer = environment.resolvePlaceholders(annotation.nameServer());

String groupName = environment.resolvePlaceholders(annotation.group());

groupName = StringUtils.isEmpty(groupName) ? producerConfig.getGroup() : groupName;

String ak = environment.resolvePlaceholders(annotation.accessKey());

ak = StringUtils.isEmpty(ak) ? producerConfig.getAccessKey() : annotation.accessKey();

String sk = environment.resolvePlaceholders(annotation.secretKey());

sk = StringUtils.isEmpty(sk) ? producerConfig.getSecretKey() : annotation.secretKey();

String customizedTraceTopic = environment.resolvePlaceholders(annotation.customizedTraceTopic());

customizedTraceTopic = StringUtils.isEmpty(customizedTraceTopic) ? producerConfig.getCustomizedTraceTopic() : customizedTraceTopic;

if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {

producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),

annotation.enableMsgTrace(), customizedTraceTopic);

producer.setVipChannelEnabled(false);

} else {

producer = new DefaultMQProducer(groupName, annotation.enableMsgTrace(), customizedTraceTopic);

}

producer.setNamesrvAddr(nameServer);

producer.setSendMsgTimeout(annotation.sendMessageTimeout() == -1 ? producerConfig.getSendMessageTimeout() : annotation.sendMessageTimeout());

producer.setRetryTimesWhenSendFailed(annotation.retryTimesWhenSendAsyncFailed() == -1 ? producerConfig.getRetryTimesWhenSendFailed() : annotation.retryTimesWhenSendAsyncFailed());

producer.setRetryTimesWhenSendAsyncFailed(annotation.retryTimesWhenSendAsyncFailed() == -1 ? producerConfig.getRetryTimesWhenSendAsyncFailed() : annotation.retryTimesWhenSendAsyncFailed());

producer.setMaxMessageSize(annotation.maxMessageSize() == -1 ? producerConfig.getMaxMessageSize() : annotation.maxMessageSize());

producer.setCompressMsgBodyOverHowmuch(annotation.compressMessageBodyThreshold() == -1 ? producerConfig.getCompressMessageBodyThreshold() : annotation.compressMessageBodyThreshold());

producer.setRetryAnotherBrokerWhenNotStoreOK(annotation.retryNextServer());

return producer;

}

private void validate(ExtRocketMQTemplateConfiguration annotation, GenericApplicationContext genericApplicationContext) {

if (genericApplicationContext.isBeanNameInUse(annotation.value())) {

throw new BeanDefinitionValidationException(String.format("Bean {} has been used in Spring Application Context, " +

"please check the @ExtRocketMQTemplateConfiguration",

annotation.value()));

}

if (rocketMQProperties.getNameServer() == null ||

rocketMQProperties.getNameServer().equals(environment.resolvePlaceholders(annotation.nameServer()))) {

throw new BeanDefinitionValidationException(

"Bad annotation definition in @ExtRocketMQTemplateConfiguration, nameServer property is same with " +

"global property, please use the default RocketMQTemplate!");

}

}

}

  • ExtProducerResetConfiguration实现了ApplicationContextAware, SmartInitializingSingleton接口;其afterSingletonsInstantiated方法会取出标记有ExtRocketMQTemplateConfiguration的beans,然后挨个执行registerTemplate方法
  • registerTemplate首先判断RocketMQTemplate.class是否是该bean的超类,是的话则读取ExtRocketMQTemplateConfiguration注解,校验注解指定的NameServer地址是否与全局的相同,相同则抛出BeanDefinitionValidationException异常
  • 最后通过createProducer方法创建DefaultMQProducer,然后执行其start方法,然后复制给该bean,并设置objectMapper

ExtRocketMQTemplateConfiguration

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java

@Target(ElementType.TYPE)

@Retention(RetentionPolicy.RUNTIME)

@Documented

@Component

public @interface ExtRocketMQTemplateConfiguration {

/**

* The component name of the Producer configuration.

*/

String value() default "";

/**

* The property of "name-server".

*/

String nameServer();

/**

* Name of producer.

*/

String group() default "${rocketmq.producer.group:}";

/**

* Millis of send message timeout.

*/

int sendMessageTimeout() default -1;

/**

* Compress message body threshold, namely, message body larger than 4k will be compressed on default.

*/

int compressMessageBodyThreshold() default -1;

/**

* Maximum number of retry to perform internally before claiming sending failure in synchronous mode.

* This may potentially cause message duplication which is up to application developers to resolve.

*/

int retryTimesWhenSendFailed() default -1;

/**

* <p> Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>

* This may potentially cause message duplication which is up to application developers to resolve.

*/

int retryTimesWhenSendAsyncFailed() default -1;

/**

* Indicate whether to retry another broker on sending failure internally.

*/

boolean retryNextServer() default false;

/**

* Maximum allowed message size in bytes.

*/

int maxMessageSize() default -1;

/**

* The property of "access-key".

*/

String accessKey() default "${rocketmq.producer.accessKey:}";

/**

* The property of "secret-key".

*/

String secretKey() default "${rocketmq.producer.secretKey:}";

/**

* Switch flag instance for message trace.

*/

boolean enableMsgTrace() default true;

/**

* The name value of message trace topic.If you don"t config,you can use the default trace topic name.

*/

String customizedTraceTopic() default "${rocketmq.producer.customized-trace-topic:}";

}

  • ExtRocketMQTemplateConfiguration注解定义了value、nameServer、group、sendMessageTimeout、compressMessageBodyThreshold、retryTimesWhenSendFailed、retryTimesWhenSendAsyncFailed、retryNextServer、maxMessageSize、accessKey、secretKey、enableMsgTrace、customizedTraceTopic属性

ExtRocketMQTemplate

import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;

import org.apache.rocketmq.spring.core.RocketMQTemplate;

@ExtRocketMQTemplateConfiguration(nameServer = "${demo.rocketmq.extNameServer}")

public class ExtRocketMQTemplate extends RocketMQTemplate {

}

  • 这里定义了ExtRocketMQTemplate类,它继承了RocketMQTemplate,同时使用了ExtRocketMQTemplateConfiguration注解

小结

  • ExtProducerResetConfiguration实现了ApplicationContextAware, SmartInitializingSingleton接口;其afterSingletonsInstantiated方法会取出标记有ExtRocketMQTemplateConfiguration的beans,然后挨个执行registerTemplate方法
  • registerTemplate首先判断RocketMQTemplate.class是否是该bean的超类,是的话则读取ExtRocketMQTemplateConfiguration注解,校验注解指定的NameServer地址是否与全局的相同,相同则抛出BeanDefinitionValidationException异常
  • 最后通过createProducer方法创建DefaultMQProducer,然后执行其start方法,然后复制给该bean,并设置objectMapper

doc

  • ExtProducerResetConfiguration
  • ExtRocketMQTemplateConfiguration
  • ExtRocketMQTemplate

以上是 聊聊rocketmq的ExtProducerResetConfiguration 的全部内容, 来源链接: utcz.com/z/510288.html

回到顶部