首先贴出Apache rocketmq-springboot-starter源码地址

Apache/rocketmq-springboot-starter

配置

问题

在消费Object消息的时候,会出现序列化问题。

伪代码如下:

@Component
@RocketMqMessageListener(topic = "demo", selectorExpression = "obj", consumerGroup = "CG-demo-obj")
public class ObjConsumer implements RocketMqListener<Demo> {

    @Override
    public void onMessage(Demo message) {
        log.info("消费obj消息 toString: {}", message);
    }
}

报错如下:

Cannot construct instance of `java.time.LocalDateTime` (no Creators, like default construct, exist): no String-argument constructor/factory method to deserialize from String value ('xxxx')

解决方案

方案一:(不推荐)

在对象的属性上添加序列化注解

@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonSerialize(using = LocalDateTimeSerializer.class)
private LocalDateTime localDateTime;

显而易见,这种方式可能需要修改许许多多的类...

方案二:(推荐)

看了下 rocketmq-springboot-starter的源码,starter里面结合了spring-messaging,对消息进行了转换

源码里面RocketMQMessageConverter里利用了CompositeMessageConverter来对消息进行转化,支持Stringbytejacksonfast-json四种Converter,但是在jackson converter的配置中,只是利用了 springMappingJackson2MessageConverter来支持消息的转换,但其里面并没有对LocalDateTime等的支持。

MappingJackson2MessageConverter部分代码:

private ObjectMapper initObjectMapper() {
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    return objectMapper;
}

这里仍然有两种处理方式,一是仿照MappingJackson2MessageConverter直接自己写一个类似CustomMappingJackson2MessageConverter,直接复制MappingJackson2MessageConverter的源代码修改initObjectMapper()方法对ObjectMapper的实现进行扩展即可,这里不多说。

还有一种就是配置MappingJackson2MessageConverterObjectMapper属性,

  1. 配置全局的ObjectMapper

        @Bean
        @ConditionalOnMissingBean
        public ObjectMapper objectMapper() {
            ObjectMapper objectMapper = new Jackson2ObjectMapperBuilder().createXmlMapper(false).build()
                    .setLocale(Locale.CHINA)
                    //去掉默认的时间戳格式
                    .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
                    // 时区
                    .setTimeZone(TimeZone.getTimeZone(ZoneId.systemDefault()))
                    // Date参数日期格式
                    .setDateFormat(new SimpleDateFormat(DatePattern.NORM_DATETIME_PATTERN, Locale.CHINA))
                    // 该特性决定parser是否允许JSON字符串包含非引号控制字符(值小于32的ASCII字符,包含制表符和换行符)。 如果该属性关闭,则如果遇到这些字符,则会抛出异常。JSON标准说明书要求所有控制符必须使用引号,因此这是一个非标准的特性
                    .configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true)
                    // 忽略不能转义的字符
                    .configure(JsonReadFeature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER.mappedFeature(), true)
                    // 在使用spring boot + jpa/hibernate,如果实体字段上加有FetchType.LAZY,并使用jackson序列化为json串时,会遇到SerializationFeature.FAIL_ON_EMPTY_BEANS异常
                    .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
                    // 忽略未知字段
                    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
                    // 单引号处理
                    .configure(Feature.ALLOW_SINGLE_QUOTES, true);
            // 注册自定义模块
            objectMapper.registerModule(new JavaTimeModule())
                    .findAndRegisterModules();
            log.info("Init rocketmq objectMapper success.");
            return objectMapper;
        }
  2. RocketMqMessageConverter直接配置即可

    public RocketMqMessageConverter(ObjectMapper objectMapper) {
        List<MessageConverter> messageConverters = new ArrayList<>();
        // ByteArray Converter
        ByteArrayMessageConverter byteArrayMessageConverter = new ByteArrayMessageConverter();
        byteArrayMessageConverter.setContentTypeResolver(null);
        messageConverters.add(byteArrayMessageConverter);
        // String Converter
        messageConverters.add(new StringMessageConverter());
        // Jackson Converter
        if (JACKSON_PRESENT) {
            MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
            converter.setObjectMapper(objectMapper);
            messageConverters.add(converter);
        }
        // FastJson Converter
        if (FASTJSON_PRESENT) {
            try {
                messageConverters.add(
                    (MessageConverter) ClassUtils.forName(
                        "com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter",
                        ClassUtils.getDefaultClassLoader()).newInstance());
            } catch (ClassNotFoundException | IllegalAccessException | InstantiationException ignored) {
            }
        }
        messageConverter = new CompositeMessageConverter(messageConverters);
    }