RabbitMQ demo 生产者 配置类,定义连接工厂,template,queue,exchange,binding
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 @Configuration @Slf4j public class RabbitConfig {     /**      *  Broker: 它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能够按照指定的方式进行传输。      *  Exchange: 消息交换机,它指定消息按什么规则路由到哪个队列。      *          FanoutExchange: 将消息分发到所有绑定的队列,没有routing_key 概念      *          HeadersExchange: 通过添加key-value 匹配      *          DirectExchange:按照指定的routing-key 分发到指定的队列      *          TopicExchange: 通过关键字匹配      *  Queue: 消息的载体,每个消息都会被投到一个或多个队列。      *  Binging: 绑定,它的作用是把Exchange和Queue按照路由规则绑定起来。      *  Routing Key: 路由关键字,exchange根据这个关键字进行消息投递      *  vhost: 虚拟主机,一个broker里可以有多个vhost,用作不同用户权限的分离。      *  Producer: 消息生产者,投递消息的程序      *  Consumer: 消息消费者,接收消息的程序      *  Channel: 消息通道,在客户端的每个连接里,可建立多个channel。      *     **/     public static final String RABBIT_QUEUE_USER = "rabbit_queue_user";     public static final String RABBIT_QUEUE_USER_A = "rabbit_queue_user_a";     public static final String RABBIT_EXCHANGE_USER = "rabbit_exchange_user";     public static final String RABBIT_ROUTING_KEY_USER = "rabbit_routing_key_user";     public static final String RABBIT_ROUTING_KEY_USER_A = "rabbit_routing_key_user_a";     @Value("${spring.rabbitmq.host}")     private String host;     @Value("${spring.rabbitmq.port}")     private int port;     @Value("${spring.rabbitmq.username}")     private String username;     @Value("${spring.rabbitmq.password}")     private String password;     @Value("${spring.rabbitmq.virtual-host}")     private String virtualHost;     @Bean     public ConnectionFactory connectionFactory() {         CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);         connectionFactory.setUsername(username);         connectionFactory.setPassword(password);         connectionFactory.setVirtualHost(virtualHost);         connectionFactory.setPublisherConfirms(true);         connectionFactory.setPublisherReturns(true);         return connectionFactory;     }     @Bean(name = "generalRabbitTemplate")     //@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)     public RabbitTemplate rabbitTemplate(){         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());         rabbitTemplate.setMandatory(true);         rabbitTemplate.setReturnCallback(returnCallback());         rabbitTemplate.setConfirmCallback(confirmCallback());         return rabbitTemplate;     }     @Bean RabbitTemplate.ReturnCallback returnCallback(){         return (message, replyCode, replyText, exchange, routingKey)               -> log.error("ReturnCallback消息发送失败: {}", new String(message.getBody(), StandardCharsets.UTF_8));     }     @Bean     public RabbitTemplate.ConfirmCallback confirmCallback(){         return (correlationData , ack ,cause) -> {             log.info("回调id:{}",correlationData.getId());             if ( ack ){                 log.info("消费消息成功!");             }else {                 log.info("消费消息失败,原因:" + cause);             }         };     }     @Bean     public DirectExchange directExchange(){         return new DirectExchange(RabbitConfig.RABBIT_EXCHANGE_USER);     }     @Bean     public Queue queueUser(){         return new Queue(RabbitConfig.RABBIT_QUEUE_USER,true);     }     @Bean     public Queue queueUserA(){         return new Queue(RabbitConfig.RABBIT_QUEUE_USER_A,true);     }     @Bean     public Binding binding(){         return BindingBuilder.bind(queueUser()).to(directExchange()).with(RabbitConfig.RABBIT_ROUTING_KEY_USER);     }     @Bean     public Binding bindingA(){         return BindingBuilder.bind(queueUserA()).to(directExchange()).with(RabbitConfig.RABBIT_ROUTING_KEY_USER_A);     } } 
 
采用默认配置的生产者,sping默认配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Component public class GeneralProducer{     private static final Logger logger = LoggerFactory.getLogger(GeneralProducer.class);     @Autowired     private RabbitTemplate generalRabbitTemplate;     public void send(String exchange,String routingKey,String content){         logger.info("发送消息:{},Exchange:{},RoutingKey :{}",content,exchange,routingKey);         CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().replace("-",""));         logger.info("CorrelationData: {}", JSONObject.toJSONString(correlationData));         generalRabbitTemplate.convertAndSend(exchange,routingKey,content,correlationData);     }   } 
 
自定义消息确认和未找到Queue处理策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 @Component public class Producer implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{     private static final Logger logger = LoggerFactory.getLogger(Producer.class);     private RabbitTemplate rabbitTemplate;     @Autowired     public Producer(RabbitTemplate rabbitTemplate){         this.rabbitTemplate = rabbitTemplate;         //配置文件默认已经有配置了         rabbitTemplate.setConfirmCallback(this::confirm);         rabbitTemplate.setReturnCallback(this::returnedMessage);         rabbitTemplate.setMandatory(true);     }     public void send(String exchange,String routingKey,String content){         logger.info("发送消息:{},Exchange:{},RoutingKey :{}",content,exchange,routingKey);         CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().replace("-",""));         logger.info("CorrelationData: {}", JSONObject.toJSONString(correlationData));         rabbitTemplate.convertAndSend(exchange,routingKey,content,correlationData);     }     /**     * publisher-confirms: true 消息有没有到达MQ(会返回一个ack确认码)     * @author putao     * @date  2019/11/10 14:32     **/     @Override     public void confirm ( CorrelationData correlationData , boolean ack , String cause ) {         logger.info("回调id:{}",correlationData.getId());         if ( ack ){             logger.info("消费消息成功!");         }else {             logger.info("消费消息失败,原因:" + cause);         }     }     /**     * publisher-returns: true 消息有没有找到合适的队列,主要是为了生产者和mq之间的一个确认机制,当消息到没到mq,会提供相应的回调,在项目中 RabbitSender 这个类中进行了相应的配置     * @author putao     * @date  2019/11/10 14:32     **/     @Override     public void returnedMessage ( Message message , int replyCode , String replyText , String exchange , String routingKey ) {         logger.info("消息发送失败,{}",new String(message.getBody()));     } } 
 
配置文件
1 2 3 4 5 6 7 spring.rabbitmq.host= spring.rabbitmq.port=5672 spring.rabbitmq.username= spring.rabbitmq.password= spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true spring.rabbitmq.virtual-host=/ 
 
消费者 配置类(消费者如无特殊业务需要,不要随意添加各种参数)
1 2 3 4 5 6 7 8 9 @Configuration public class DirectConfig {     public static final String RABBIT_QUEUE_USER = "rabbit_queue_user_a";     @Bean     public Queue queue(){         return new Queue(RABBIT_QUEUE_USER);     } } 
 
消费者。可以有几种写法。 实现 ChannelAwareMessageListener 类
1 2 3 4 5 6 7 8 9 10 11 12 @Component public class ListenerA implements ChannelAwareMessageListener {     private static final Logger logger = LoggerFactory.getLogger(ListenerA.class);     @Override     @RabbitListener(queues = DirectConfig.RABBIT_QUEUE_USER)     public void onMessage ( Message message , Channel channel ) throws Exception {         //logger.info("消费消息:{}",new String(message.getBody()));         System.out.println("Listener Demo 消费" + message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"));     } } 
 
利用Rabbithandler 直接获取自定义消息
1 2 3 4 5 6 7 8 9 10 11 @Component public class ListenerE  {     private static final Logger logger = LoggerFactory.getLogger(ListenerE.class);     @RabbitHandler     @RabbitListener(queues = DirectConfig.RABBIT_QUEUE_USER)     public void hand( String messages, Message message){         //logger.info("Listener E 消费消息 {}",message);         System.out.println("Listener E 消费消息 " + message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"));     } } 
 
也可不用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Component public class Listener {     private static final Logger logger = LoggerFactory.getLogger(Listener.class);     @RabbitListener(queues = DirectConfig.RABBIT_QUEUE_USER_A)     public void listen( Message message, Channel channel ) throws IOException{         //手动ack         //long deliveryTag = message.getMessageProperties().getDeliveryTag();         //channel.basicAck(deliveryTag,true);         //logger.info("Listener 消费消息{}", JSONObject.parseObject(new String(message.getBody()), User.class));         System.out.println("Listener 消费消息: "+ message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"));     } } 
 
经过试错,发现只有一种写法是不支持的。 如下:RabbitListener 定义在类上,RabbitHandler想要获取Message对象
1 2 3 4 5 6 7 8 9 10 11 @Component @RabbitListener(queues = DirectConfig.RABBIT_QUEUE_USER) public class ListenerE  {     private static final Logger logger = LoggerFactory.getLogger(ListenerE.class);     @RabbitHandler     public void hand(Message message){         //logger.info("Listener E 消费消息 {}",message);         System.out.println("Listener E 消费消息 " + message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"));     } } 
 
注意事项 有几点值得注意:
消费者配置不要随意配置最大消费数等参数,不然会发生莫名其妙的无法消费的情况。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 spring.rabbitmq.listener.type=direct     spring.rabbitmq.listener.direct.acknowledge-mode=manual     spring.rabbitmq.listener.direct.consumers-per-queue=2     spring.rabbitmq.listener.direct.prefetch=2     spring.rabbitmq.listener.direct.missing-queues-fatal=true     spring.rabbitmq.listener.direct.default-requeue-rejected=false     spring.rabbitmq.listener.direct.auto-startup=false     spring.rabbitmq.listener.simple.acknowledge-mode=manual     spring.rabbitmq.listener.simple.prefetch=2     spring.rabbitmq.listener.simple.concurrency=5     spring.rabbitmq.listener.simple.max-concurrency=10     spring.rabbitmq.listener.simple.missing-queues-fatal=true     spring.rabbitmq.listener.simple.default-requeue-rejected=false     spring.rabbitmq.listener.simple.auto-startup=false     spring.rabbitmq.listener.simple.batch-size=5 
 
生产者可以定义消息与queue的交互策略,以及消费者的ACK回调等 
本质上是要根据业务需求来定义消息的流转途径,交换机、队列、路由key等 
最简单的使用方式就是类似于kafka的topic一样定义就可以 
 
参考:Spring Boot整合RabbitMQ详细教程 https://blog.csdn.net/qq_38455201/article/details/80308771    RabbitMQ笔记(七)-SimpleMessageListenerContainer和DirectMessageListenerContainer  https://blog.csdn.net/yingziisme/article/details/86418580    RabbitMQ实战篇:消息确认之消费者局部确认 https://blog.csdn.net/lovelong8808/article/details/94126059    RabbitMQ传输原理、五种模式  https://www.cnblogs.com/pjjlt/p/10741963.html    RabbitMQ整合SpringBoot实战!(全) https://www.cnblogs.com/coder-programming/p/11602910.html    SpringBoot 整合 RabbitMQ(包含三种消息确认机制以及消费端限流) https://www.cnblogs.com/haixiang/p/10959551.html    springboot集成rabbitMQ(消费者篇) https://blog.csdn.net/tmeng521/article/details/90756618    rabbitMq无法消费发送的q的问题 https://www.cnblogs.com/aigeileshei/p/10286208.html    Spring Boot RabbitMQ 默认配置 https://www.cnblogs.com/1x11/p/10919687.html