RabbitMQ基本信息

RabbitMQ是什么

1. 基础概念

  • 消息代理(Message Broker):RabbitMQ 作为一个中间人,负责接收、存储和转发消息,确保消息从生产者发送到消费者。

  • AMQP 协议:高级消息队列协议(Advanced Message Queuing Protocol)是一种消息传递协议,旨在为消息的中间传递提供标准化的接口和功能。

2. 主要组件

  • 生产者(Producer):发送消息到 RabbitMQ 的应用程序。

  • 消费者(Consumer):从 RabbitMQ 接收并处理消息的应用程序。

  • 交换机(Exchange):负责接收生产者发送的消息,并根据一定的路由规则将消息转发到绑定的队列。

  • 队列(Queue):存储消息,等待消费者进行处理。

  • 绑定(Binding):连接交换机和队列的规则,定义消息如何从交换机路由到队列。

3. 消息传递模式

  • 发布/订阅(Publish/Subscribe):生产者将消息发送到交换机,交换机会根据绑定规则将消息广播到多个队列,适用于通知等场景。

  • 工作队列(Work Queues):多个消费者从同一个队列中消费消息,实现负载均衡,适用于任务分发场景。

  • 路由(Routing):根据路由键将消息发送到特定的队列,适用于不同类型的消息需要不同处理的场景。

  • 主题(Topics):消息按主题分类并路由到对应队列,适用于复杂的路由规则需求。

4. 可靠性

RabbitMQ 提供了多种机制来确保消息的可靠性:

  • 消息持久化:将消息保存到磁盘,防止数据丢失。

  • 消息确认:消费者处理完消息后,发送确认信号,确保消息被成功处理。

  • 发布确认:生产者可以确认消息是否成功发布到队列中。

5. 应用场景

RabbitMQ 适用于各种应用场景,包括但不限于:

  • 异步任务处理:如图像处理、邮件发送等需要异步执行的任务。

  • 数据流处理:如日志收集、数据分析等需要实时处理数据的应用。

  • 系统解耦:将复杂系统的各个部分通过消息队列进行解耦,提高系统的可维护性和扩展性。

6. 管理和监控

RabbitMQ 提供了丰富的管理工具和接口,包括 Web 管理界面、命令行工具和 API,方便用户监控和管理消息队列系统。

RabbitMQ可以解决什么

  • 异步处理:RabbitMQ允许系统在不同的组件之间进行异步通信。这意味着生产者可以在消息被处理之前继续执行其他任务,从而提高系统的效率和响应速度。

  • 负载均衡:RabbitMQ可以通过将消息分发到多个消费者来实现负载均衡,从而有效分配工作负载,防止单个消费者过载。

  • 可靠性:RabbitMQ提供了持久化消息的功能,确保消息即使在系统崩溃或重启后仍然存在。此外,它支持确认机制,确保消息被成功处理。

  • 解耦组件:通过使用消息队列,系统的不同部分可以解耦,这意味着它们可以独立开发、部署和扩展。这样可以提高系统的灵活性和可维护性。

  • 灵活性和可扩展性:RabbitMQ支持多种消息传递模式(如发布/订阅、工作队列、路由等),并且可以根据需要轻松扩展,满足不同的应用场景需求。

RabbitMQ使用docker安装

1.拉取docker镜像

docker pull rabbitmq

2.运行mq镜像 5673是消息队列的端口号,15672是web管理页面端口

docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5673:5672 rabbitmq

这时候就可以使用http://ip:15672地址访问页面了

如果不能访问说明拉取镜像的配置文件内没有默认开启 rabbitmq_management 插件

进入docker容器内部开启这个插件

docker ps

查出docker rabbitmq的容器id为1a3a1fc451e5 使用docker 命令进入docker容器

docker exec -it 1a3a1fc451e5 /bin/bash

然后输入

rabbitmq-plugins enable rabbitmq_management

这样子就开启成功了exit退出容器使用浏览器就可以访问啦

统一编程环境

	
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>

	<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>fastjson</artifactId>
		<version>1.2.75</version>
	</dependency>

	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>
	<!--rabbitmq-->
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-amqp</artifactId>
	</dependency>

	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.16</version>
		<scope>compile</scope>
	</dependency>

	<dependency>
		<groupId>com.google.guava</groupId>
		<artifactId>guava</artifactId>
		<version>20.0</version>
	</dependency>

	<dependency>
		<groupId>com.googlecode.json-simple</groupId>
		<artifactId>json-simple</artifactId>
		<version>1.1.1</version>
	</dependency>

jdk17

springboot

RabbitMQ工作模式demo

Helloworld模式

这是mq中最简单易上手的例子由 消费者+队列+生产者 构成

首先我们配置spring配置文件

server:
  port: 8021
spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: 127.0.0.1
    port: 5673
    username: guest
    password: guest

我们在springboot中创建队列对象Queue

RabbitMQConfig

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Bean
    public Queue helloWorldQueue() {
        return new Queue("helloWorldQueue", true);
    }
}

Producer

接下来我们创建生产者

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Log4j2
@Component
public class Producer {

    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法

    public void sendMessage(){
        rabbitTemplate.convertAndSend("helloWorldQueue","你好");
        log.error("生产者发送了消息");

    }
}

Consumer

创建消费者

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Log4j2
@Component
public class Consumer {

    @RabbitListener(queues = "helloWorldQueue")
    public void receive(String msg){
        log.error("消费者接受了{}", msg);
    }
}

创建测试类

@SpringBootTest
class RabbitmqApplicationTests {

	@Autowired
	private Producer producer;
	@Test
	void helloWorldQueueTest() {
		producer.sendMessage();
	}

}

日志

控制台输出效果

work模式

work模式组成为:生产者 + 队列 + 消费者



这种模式可以很好的作为分担消费者的压力,一个队列连接多个消费者负载均衡的进行消费队列中的信息

所以我们需要三个springboot后台一个作为生产者两个作为消费者,这样子可以很好的模拟出负载均衡的环境

WorkConsumer1

首先是spring的配置文件

server:
  port: 8021
spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: 127.0.0.1
    port: 5673
    username: guest
    password: guest

然后是RabbitMQConfig文件用来创建rabbitmq中的队列

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class WorkConfig {

    @Bean
    public Queue workTestQueue(){
        return new Queue("workQueue",true,false,false);//参数意思为 队列名字,是否持久化,是否是独占模式,使用后是否自动删除
    }
}

最后是WorkConsumer消费者的代码

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Log4j2
@Component
public class WorkCosumemr1 {

    @RabbitListener(queues = "workQueue")
    public void receive(String msg) throws InterruptedException {
        log.error("消费者1收到=>{}",msg);
        Thread.sleep(1000);
    }
}

WorkConsumer2

首先是springboot配置文件

server:
  port: 8022
spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: 127.0.0.1
    port: 5673
    username: guest
    password: guest

因为WorkConsumer1的RabbitMQConfig文件已经声明了队列我们在这里不重复声明

接下来是WorkConsumer2第二个消费者

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Log4j2
@Component
public class WorkCosumer2 {

    @RabbitListener(queues = "workQueue")
    public void receive(String msg) throws InterruptedException {
        log.error("消费者2收到=>{}",msg);
        Thread.sleep(100);
    }
}

可以看到我们用sleep模拟了两个消费者不同性能

WorkProducer

springboot的配置文件

server:
  port: 8023
spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: 127.0.0.1
    port: 5673
    username: guest
    password: guest

Producer文件模拟生产者发送消息

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Log4j2
@Component
public class Producer {

    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法

    public void sendMessage(){
        rabbitTemplate.convertAndSend("helloWorldQueue","你好");
        log.error("生产者发送了消息");

    }
}

然后再写一个springboot测试类来启动

import live.nido.rabbitmq.work.WorkProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RabbitmqApplicationTests {
	
	@Autowired
	private WorkProducer workProducer;
	
	@Test
	void workQueueTest() throws InterruptedException {
		workProducer.send();
	}
}

我们先启动两个消费者然后再启动生产者就会有以下情况

生产者日志图

消费者1日志图

消费者2日志图

这样子我们就成功实现了图中生产者然后两个消费者,负载均衡的消费模式

细心的人就会发现虽然两个消费者的性能不同但是队列还是以公平的形式来分发消息,至于如何实现消费者能者多劳模式我将在下面的例子中完成

订阅模式

订阅模式的构成为:生产者 + 交换机 + 队列 + 消费者 构成

这种模式可以将生产者发送的信息通过交换机发送给两个队列然后两个消费者会消费一遍相同的信息

和work模式相同我们需要三个springboot后台来模拟一个生产者和两个消费者

springboot的配置文件和work模式一模一样这里就不重复了

SubscribeConsumer1

如图声明创建出两个队列和交换机然后将其绑定

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SubscribeConfig {

    @Bean
    public Queue subscribeQueue(){

        return new Queue("subscribeQueue",true,false,false);//参数的意思是 队列名,是否持久化,是否独占,是否自动删除
    }
    @Bean
    public Queue subscribeQueue2(){
        return new Queue("subscribeQueue2",true,false,false);
    }
    @Bean
    public FanoutExchange subscribeExchange(){
        return new FanoutExchange("subscribeExchange",true,false);//参数的意思是 交换机名,是否持久化,是否自动删除
    }
    @Bean
    public Binding binding(Queue subscribeQueue,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(subscribeQueue).to(fanoutExchange);
    }
    @Bean
    public Binding binding2(Queue subscribeQueue2,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(subscribeQueue2).to(fanoutExchange);
    }
}

然后是消费者的代码监听subscribeQueue队列

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import lombok.extern.log4j.Log4j2;

@Log4j2
@Component
public class SubscribeConsumer {
    @RabbitListener(queues = "subscribeQueue")
    public void receive(String msg) throws InterruptedException {
        log.error("消费者1收到=>{}",msg);
        Thread.sleep(1000);
    }
}

SubscribeConsumer2

在Consumer1里面已经用config声明了所需的rabbitmq对象,所以我们这边不重复声明

消费者监听subscribeQueue2队列

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Log4j2
@Component
public class SubscribeConsumer {
    @RabbitListener(queues = "subscribeQueue2")
    public void receive(String msg) throws InterruptedException {
        log.error("主题消费者2收到=>{}",msg);
        Thread.sleep(100);
    }
}

SubscribeProducer

在Producer里发送消息给交换机

package live.nido.rabbitmq.subscribeProducer;

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Log4j2
@Component
public class SubscribeProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(){

        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend("subscribeExchange",null,i);
            log.error("主题生产者发送=>{}",i);
        }
    }
}

然后创建test类进行调用

import live.nido.rabbitmq.subscribeProducer.SubscribeProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RabbitmqApplicationTests {
	
	@Autowired
	private SubscribeProducer subscribeProducer;
	
	@Test
	void subscribeTest(){
		subscribeProducer.send();
	}
}

运行后我们就可以看到结果,生产者发送的每一条消息都被消费者1和消费者2处理了一遍

生产者日志图

消费者1日志图

消费者2日志图

路由模式

路由模式为 生产者 + 交换机 + 队列 + 消费者 虽然架构和订阅模式一模一样,但是交换机会用以不同的routingKey 转发到不同的队列上,然后被不同的消费者所消费

RoutingConfig

我们创建一个交换机和两个队列,然后交换机和队列的绑定通过不同的routingKey相连

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;



@Configuration
public class RoutingConfig {
    @Bean
    public Queue routingQueue(){
        return new Queue("routingQueue",true,false,false); //参数为 队列每次,是否持久化,是否独占,是否自动删除
    }
    @Bean
    public Queue routingQueue2(){
        return new Queue("routingQueue2",true,false,false);
    }
    @Bean
    public DirectExchange routingExchange(){
        return new DirectExchange("routingExchange",true,true);
    }
    @Bean
    public Binding routingBindCreate(Queue routingQueue,DirectExchange routingExchange){
        return BindingBuilder.bind(routingQueue).to(routingExchange).with("create");
    }
    @Bean
    public Binding routingBindUpdate(Queue routingQueue, DirectExchange routingExchange){
        return BindingBuilder.bind(routingQueue).to(routingExchange).with("update");
    }
    @Bean
    public Binding routingBindDelete(Queue routingQueue2,DirectExchange routingExchange){
        return BindingBuilder.bind(routingQueue2).to(routingExchange).with("delete");
    }
}

RoutingConsumer

消费者1绑定的routingQueue队列使用routingKey = “create” 和 routingKey = "update" 绑定到交换机上 ,之后生产者发送 create , update的routingKey就会给消费者1处理

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Log4j2
@Component
public class RoutingConsumer {
    @RabbitListener(queues = "routingQueue")
    public void receive(String msg) throws InterruptedException {
        log.error("路由消费者1收到=>{}",msg);
        Thread.sleep(1000);
    }
}

RoutingConsumer2

消费者2绑定的routingQueue2队列使用routingKey = "delete" 绑定到交换机上如果生产者发送delete的routingKey就会给消费者2处理

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Log4j2
@Component
public class RoutingConsumer2 {

    @RabbitListener(queues = "routingQueue2")
    public void receive(String msg) throws InterruptedException {
        log.error("路由消费者2收到=>{}",msg);
        Thread.sleep(100);
    }
}

RoutingProducer

我们将创建三种生产者的方法用来发送不同的routingKey的消息给消息队列

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Log4j2
@Component
public class RoutingProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendCreateMessage(){
        rabbitTemplate.convertAndSend("routingExchange","create","创建信息");
        log.error("路由生产者发送了创建消息");
    }

    public void sendUpdateMessage(){
        rabbitTemplate.convertAndSend("routingExchange","update","更新信息");
        log.error("路由生产者发送了更新消息");
    }

    public void sendDeleteMessage(){
        rabbitTemplate.convertAndSend("routingExchange","delete","删除信息");
        log.error("路由生产者发送了删除消息");
    }
}

我们需要编写一个测试类来发送消息

import live.nido.rabbitmq.routing.RoutingProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RabbitmqApplicationTests {

    @Autowired
    private RoutingProducer routingProducer;

    @Test
    void routingTest(){
       for (int i = 0; i < 30; i++) {
          routingProducer.sendDeleteMessage();
          routingProducer.sendCreateMessage();
          routingProducer.sendUpdateMessage();
       }
    }
}

点击运行后就会有这种效果

生产者日志图

消费者1日志图

消费者2日志图

主题模式

主题模式的构成为:生产者 + 交换机 + 队列 + 消费者

主题模式看起来又和路由模式的组成大差不差,主要的差别就是routingKey的构成是可以用通配符所代替的

TopicConfig

创建两个队列和一个交换机,使用routingKey和通配符将交换机和队列绑定

* : create.*的意思是只能匹配一个单词,即一个点分隔的部分  例如create.id就可以匹配,create.id.1 就不会匹配了
# : create.#的意思是可以匹配零个或多个单词,不受数量限制。例如create.id可以匹配,create.id.1也可以匹配
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConfig {

    @Bean
    public Queue topicQueue(){
        return new Queue("topicQueue",true,false,true); //参数为 队列名称,是否持久化,是否独占连接关闭时队列自动删除,是否自动删除
    }

    @Bean
    public Queue topicQueue2(){
        return new Queue("topicQueue2",true,false,true);
    }

    @Bean
    public TopicExchange  topicExchange(){
        return new TopicExchange ("topicExchange",true,true);
    }

    @Bean
    public Binding topicQueueBinding(Queue topicQueue, TopicExchange  topicExchange){
        return BindingBuilder.bind(topicQueue).to(topicExchange).with("create.#");
    }

    @Bean
    public Binding topicQueue2Binding(Queue topicQueue2,TopicExchange  topicExchange){
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with("update.*.*");
    }
}

TopicConsumer

TopicConsumer 绑定的是topicQueue的队列,只会处理create开头后面更 . 然后任意字符。例如create.123 或者create.123.123都会处理

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Log4j2
@Component
public class TopicConsumer {
    @RabbitListener(queues = "topicQueue")
    public void receive(String msg) throws InterruptedException {
        log.error("主题消费者1收到=>{}",msg);
        Thread.sleep(1000);
    }
}

TopicConsumer2

TopicConsumer 绑定的是topicQueue2的队列,指挥处理update.任意字符.任意字符 这种格式的routingKey update.123 这种格式不会处理 update.123.123.123 这种也不会处理

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Log4j2
@Component
public class TopicConsumer2 {
    @RabbitListener(queues = "topicQueue2")
    public void receive(String msg) throws InterruptedException {
        log.error("主题消费者2收到=>{}",msg);
        Thread.sleep(100);
    }
}

TopicProducer

我们构建了三个发送信息的方法,发送了三个不同routingKey的信息。按照前面创建的队列应该会create.create.1被topicQueue处理,update.update.1被topicQueue2处理,而update.2这个信息不被处理

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Log4j2
@Component
public class TopicProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendCreateMessage(){
        rabbitTemplate.convertAndSend("topicExchange","create.create.1","创建信息");
        log.error("主题生产者发送了创建消息");
    }

    public void sendUpdateMessage(){
        rabbitTemplate.convertAndSend("topicExchange","update.update.1","修改信息1");
        log.error("主题生产者发送了修改消息1");
    }

    public void sendUpdateMessage2(){
        rabbitTemplate.convertAndSend("topicExchange","update.2","修改信息2");
        log.error("主题生产者发送了修改信息2");
    }
}

接下来创建一个测试类调用producer方法

import live.nido.rabbitmq.topic.TopicProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RabbitmqApplicationTests {

	@Autowired
	private TopicProducer topicProducer;

	@Test
	void routingTest(){
		for (int i = 0; i < 30; i++) {
			topicProducer.sendUpdateMessage2();
			topicProducer.sendCreateMessage();
			topicProducer.sendUpdateMessage();
		}
	}
}

生产者日志

消费者1日志

消费者2日志

RabbitMQ 进阶问题

当我们学习Work模式的时候就预留了一个问题,当两个消费者的性能不一致时队列还是按照一人一个的公平轮询方式分发消息,这样子就会面临一个问题性能慢的那个消息队列消息积压严重,而性能好的消费者就处于空闲状态,我们接下来专注处理这个问题,让队列处于按照消费者能力越大责任越大的模式分发消息

能力越大责任越大分发模式(队列接收消费者确认信息)

这种模式是控制箭头所指的流程

这种模式是利用消费者处理完毕返回信息给队列,然后队列收到消费者消费完毕的信息之后继续向消费者投递信息。消费者信息有三种
ack:代表消费者已经成功处理完毕信息

nack:代表消费者处理信息时候异常,需要返回nack告诉队列消息处理信息失败,需要队列再次投递

reject:代表消费者处理信息时候异常,告诉队列将该信息从队列里面剔除不再发送给队列进行处理

在确认消费者是否处理信息时有三种确认消费模式

none:意思是队列将信息发送给消费者时立即确认返回ack信息

manual:意思是队列将信息发送给消费者时,等待消费者手动确认处理完毕返回ack信息

auto:意思是spring利用aop机制自动处理消费者返回ack,nack,reject 三种信息,如果是消费者出现业务异常返回nack,如果是消息校验等问题返回reject信息

ManualAckCosummer的yml配置文件

prefetch:1 的意思是队列一次只发送一条消息,消费者确认完毕后再发送一条消息。如果没有这个配置选项的话,队列会将队列里的所有请求全部分发给消费者。在springboot中消息会存储在springboot的jvm内存中

server:
  port: 8021
spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: 127.0.0.1
    port: 5673
    username: guest
    password: guest
    listener:
      simple:
        # 公平分发
        prefetch: 1

ManualAckConfig

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ManualAckConfig {
    @Bean
    public Queue manualAckQueue(){
        return new Queue("manualAckQueue",true,false,true);//参数意思为 队列名字,是否是独占模式,是否持久化,使用后是否自动删除
    }
}

ManualAckConsumer

import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Log4j2
@Component
public class ManualAckConsumer {
    @RabbitListener(queues = "manualAckQueue",ackMode = "MANUAL")//ackMode选择的是手动确认返回机制
    public void receive(String msg, Message message, Channel channel) throws InterruptedException, IOException {
        log.error("低性能消费者1收到=>{}",msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        Thread.sleep(1000);
    }
}

ManualAckCosumer2的yaml配置文件

server:
  port: 8022
spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: 127.0.0.1
    port: 5673
    username: guest
    password: guest
    listener:
      simple:
        # 公平分发
        prefetch: 1

ManualAckConsumer2

import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Log4j2
@Component
public class ManualAckConsumer2 {
    @RabbitListener(queues = "manualAckQueue",ackMode = "MANUAL")
    public void receive(String msg, Message message, Channel channel) throws InterruptedException, IOException {
        log.error("高性能消费者2收到=>{}",msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//deliveryTag 是消息的唯一标识符,每条消息在队列中都有一个与其关联的。
        Thread.sleep(100);
    }
}

ManualAckProducer的yaml文件

server:
  port: 8023
spring:
  application:
    name: rabbitmq
  rabbitmq:
    host: 127.0.0.1
    port: 5673
    username: guest
    password: guest

ManualAckProducer

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Log4j2
@Component
public class ManualAckProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void send(){
        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend("manualAckQueue",i);
            log.error("生产者发送=>{}",i);
        }
    }
}
import live.nido.rabbitmq.manualAckProducer.ManualAckProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RabbitmqApplicationTests {

	@Autowired
	private ManualAckProducer manualAckProducer;

	@Test
	public void send(){
		manualAckProducer.send();
	}
}

生产者日志

低性能消费者1日志

高性能消费者2日志

生产者和队列确认机制

这种机制是控制红色箭头所指的步骤

生产者和队列的确认机制有两种:1、是生产者确认与队列连接,若没有连接则重新连接。2、是生产者确认消息发送到队列当中,如果没有发送成功则重新发送

生产者重连机制

因为网络波动导致生产者无法连接到我们的rabbitmq可以用配置文件进行重新连接

connection-timeout: 1s 的意思是如果生产者连接rabbitmq花费的时间超过1秒则重新连接

retry:

enabled: true 的意思是开启生产者重连rabbitmq机制

initial-interval: 1000ms 的意思是失败后等多长时间再次进行重连

multiplier: 2 的意思是失败后下次等待时间的倍数

max-attempts: 3 的意思是最大重试次数

例如以下配置如果我的生产者超过一秒未连接到rabbitmq然后将等待1秒进行重新连接,如果第二次还是连接失败的话需要等待两秒进行连接,如果还是失败就等待4秒进行连接,如果还失败就不进行连接了

spring:
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 1s #设置MQ的连接超时时间
    template:
      retry:
        max-attempts: 3 #最大重试次数
        enabled: true #开启重试机制
        multiplier: 2 #失败后下次等待时长倍数,下次等待时长=initial-interval * multiplier
        initial-interval: 1000ms # 失败后的初始等待时间

生产者重连日志

更具下方框选的时间可以清楚的看到重连的时间从47到49到52

生产者确认机制

Publisher Confirm开启后RabbitMQ成功接收到消息后返回确认消息给到生产者,返回的类型为两种:

ack:生产者的消息投递到了消息队列那么消息队列就会传送一个ack给生产者。

消息投放到了队列但是路由失败,队列还是会返回ack,但是如果开启了Publisher Return机制也会返回路由异常的原因信息

如果是非持久化消息队列,只要消息入队了就会返回ack给生产者

如果是持久化消息队列,消息需要写入磁盘完成持久化才会返回ack

nack:如果消息没有成功入队则返回nack给到生产者

消息转换器

我们使用rabbitmq进行监听的时候,发现message只能传输String在有些场景下使用十分不方便,用rabbitmq中MessageConvert进行消息转换

我们实现MessageConverter这个接口的三个个方法

Json2UserMessageConverter


import com.google.gson.Gson;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.stereotype.Component;

import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;

@Component
public class Json2UserMessageConverter implements MessageConverter {

    private static final Gson GSON = new Gson();

    /**
     * 将发送的java对象转换为消息对象
     * @param o
     * @param messageProperties
     * @return
     * @throws MessageConversionException
     */
    @Override
    public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(GSON.toJson(o).getBytes(StandardCharsets.UTF_8));
    }

    /**
     * 将发送的java对象的类型转换成消息对象
     * @param object
     * @param messageProperties
     * @param genericType
     * @return
     * @throws MessageConversionException
     */
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties, Type genericType) throws MessageConversionException {
        return new Message(GSON.toJson(object).getBytes(StandardCharsets.UTF_8));
    }

    /**
     * 将发送来的消息对象转换成java对象
     * @param message
     * @return
     * @throws MessageConversionException
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return GSON.fromJson(new String(message.getBody(), StandardCharsets.UTF_8), User.class);
    }
}

再将我们的自定义的Convert绑定到rabbitTemplate里面,为了避免循环依赖我们使用connectionFactory new出一个rabbitTemplate

MessageConvertConfig


import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class MessageConvertConfig {

    private ApplicationContext applicationContext;
    @Bean
    public Queue messageConvertQueue(){
        return new Queue("messageConvertQueue",true,false,true);
    }




    @Bean
    public RabbitTemplate rabbitTemplate(Json2UserMessageConverter json2UserMessageConverter, ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(json2UserMessageConverter);
        return rabbitTemplate;
    }
}

MessageConvertConsumer

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Log4j2
@Component
public class MessageConvertConsumer {
    @RabbitListener(queues = "messageConvertQueue")
    public void receive(User user){
        log.error("消费者收到userName=>{}",user.getName());
        log.error("消费者收到userAge=>{}",user.getAge());
    }

}

这是我们需要转换的对象结构

User

@Data
public class User {
    private String name;
    private String age;
}

MessageConvertProducer

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Log4j2
@Component
public class MessageConvertProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(){
        User user = new User();
        user.setName("张三");
        user.setAge("11");
        rabbitTemplate.convertAndSend("messageConvertQueue",user);
        log.error("生产者发送了userName=>{}",user.getName());
        log.error("生产者发送了userAge=>{}",user.getAge());
    }
}

我们使用一个test类来出发生产者的发送消息

import live.nido.rabbitmq.messageConvert.MessageConvertProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RabbitmqApplicationTests {

	@Autowired
	private MessageConvertProducer messageConvertProducer;

	@Test
	public void send(){
		messageConvertProducer.send();
	}

}

生产者日志

消费者日志

我们可以看到消费者和生产者可以成功的发送user对象了

Rabbitmq中队列的参数

我们看过了这么多例子我们系统性的学习一下rabbitmq队列可以设置的参数

1. x-message-ttl: 消息的存活时间

  • 描述:指定队列中消息的存活时间,超时的消息将被删除。

  • 单位:毫秒(ms)

  • 示例args.put("x-message-ttl", 60000); // 设置消息 TTL 为 60 秒。

2. x-expires: 队列的过期时间

  • 描述:指定队列的过期时间,队列在指定时间内未被访问则会自动删除。

  • 单位:毫秒(ms)

  • 示例args.put("x-expires", 3600000); // 设置队列过期时间为 1 小时。

3. x-max-length: 队列的最大长度

  • 描述:指定队列中可以存储的最大消息数量。超出此数量的消息会被丢弃(通常是旧消息)或转发到其他队列(如果配置了死信交换机)。

  • 示例args.put("x-max-length", 1000); // 设置队列最大长度为 1000 条消息。

4. x-max-length-bytes: 队列的最大字节长度

  • 描述:指定队列中可以存储的最大字节数。超出此大小的消息会被丢弃或转发到其他队列。

  • 示例args.put("x-max-length-bytes", 10485760); // 设置队列最大字节长度为 10 MB。

5. x-dead-letter-exchange: 死信交换机

  • 描述:指定消息被拒绝或超时后,将转发到哪个交换机作为死信交换机。

  • 示例args.put("x-dead-letter-exchange", "dlx.exchange"); // 设置死信交换机。

6. x-dead-letter-routing-key: 死信路由键

  • 描述:指定死信交换机使用的路由键。

  • 示例args.put("x-dead-letter-routing-key", "dlx.routingkey"); // 设置死信路由键。

7. x-max-priority: 消息的最大优先级

  • 描述:指定队列支持的最大消息优先级。优先级的范围是从 0 到 x-max-priority,消息将根据优先级排队。

  • 示例args.put("x-max-priority", 10); // 设置最大优先级为 10。

8. durable: 队列持久性

  • 描述:指定队列是否持久化。当 RabbitMQ 重启时,持久化的队列会被保留下来。

  • 示例new Queue("my_queue", true); // 创建一个持久化队列。

9. exclusive: 排他队列

  • 描述:指定队列是否为排他队列。排他队列只能由声明它的连接使用,并且在连接关闭时自动删除。

  • 示例new Queue("my_queue", false, true, false); // 创建一个排他队列。

10. autoDelete: 自动删除

  • 描述:指定队列是否为自动删除队列。自动删除队列在最后一个消费者断开连接时会自动删除。

  • 示例new Queue("my_queue", false, false, true); // 创建一个自动删除的队列。

死信交换机

死信交换机的作用是如果消息再队列里面被消费者拒绝或者超过存在时间的过期消息,如果队列绑定了死信队列那么这些消息将被转发到死信交换机里,然后死信交换机有自己的队列然后消费者连接死信队列进行消费处理。

比如说如果有一个业务逻辑需要保证每条消息都需要被处理,如果失败的消息需要人工处理,那么可以用死信交换机,失败的消息转发到死信交换机里,然后死信队列的消费者进行通知程序员这些消息失败需要手动处理

DealConfig

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DealConfig {

    @Bean
    public DirectExchange dealExchange(){
        return new DirectExchange("dealExchange",true,true);
    }
    @Bean
    public Queue dealQueue(){
        return new Queue("dealQueue",true,false,true);
    }

    /**
     * 将普通队列和死信交换机绑定
     * @return
     */
    @Bean
    public Queue normalQueue(){
        Queue queue = new Queue("normalQueue",true,false,true);
        queue.addArgument("x-dead-letter-exchange","dealExchange");
        queue.addArgument("x-dead-letter-routing-key","dealKey");
        return queue;
    }
    /**
     * 将死信交换机和死信队列绑定
     */
    @Bean
    public Binding dealBind(DirectExchange dealExchange,Queue dealQueue){
        return BindingBuilder.bind(dealQueue).to(dealExchange).with("dealKey");
    }
}

DealConsumer

声明两个消费者一个进行处理主业务,如果失败则拒绝消息。

一个进行处理死信队列里的失败消息

import com.rabbitmq.client.Channel;
import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Log4j2
@Component
public class DealConsumer {

    @RabbitListener(queues = "normalQueue" ,ackMode = "MANUAL")
    public void receive(String msg, Message message, Channel channel) throws IOException, InterruptedException {
        log.error("消费者拒绝=>{}",msg);
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);//第一个参数为该消息对应队列的唯一值,
        // 第二个是是否批量拒绝该消息如果为true(拒绝所有 deliveryTag 小于等于指定值的消息。这表示你希望批量处理消息,即拒绝从消息队列中获得的所有消息直到 deliveryTag 为止。)
        // 第三个是表示是否重新入队
        Thread.sleep(1000);
    }
    @RabbitListener(queues = "dealQueue")
    public void dealReceive(String msg){
        log.error("{}消息出错",msg);
    }
}

DealProducer

import lombok.extern.log4j.Log4j2;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Log4j2
@Component
public class DealProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(){
        log.error("发送失败的消息");
        rabbitTemplate.convertAndSend("normalQueue","哈哈哈哈哈哈哈哈哈哈哈哈");
    }
}

接下来我们需要使用test执行send

import live.nido.rabbitmq.deal.DealProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RabbitmqApplicationTests {

	@Autowired
	private DealProducer dealProducer;

	@Test
	public void send(){
		dealProducer.send();
	}

}

生产者日志

消费者日志

基本上RabbitMQ的所有用法都介绍完毕了,如果有补充或者问题欢迎留言