RabbitMQ demo:生产者配置类,定义连接工厂、RabbitTemplate、Queue、Exchange 和 Binding
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 @Configuration @Slf4j public class RabbitConfig { /** * Broker: 它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能够按照指定的方式进行传输。 * Exchange: 消息交换机,它指定消息按什么规则路由到哪个队列。 * FanoutExchange: 将消息分发到所有绑定的队列,没有routing_key 概念 * HeadersExchange: 通过添加key-value 匹配 * DirectExchange:按照指定的routing-key 分发到指定的队列 * TopicExchange: 通过关键字匹配 * Queue: 消息的载体,每个消息都会被投到一个或多个队列。 * Binging: 绑定,它的作用是把Exchange和Queue按照路由规则绑定起来。 * Routing Key: 路由关键字,exchange根据这个关键字进行消息投递 * vhost: 虚拟主机,一个broker里可以有多个vhost,用作不同用户权限的分离。 * Producer: 消息生产者,投递消息的程序 * Consumer: 消息消费者,接收消息的程序 * Channel: 消息通道,在客户端的每个连接里,可建立多个channel。 * **/ public static final String RABBIT_QUEUE_USER = "rabbit_queue_user"; public static final String RABBIT_QUEUE_USER_A = "rabbit_queue_user_a"; public static final String RABBIT_EXCHANGE_USER = "rabbit_exchange_user"; public static final String RABBIT_ROUTING_KEY_USER = "rabbit_routing_key_user"; public static final String RABBIT_ROUTING_KEY_USER_A = "rabbit_routing_key_user_a"; @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); return connectionFactory; } @Bean(name = "generalRabbitTemplate") 
//@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate(){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(returnCallback()); rabbitTemplate.setConfirmCallback(confirmCallback()); return rabbitTemplate; } @Bean RabbitTemplate.ReturnCallback returnCallback(){ return (message, replyCode, replyText, exchange, routingKey) -> log.error("ReturnCallback消息发送失败: {}", new String(message.getBody(), StandardCharsets.UTF_8)); } @Bean public RabbitTemplate.ConfirmCallback confirmCallback(){ return (correlationData , ack ,cause) -> { log.info("回调id:{}",correlationData.getId()); if ( ack ){ log.info("消费消息成功!"); }else { log.info("消费消息失败,原因:" + cause); } }; } @Bean public DirectExchange directExchange(){ return new DirectExchange(RabbitConfig.RABBIT_EXCHANGE_USER); } @Bean public Queue queueUser(){ return new Queue(RabbitConfig.RABBIT_QUEUE_USER,true); } @Bean public Queue queueUserA(){ return new Queue(RabbitConfig.RABBIT_QUEUE_USER_A,true); } @Bean public Binding binding(){ return BindingBuilder.bind(queueUser()).to(directExchange()).with(RabbitConfig.RABBIT_ROUTING_KEY_USER); } @Bean public Binding bindingA(){ return BindingBuilder.bind(queueUserA()).to(directExchange()).with(RabbitConfig.RABBIT_ROUTING_KEY_USER_A); } }
采用默认配置的生产者(Spring 默认配置)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Component public class GeneralProducer{ private static final Logger logger = LoggerFactory.getLogger(GeneralProducer.class); @Autowired private RabbitTemplate generalRabbitTemplate; public void send(String exchange,String routingKey,String content){ logger.info("发送消息:{},Exchange:{},RoutingKey :{}",content,exchange,routingKey); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().replace("-","")); logger.info("CorrelationData: {}", JSONObject.toJSONString(correlationData)); generalRabbitTemplate.convertAndSend(exchange,routingKey,content,correlationData); } }
自定义消息确认和未找到Queue处理策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 @Component public class Producer implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{ private static final Logger logger = LoggerFactory.getLogger(Producer.class); private RabbitTemplate rabbitTemplate; @Autowired public Producer(RabbitTemplate rabbitTemplate){ this.rabbitTemplate = rabbitTemplate; //配置文件默认已经有配置了 rabbitTemplate.setConfirmCallback(this::confirm); rabbitTemplate.setReturnCallback(this::returnedMessage); rabbitTemplate.setMandatory(true); } public void send(String exchange,String routingKey,String content){ logger.info("发送消息:{},Exchange:{},RoutingKey :{}",content,exchange,routingKey); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().replace("-","")); logger.info("CorrelationData: {}", JSONObject.toJSONString(correlationData)); rabbitTemplate.convertAndSend(exchange,routingKey,content,correlationData); } /** * publisher-confirms: true 消息有没有到达MQ(会返回一个ack确认码) * @author putao * @date 2019/11/10 14:32 **/ @Override public void confirm ( CorrelationData correlationData , boolean ack , String cause ) { logger.info("回调id:{}",correlationData.getId()); if ( ack ){ logger.info("消费消息成功!"); }else { logger.info("消费消息失败,原因:" + cause); } } /** * publisher-returns: true 消息有没有找到合适的队列,主要是为了生产者和mq之间的一个确认机制,当消息到没到mq,会提供相应的回调,在项目中 RabbitSender 这个类中进行了相应的配置 * @author putao * @date 2019/11/10 14:32 **/ @Override public void returnedMessage ( Message message , int replyCode , String replyText , String exchange , String routingKey ) { logger.info("消息发送失败,{}",new String(message.getBody())); } }
配置文件
# Producer-side RabbitMQ connection settings (read by RabbitConfig via @Value).
# Broker host; left blank in this example.
spring.rabbitmq.host=
# Broker port.
spring.rabbitmq.port=5672
# Credentials; left blank in this example.
spring.rabbitmq.username=
spring.rabbitmq.password=
# Publisher confirms: report whether a message reached the broker.
spring.rabbitmq.publisher-confirms=true
# Publisher returns: report messages that found no suitable queue.
spring.rabbitmq.publisher-returns=true
# Virtual host to connect to.
spring.rabbitmq.virtual-host=/
消费者 配置类(消费者如无特殊业务需要,不要随意添加各种参数)
1 2 3 4 5 6 7 8 9 @Configuration public class DirectConfig { public static final String RABBIT_QUEUE_USER = "rabbit_queue_user_a"; @Bean public Queue queue(){ return new Queue(RABBIT_QUEUE_USER); } }
消费者可以有几种写法。第一种:实现 ChannelAwareMessageListener 接口
1 2 3 4 5 6 7 8 9 10 11 12 @Component public class ListenerA implements ChannelAwareMessageListener { private static final Logger logger = LoggerFactory.getLogger(ListenerA.class); @Override @RabbitListener(queues = DirectConfig.RABBIT_QUEUE_USER) public void onMessage ( Message message , Channel channel ) throws Exception { //logger.info("消费消息:{}",new String(message.getBody())); System.out.println("Listener Demo 消费" + message.getMessageProperties().getHeaders().get("spring_returned_message_correlation")); } }
利用 @RabbitHandler 直接获取自定义消息
1 2 3 4 5 6 7 8 9 10 11 @Component public class ListenerE { private static final Logger logger = LoggerFactory.getLogger(ListenerE.class); @RabbitHandler @RabbitListener(queues = DirectConfig.RABBIT_QUEUE_USER) public void hand( String messages, Message message){ //logger.info("Listener E 消费消息 {}",message); System.out.println("Listener E 消费消息 " + message.getMessageProperties().getHeaders().get("spring_returned_message_correlation")); } }
也可以不用 @RabbitHandler,只使用 @RabbitListener
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Component public class Listener { private static final Logger logger = LoggerFactory.getLogger(Listener.class); @RabbitListener(queues = DirectConfig.RABBIT_QUEUE_USER_A) public void listen( Message message, Channel channel ) throws IOException{ //手动ack //long deliveryTag = message.getMessageProperties().getDeliveryTag(); //channel.basicAck(deliveryTag,true); //logger.info("Listener 消费消息{}", JSONObject.parseObject(new String(message.getBody()), User.class)); System.out.println("Listener 消费消息: "+ message.getMessageProperties().getHeaders().get("spring_returned_message_correlation")); } }
经过试错,发现只有一种写法是不支持的。 如下:RabbitListener 定义在类上,RabbitHandler想要获取Message对象
1 2 3 4 5 6 7 8 9 10 11 @Component @RabbitListener(queues = DirectConfig.RABBIT_QUEUE_USER) public class ListenerE { private static final Logger logger = LoggerFactory.getLogger(ListenerE.class); @RabbitHandler public void hand(Message message){ //logger.info("Listener E 消费消息 {}",message); System.out.println("Listener E 消费消息 " + message.getMessageProperties().getHeaders().get("spring_returned_message_correlation")); } }
注意事项 有几点值得注意:
消费者配置不要随意配置最大消费数等参数,不然会发生看似莫名其妙的无法消费的情况。例如在下面的配置中:auto-startup=false 会使监听容器不自动启动;acknowledge-mode=manual 时如果代码里没有调用 channel.basicAck,未确认的消息数达到 prefetch(这里是 2)之后,Broker 就不会再向该消费者投递新消息。
spring.rabbitmq.listener.type=direct
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.direct.consumers-per-queue=2
spring.rabbitmq.listener.direct.prefetch=2
spring.rabbitmq.listener.direct.missing-queues-fatal=true
spring.rabbitmq.listener.direct.default-requeue-rejected=false
spring.rabbitmq.listener.direct.auto-startup=false
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=2
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.missing-queues-fatal=true
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.listener.simple.auto-startup=false
spring.rabbitmq.listener.simple.batch-size=5
生产者可以定义消息与 queue 的交互策略(例如消息未找到匹配队列时的 ReturnCallback),以及 Broker 确认收到消息时的 ConfirmCallback 回调等
本质上是要根据业务需求来定义消息的流转途径,交换机、队列、路由key等
最简单的使用方式,就是像定义 Kafka 的 topic 那样直接定义即可
参考:Spring Boot整合RabbitMQ详细教程 https://blog.csdn.net/qq_38455201/article/details/80308771 RabbitMQ笔记(七)-SimpleMessageListenerContainer和DirectMessageListenerContainer https://blog.csdn.net/yingziisme/article/details/86418580 RabbitMQ实战篇:消息确认之消费者局部确认 https://blog.csdn.net/lovelong8808/article/details/94126059 RabbitMQ传输原理、五种模式 https://www.cnblogs.com/pjjlt/p/10741963.html RabbitMQ整合SpringBoot实战!(全) https://www.cnblogs.com/coder-programming/p/11602910.html SpringBoot 整合 RabbitMQ(包含三种消息确认机制以及消费端限流) https://www.cnblogs.com/haixiang/p/10959551.html springboot集成rabbitMQ(消费者篇) https://blog.csdn.net/tmeng521/article/details/90756618 rabbitMq无法消费发送的q的问题 https://www.cnblogs.com/aigeileshei/p/10286208.html Spring Boot RabbitMQ 默认配置 https://www.cnblogs.com/1x11/p/10919687.html