RabbitMQ demo

Producer

Configuration class: defines the connection factory, template, queues, exchange, and bindings.

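import java.nio.charset.StandardCharsets;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class RabbitConfig {
    /**
     * Broker: provides the transport service; it maintains the route from producer to consumer and makes sure data is delivered in the specified way.
     * Exchange: the message exchange; it decides by which rules a message is routed to which queue.
     *   FanoutExchange: delivers the message to every bound queue; there is no routing key concept.
     *   HeadersExchange: matches on key-value pairs added to the message.
     *   DirectExchange: delivers to the queue bound with the given routing key.
     *   TopicExchange: matches the routing key against patterns.
     * Queue: the message carrier; every message is delivered to one or more queues.
     * Binding: ties an Exchange and a Queue together according to a routing rule.
     * Routing Key: the key the exchange uses to decide where to deliver a message.
     * vhost: virtual host; one broker can contain several vhosts, used to separate permissions between users.
     * Producer: the program that publishes messages.
     * Consumer: the program that receives messages.
     * Channel: message channel; a client can open multiple channels on each connection.
     */
    public static final String RABBIT_QUEUE_USER = "rabbit_queue_user";
    public static final String RABBIT_QUEUE_USER_A = "rabbit_queue_user_a";
    public static final String RABBIT_EXCHANGE_USER = "rabbit_exchange_user";
    public static final String RABBIT_ROUTING_KEY_USER = "rabbit_routing_key_user";
    public static final String RABBIT_ROUTING_KEY_USER_A = "rabbit_routing_key_user_a";

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        // Enable publisher confirms and returns. Spring AMQP 2.2+ deprecates
        // setPublisherConfirms in favor of setPublisherConfirmType.
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);

        return connectionFactory;
    }

    @Bean(name = "generalRabbitTemplate")
    //@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        // mandatory = true makes the broker return messages that cannot be routed
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(returnCallback());
        rabbitTemplate.setConfirmCallback(confirmCallback());
        return rabbitTemplate;
    }

    @Bean
    public RabbitTemplate.ReturnCallback returnCallback() {
        return (message, replyCode, replyText, exchange, routingKey)
                -> log.error("ReturnCallback: message could not be routed: {}", new String(message.getBody(), StandardCharsets.UTF_8));
    }

    @Bean
    public RabbitTemplate.ConfirmCallback confirmCallback() {
        return (correlationData, ack, cause) -> {
            // correlationData is null when the message was sent without one
            log.info("Callback id: {}", correlationData == null ? null : correlationData.getId());
            if (ack) {
                log.info("Message confirmed by the broker.");
            } else {
                log.info("Message not confirmed by the broker, cause: {}", cause);
            }
        };
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(RabbitConfig.RABBIT_EXCHANGE_USER);
    }

    @Bean
    public Queue queueUser() {
        return new Queue(RabbitConfig.RABBIT_QUEUE_USER, true);
    }

    @Bean
    public Queue queueUserA() {
        return new Queue(RabbitConfig.RABBIT_QUEUE_USER_A, true);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queueUser()).to(directExchange()).with(RabbitConfig.RABBIT_ROUTING_KEY_USER);
    }

    @Bean
    public Binding bindingA() {
        return BindingBuilder.bind(queueUserA()).to(directExchange()).with(RabbitConfig.RABBIT_ROUTING_KEY_USER_A);
    }

}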
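
To make the binding definitions above more concrete, here is a minimal in-memory sketch of how a direct exchange picks queues by exact routing key. It is a toy model in plain Java, not the RabbitMQ or Spring AMQP API; the class `DirectExchangeSketch` and its methods are hypothetical names introduced only for illustration, while the queue names and routing keys are the constants from `RabbitConfig`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a direct exchange (hypothetical, not the RabbitMQ client API). */
class DirectExchangeSketch {
    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    /** Plays the role of BindingBuilder.bind(queue).to(exchange).with(routingKey). */
    void bind(String queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    /**
     * Returns the queues that would receive a message published with this key.
     * An empty list models an unroutable message, which is the situation in
     * which a mandatory publish ends up in the ReturnCallback.
     */
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    /** Builds the same two bindings that RabbitConfig declares. */
    static DirectExchangeSketch demoBindings() {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("rabbit_queue_user", "rabbit_routing_key_user");
        exchange.bind("rabbit_queue_user_a", "rabbit_routing_key_user_a");
        return exchange;
    }
}
```

In this model, publishing with `rabbit_routing_key_user_a` reaches only `rabbit_queue_user_a`, and a key with no binding yields an empty result, which is why `setMandatory(true)` together with a return callback is useful for spotting misrouted messages.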

A producer that uses the default configuration (Spring's defaults)

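import java.util.UUID;

import com.alibaba.fastjson.JSONObject; // assumption: JSONObject is fastjson's
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class GeneralProducer {

    private static final Logger logger = LoggerFactory.getLogger(GeneralProducer.class);

    @Autowired
    private RabbitTemplate generalRabbitTemplate;

    public void send(String exchange, String routingKey, String content) {
        logger.info("Sending message: {}, Exchange: {}, RoutingKey: {}", content, exchange, routingKey);
        // The correlation id lets the confirm callback identify this message
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().replace("-", ""));
        logger.info("CorrelationData: {}", JSONObject.toJSONString(correlationData));
        generalRabbitTemplate.convertAndSend(exchange, routingKey, content, correlationData);
    }
}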

Custom handling for message confirms and for messages that find no queue

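import java.nio.charset.StandardCharsets;
import java.util.UUID;

import com.alibaba.fastjson.JSONObject; // assumption: JSONObject is fastjson's
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public Producer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        // These are already enabled by default through the configuration file
        rabbitTemplate.setConfirmCallback(this::confirm);
        rabbitTemplate.setReturnCallback(this::returnedMessage);
        rabbitTemplate.setMandatory(true);
    }

    public void send(String exchange, String routingKey, String content) {
        logger.info("Sending message: {}, Exchange: {}, RoutingKey: {}", content, exchange, routingKey);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().replace("-", ""));
        logger.info("CorrelationData: {}", JSONObject.toJSONString(correlationData));
        rabbitTemplate.convertAndSend(exchange, routingKey, content, correlationData);
    }

    /**
     * publisher-confirms: true reports whether the message reached the broker (an ack code is returned).
     * @author putao
     * @date 2019/11/10 14:32
     **/
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // correlationData is null when the message was sent without one
        logger.info("Callback id: {}", correlationData == null ? null : correlationData.getId());
        if (ack) {
            logger.info("Message confirmed by the broker.");
        } else {
            logger.info("Message not confirmed by the broker, cause: {}", cause);
        }
    }

    /**
     * publisher-returns: true reports whether the message found a suitable queue. It is part of the
     * confirmation mechanism between the producer and the broker: a callback is provided when the
     * message could not be delivered. In the project this is configured in the RabbitSender class.
     * @author putao
     * @date 2019/11/10 14:32
     **/
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logger.info("Message delivery failed: {}", new String(message.getBody(), StandardCharsets.UTF_8));
    }
}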
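
The confirm callback only logs the outcome. One way to act on it, sketched here under assumptions, is to remember each message by its correlation id until the broker confirms it, so that a negative confirm can be matched back to the content for a retry. `PendingConfirms` is a hypothetical helper in plain Java, not part of Spring AMQP; in the producer above, `register` would be called before `convertAndSend` and `onConfirm` from inside `confirm`.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper that keeps messages until the broker confirms them. */
class PendingConfirms {
    // correlation id -> message content still waiting for a confirm
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    /** Call before publishing; returns the id to put into CorrelationData. */
    String register(String content) {
        String id = UUID.randomUUID().toString().replace("-", "");
        pending.put(id, content);
        return id;
    }

    /**
     * Call from the confirm callback with correlationData.getId() and the ack flag.
     * Returns the content to re-send on a negative confirm, or null when
     * nothing needs to be done.
     */
    String onConfirm(String correlationId, boolean ack) {
        if (correlationId == null) {
            return null; // message was published without CorrelationData
        }
        String content = pending.remove(correlationId);
        return ack ? null : content;
    }

    /** Number of messages that have not been confirmed yet. */
    int outstanding() {
        return pending.size();
    }
}
```

A `ConcurrentHashMap` is used because confirms typically arrive on a different thread from the one that published the message.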

Configuration file

spring.rabbitmq.host=
spring.rabbitmq.port=5672
spring.rabbitmq.username=
spring.rabbitmq.password=
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.virtual-host=/

Consumer

Configuration class (unless the business specifically requires it, do not add extra parameters on the consumer side)

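import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {
    public static final String RABBIT_QUEUE_USER = "rabbit_queue_user_a";

    // Declares the queue the listeners below consume from
    @Bean
    public Queue queue() {
        return new Queue(RABBIT_QUEUE_USER);
    }
}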

The consumer can be written in several ways.
Implementing the ChannelAwareMessageListener interface:

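import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

@Component
public class ListenerA implements ChannelAwareMessageListener {

    private static final Logger logger = LoggerFactory.getLogger(ListenerA.class);

    @Override
    @RabbitListener(queues = DirectConfig.RABBIT_QUEUE_USER)
    public void onMessage(Message message, Channel channel) throws Exception {
        //logger.info("Consumed message: {}", new String(message.getBody()));
        // Prints the correlation id that the producer attached to the message
        System.out.println("Listener Demo consumed " + message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"));
    }
}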

Using @RabbitHandler to receive the custom message directly

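import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ListenerE {
    private static final Logger logger = LoggerFactory.getLogger(ListenerE.class);

    // "messages" is the converted payload; "message" is the raw AMQP message
    @RabbitHandler
    @RabbitListener(queues = DirectConfig.RABBIT_QUEUE_USER)
    public void hand(String messages, Message message) {
        //logger.info("Listener E consumed message {}", message);
        System.out.println("Listener E consumed message " + message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"));
    }
}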

It also works without @RabbitHandler

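import java.io.IOException;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Listener {

    private static final Logger logger = LoggerFactory.getLogger(Listener.class);

    // DirectConfig only defines RABBIT_QUEUE_USER (whose value is "rabbit_queue_user_a")
    @RabbitListener(queues = DirectConfig.RABBIT_QUEUE_USER)
    public void listen(Message message, Channel channel) throws IOException {
        // Manual ack
        //long deliveryTag = message.getMessageProperties().getDeliveryTag();
        //channel.basicAck(deliveryTag,true);

        //logger.info("Listener consumed message {}", JSONObject.parseObject(new String(message.getBody()), User.class));
        System.out.println("Listener consumed message: " + message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"));
    }
}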
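
The commented-out manual ack above passes `true` as the second argument of `basicAck`. That flag is `multiple`: when it is true, the broker treats the call as acknowledging every outstanding delivery on the channel up to and including the given delivery tag. The sketch below models that behavior with a sorted set; `UnackedDeliveries` is a hypothetical toy class in plain Java, not the RabbitMQ client API.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

/** Toy model of a channel's unacknowledged deliveries (hypothetical, not the client API). */
class UnackedDeliveries {
    private final TreeSet<Long> unacked = new TreeSet<>();

    /** Records that the broker delivered a message with this tag. */
    void delivered(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    /** Models basicAck(deliveryTag, multiple); returns how many deliveries were acknowledged. */
    int ack(long deliveryTag, boolean multiple) {
        if (!multiple) {
            return unacked.remove(deliveryTag) ? 1 : 0;
        }
        // headSet(tag, true) is a live view of every tag <= deliveryTag,
        // so clearing it removes those tags from the underlying set
        NavigableSet<Long> upTo = unacked.headSet(deliveryTag, true);
        int count = upTo.size();
        upTo.clear();
        return count;
    }

    /** Number of deliveries still waiting for an ack. */
    int pending() {
        return unacked.size();
    }
}
```

With deliveries 1 to 4 outstanding, `ack(3, true)` clears tags 1, 2, and 3 in one call, whereas `ack(3, false)` clears only tag 3. Acknowledging one message at a time with `multiple = false` is the safer choice when each message is processed independently.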

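Through trial and error, I found that only one form is not supported.
It is shown below: @RabbitListener is declared on the class, and the @RabbitHandler method tries to receive the Message object.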

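import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = DirectConfig.RABBIT_QUEUE_USER)
public class ListenerE {
    private static final Logger logger = LoggerFactory.getLogger(ListenerE.class);

    // Unsupported form: the only parameter is the raw Message
    @RabbitHandler
    public void hand(Message message) {
        //logger.info("Listener E consumed message {}", message);
        System.out.println("Listener E consumed message " + message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"));
    }
}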
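
A plausible explanation for this failure, offered as an assumption rather than something the experiment above confirms: with a class-level @RabbitListener, the @RabbitHandler method is chosen by the type of the converted payload. The producer sends a String, so a handler whose only parameter is the raw `Message` has no parameter that matches the payload type. The sketch below models dispatch by payload type in plain Java; `PayloadDispatcherSketch` and `RawMessageStandIn` are hypothetical names, and none of this is Spring AMQP code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy dispatcher that picks a handler by payload type (a model, not Spring AMQP). */
class PayloadDispatcherSketch {
    private final Map<Class<?>, Function<Object, String>> handlers = new LinkedHashMap<>();

    /** Registers a handler for payloads of the given type. */
    <T> void register(Class<T> payloadType, Function<T, String> handler) {
        handlers.put(payloadType, payload -> handler.apply(payloadType.cast(payload)));
    }

    /** Invokes the first handler whose declared type accepts the payload. */
    String dispatch(Object payload) {
        for (Map.Entry<Class<?>, Function<Object, String>> entry : handlers.entrySet()) {
            if (entry.getKey().isAssignableFrom(payload.getClass())) {
                return entry.getValue().apply(payload);
            }
        }
        throw new IllegalStateException("No handler for payload type " + payload.getClass().getName());
    }
}

/** Hypothetical stand-in for a raw broker message wrapper. */
class RawMessageStandIn {
    final byte[] body;

    RawMessageStandIn(byte[] body) {
        this.body = body;
    }
}
```

In this model, a dispatcher that only knows a `RawMessageStandIn` handler rejects a String payload, and registering a String handler makes the same payload dispatchable. That mirrors why the earlier method-level variant, which declares a `String` parameter next to `Message`, works.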

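Notes

A few points are worth noting:

  1. Do not casually set consumer parameters such as the maximum number of consumers; otherwise messages can inexplicably stop being consumed.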
    spring.rabbitmq.listener.type=direct
    spring.rabbitmq.listener.direct.acknowledge-mode=manual
    spring.rabbitmq.listener.direct.consumers-per-queue=2
    spring.rabbitmq.listener.direct.prefetch=2
    spring.rabbitmq.listener.direct.missing-queues-fatal=true
    spring.rabbitmq.listener.direct.default-requeue-rejected=false
    spring.rabbitmq.listener.direct.auto-startup=false
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    spring.rabbitmq.listener.simple.prefetch=2
    spring.rabbitmq.listener.simple.concurrency=5
    spring.rabbitmq.listener.simple.max-concurrency=10
    spring.rabbitmq.listener.simple.missing-queues-fatal=true
    spring.rabbitmq.listener.simple.default-requeue-rejected=false
    spring.rabbitmq.listener.simple.auto-startup=false
    spring.rabbitmq.listener.simple.batch-size=5
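  2. The producer can define how messages interact with queues, as well as ACK callbacks and similar hooks.
  3. Fundamentally, the message flow (exchanges, queues, routing keys, and so on) should be defined according to business requirements.
  4. The simplest way to use it is to define things much like a Kafka topic.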
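
One concrete way the settings in note 1 can stop consumption: with `acknowledge-mode=manual` and `prefetch=2`, the broker delivers at most two unacknowledged messages to a consumer. If the listener never calls `basicAck` (as in the demo listener, where the ack is commented out), delivery stalls after the second message. The sketch below models that window in plain Java; `PrefetchSketch` is a hypothetical toy class, not broker or client code. Separately, `auto-startup=false` means the listener containers are not started automatically, which also looks like "nothing is consumed".

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a queue delivering to one consumer under a prefetch limit. */
class PrefetchSketch {
    private final Deque<String> ready = new ArrayDeque<>();
    private final int prefetch;
    private int unacked = 0;

    PrefetchSketch(int prefetch) {
        this.prefetch = prefetch;
    }

    /** Adds a message to the queue. */
    void publish(String message) {
        ready.addLast(message);
    }

    /** Returns the next message, or null if the queue is empty or the prefetch window is full. */
    String deliver() {
        if (ready.isEmpty() || unacked >= prefetch) {
            return null;
        }
        unacked++;
        return ready.pollFirst();
    }

    /** Models a manual ack for one delivery, which frees one slot in the window. */
    void ack() {
        if (unacked > 0) {
            unacked--;
        }
    }

    /** Messages still waiting in the queue. */
    int backlog() {
        return ready.size();
    }
}
```

With a prefetch of 2 and five published messages, the first two calls to `deliver()` return messages and the third returns null until `ack()` is called, leaving three messages stuck in the backlog.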

References: Spring Boot integration with RabbitMQ, a detailed tutorial https://blog.csdn.net/qq_38455201/article/details/80308771
RabbitMQ notes (7): SimpleMessageListenerContainer and DirectMessageListenerContainer https://blog.csdn.net/yingziisme/article/details/86418580
RabbitMQ in practice: message acknowledgement, consumer-side partial acknowledgement https://blog.csdn.net/lovelong8808/article/details/94126059
RabbitMQ transport principles and the five modes https://www.cnblogs.com/pjjlt/p/10741963.html
RabbitMQ with Spring Boot in practice (complete) https://www.cnblogs.com/coder-programming/p/11602910.html
Spring Boot integration with RabbitMQ (including three message confirmation mechanisms and consumer-side rate limiting) https://www.cnblogs.com/haixiang/p/10959551.html
Spring Boot integration with RabbitMQ (consumer part) https://blog.csdn.net/tmeng521/article/details/90756618
RabbitMQ cannot consume messages sent to a queue https://www.cnblogs.com/aigeileshei/p/10286208.html
Spring Boot RabbitMQ default configuration https://www.cnblogs.com/1x11/p/10919687.html