在ActiveMQ创建Topic的监听工厂配置中添加如下代码:
// 开启订阅持久化 , 并且设置clientId
factory.setSubscriptionDurable(true);
factory.setClientId("file_ocr");
其中,clientId不允许重复;
完整配置:
package com.googosoft.config;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.core.JmsMessagingTemplate;
import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
/**
* @author xuyj
* @date 2022年08月19日 10:49
*/
@Configuration
public class ActiveMQConfig {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String username;
@Value("${spring.activemq.password}")
private String password;
@Value("${activemq.queue.name}")
private String queueName;
@Value("${activemq.topic.name}")
private String topicName;
@Bean(name = "queue")
public Queue queue() {
return new ActiveMQQueue(queueName);
}
@Bean(name = "topic")
public Topic topic() {
return new ActiveMQTopic(topicName);
}
/**
* 连接工厂
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(username, password, brokerUrl);
// 定义重发机制
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setUseExponentialBackOff(Boolean.TRUE);
// 消息处理失败重新处理次数,默认为6次
policy.setMaximumRedeliveries(2);
// 重发时间间隔,默认为1秒
policy.setInitialRedeliveryDelay(1000L);
policy.setBackOffMultiplier(2);
policy.setMaximumRedeliveryDelay(1000L);
// 给连接工厂设置重发机制
activeMQConnectionFactory.setRedeliveryPolicy(policy);
activeMQConnectionFactory.setClientID("file_ocr");
return activeMQConnectionFactory;
}
@Bean
public JmsMessagingTemplate jmsMessageTemplate() {
return new JmsMessagingTemplate(connectionFactory());
}
// 在Queue模式中,对消息的监听需要对containerFactory进行配置
@Bean("queueListener")
public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
// 关闭事务
factory.setSessionTransacted(false);
// 设置手动确认,默认配置中Session是开启了事物的,即使我们设置了手动Ack也是无效的
factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(false);
return factory;
}
//在Topic模式中,对消息的监听需要对containerFactory进行配置
@Bean("topicListener")
public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
// 关闭事务
factory.setSessionTransacted(false);
// 设置手动确认,默认配置中Session是开启了事物的,即使我们设置了手动Ack也是无效的
factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(true);
// 开启订阅持久化 , 并且设置clientId
factory.setSubscriptionDurable(true);
factory.setClientId("file_ocr");
return factory;
}
}