SpringBoot中ActiveMQ的持久化订阅

在ActiveMQ创建Topic的监听工厂配置中添加如下代码:

// 开启订阅持久化 , 并且设置clientId
factory.setSubscriptionDurable(true);
factory.setClientId("file_ocr");

其中,clientId不允许重复;

完整配置:

package com.googosoft.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.core.JmsMessagingTemplate;

import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

/**
 * @author xuyj
 * @date 2022年08月19日 10:49
 */
@Configuration
public class ActiveMQConfig {

    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    @Value("${spring.activemq.user}")
    private String username;

    @Value("${spring.activemq.password}")
    private String password;

    @Value("${activemq.queue.name}")
    private String queueName;

    @Value("${activemq.topic.name}")
    private String topicName;



    @Bean(name = "queue")
    public Queue queue() {
        return new ActiveMQQueue(queueName);
    }

    @Bean(name = "topic")
    public Topic topic() {
        return new ActiveMQTopic(topicName);
    }

    /**
     * 连接工厂
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(username, password, brokerUrl);
        // 定义重发机制
        RedeliveryPolicy policy = new RedeliveryPolicy();
        policy.setUseExponentialBackOff(Boolean.TRUE);
        // 消息处理失败重新处理次数,默认为6次
        policy.setMaximumRedeliveries(2);
        // 重发时间间隔,默认为1秒
        policy.setInitialRedeliveryDelay(1000L);
        policy.setBackOffMultiplier(2);
        policy.setMaximumRedeliveryDelay(1000L);
        // 给连接工厂设置重发机制
        activeMQConnectionFactory.setRedeliveryPolicy(policy);
        activeMQConnectionFactory.setClientID("file_ocr");
        return activeMQConnectionFactory;
    }

    @Bean
    public JmsMessagingTemplate jmsMessageTemplate() {
        return new JmsMessagingTemplate(connectionFactory());
    }

    // 在Queue模式中,对消息的监听需要对containerFactory进行配置
    @Bean("queueListener")
    public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
        // 关闭事务
        factory.setSessionTransacted(false);
        // 设置手动确认,默认配置中Session是开启了事物的,即使我们设置了手动Ack也是无效的
        factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(false);
        return factory;
    }

    //在Topic模式中,对消息的监听需要对containerFactory进行配置
    @Bean("topicListener")
    public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
        // 关闭事务
        factory.setSessionTransacted(false);
        // 设置手动确认,默认配置中Session是开启了事物的,即使我们设置了手动Ack也是无效的
        factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true);
        // 开启订阅持久化 , 并且设置clientId
        factory.setSubscriptionDurable(true);
        factory.setClientId("file_ocr");
        return factory;
    }


}

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×