认识RabbitMQ

逆流者 2021年03月12日 42次浏览

常用命令

启动与关闭

rabbitmq-server 前台启动服务
rabbitmq-server -detached 后台启动服务
rabbitmqctl stop 停止服务

终止与启动应用

rabbitmq 进程还在,只是暂时停止队列服务

rabbitmqctl start_app 启动应用
rabbitmqctl stop_app 终止应用

用户管理

# 创建用户
rabbitmqctl add_user {username} {password}
# 删除用户
rabbitmqctl delete_user {username}
# 重置密码
rabbitmqctl change_password {username} {newpassword}
# 授予用户角色(tag)
rabbitmqctl set_user_tags {username} {tag}
# 设置用户允许访问的vhost
rabbitmqctl set_permissions -p / user_admin '.*' '.*' '.*'

RabbitMQ用户四种角色(Tag)

  • 超级管理员(administrator)
    可登陆管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。

  • 监控者(monitoring)
    登陆管理控制台(启用management plugin的情况下)同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等等)。

  • 策略制定者(policymarker)
    登陆管理控制台(启用management plugin的情况下)同时可以对policy进行管理,但无法查看节点的相关信息

  • 普通管理者(management)
    仅可登陆管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。

AMQP

AMQP,即Advanced Message Queuing Protocol, 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件涉及。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。Erlang中实现有Rabbitmq等。

基本概念

在这里插入图片描述

  • Producer:生产者,消息的提供者
  • Consumer:消费者,消息的使用者
  • Message:消息,程序间通信的数据
  • Queue:队列,消息存放的容器,消息先进先出
  • Vhost:虚拟主机,相当于MQ的 "数据库",用于存储队列

消息状态

Ready

消息已被送入队列,等待被消费

Unacked

  • 消息已经被消费者认领,但还未被确认“已被消费”
  • Unacked 状态下,消费者断开连接则消息回到 Ready
  • 没有确认,客户没有断开连接,则一直处于Unacked

Finished

调用basicAck()方法后,表示消息已经被消费,从队列中移除。

RabbitMQ六种工作模式

  • Hello World
  • 工作队列模式(Work queues)
  • 发布/订阅模式(Publish/Subscribe)
  • 路由模式(Routing)
  • 主题模式(Topics)
  • RPC

下面我们测试一下rabbitmq这些工作模式的使用

创建一个maven工程,引入依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.9.0</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.6</version>
</dependency>

一些工具和常量类:

工具类:RabbitUtils

public class RabbitUtils {
    private static ConnectionFactory connectionFactory = new ConnectionFactory();

    static {
        connectionFactory.setHost("127.0.0.1");
        // 5672是RabbitMQ的默认端口号
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/test");
    }

    public static Connection getConnection() {
        Connection connection = null;
        try {
            connection = connectionFactory.newConnection();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return connection;
    }
}

常量类:RabbitConstant

public class RabbitConstant {

    public static final String QUEUE_HELLOWORLD = "helloworld";
    public static final String QUEUE_SMS = "sms";
    public static final String EXCHANGE_WEATHER = "weather";
    public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";
    public static final String QUEUE_BAIDU = "baidu";
    public static final String QUEUE_SINA = "sina";
    public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";
}

Hello World

先创建一个消费者进程:

public class Consumer {

    public static void main(String[] args) throws IOException {
        Connection conn = RabbitUtils.getConnection();
        //创建通道
        Channel channel = conn.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);
        //创建一个消息消费者
        //第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
        //第三个参数要传入DefaultConsumer的实现类
        channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, new MyConsumer(channel));

    }
}

class MyConsumer extends DefaultConsumer {
    private Channel channel;

    // 重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

        /*super.handleDelivery(consumerTag,envelope,properties,body);*/
        String messageBody = new String(body);
        System.out.println("消费者接收到:" + messageBody);
        //签收消息,确认消息
        //envelope.getDeliveryTag() 获取这个消息的TagId
        //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}

目前队列(QUEUE_HELLOWORLD常量的值)中还没有消息
在这里插入图片描述
接着再创建一个生产者进程:

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {

        //TCP 物理连接
        Connection conn = RabbitUtils.getConnection();
        //创建通信“通道”,相当于TCP中的虚拟连接
        Channel channel = conn.createChannel();
        //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
        //第一个参数:队列名称ID
        //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
        //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
        //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
        //其他额外的参数, null
        channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);
        //四个参数
        //exchange 交换机,暂时用不到,在后面进行发布订阅时才会用到
        //队列名称
        //额外的设置属性
        //最后一个参数是要传递的消息字节数组
        String message = "HelloWorld";
        channel.basicPublish("", RabbitConstant.QUEUE_HELLOWORLD, null, message.getBytes());
        channel.close();
        conn.close();
        System.out.println("发送数据成功");
    }
}

在这里插入图片描述

接着查看消费者控制台的结果:
在这里插入图片描述

工作队列模式

  • 工作队列(Work queue),它会发送一些耗时的任务给你多个工作者(Worker)。
  • 在多个消息的情况下,Work Queue会将消息分派给不同的消费者,每个消费者都会收到不同的消息,并且可以根据处理消息的速度来接受消息的数量,进而让消费者程序发挥最大的性能。
  • Work Queue 提别适合在集群环境中做异步处理,能最大程序发挥每一台服务器的性能。
    在这里插入图片描述

下面以一个订票系统的简单例子来演示工作队列模式:
短信类:

public class Message {
    private String name;
    private String mobile;
    private String content;

    public Message(String name, String mobile, String content) {
        this.name = name;
        this.mobile = mobile;
        this.content = content;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMobile() {
        return mobile;
    }

    public void setMobile(String mobile) {
        this.mobile = mobile;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

三个消费者:

消费者1

public class MessageSender1 {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
        //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
        channel.basicQos(1);//处理完一个取一个
        channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                System.out.println("SMSSender1-短信发送成功:" + jsonSMS);
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

消费2

public class MessageSender2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                System.out.println("SMSSender2-短信发送成功:" + jsonSMS);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

消费者3

public class MessageSender3 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonSMS = new String(body);
                System.out.println("SMSSender3-短信发送成功:" + jsonSMS);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

生产者:一个订票系统

public class OrderSystem {

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
        for(int i = 100 ; i <= 200 ; i++) {
            Message message = new Message("乘客" + i, "13900000" + i, "您的车票已预订成功");
            String messageJsonStr = new Gson().toJson(message);
            channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , messageJsonStr.getBytes());
        }
        System.out.println("发送数据成功");
        channel.close();
        connection.close();
    }
}

生产者发送101条消息到RabbitConstant.QUEUE_SMS队列:
在这里插入图片描述
三个消费者一共消费这101条消息:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

发布(Publish)/订阅(Subscribe)模式

  • 发布/订阅模式中,生产者不再直接与队列绑定,而是将数据发送至 交换机Exchange
  • 交换机用于将数据按某种规则送入与之绑定的队列,进而供消费者使用。
  • 发布/订阅模式中,交换机将无差别的将所有消息送入与之绑定的队列,所有消费者拿到的消息完全相同,交换机的类型为fanout

    适用于数据提供商与应用商,例如,气象局提供天气数据送入交换机,各大平台(百度,新浪等)接入通过队列绑定到该交换机,自动获取气象局推送的气象数据。

下面就通过代码的方式来实现一个天气的例子

使用本机的rabbitmq(单机模式),在rabbitmq管理页面手动创建交换器和队列
在这里插入图片描述
队列baidu和sina会自动创建

大平台-百度(消费者)

public class Baidu {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
        //queueBind用于将队列与交换机绑定
        //参数1:队列名 参数2:交互机名  参数三:路由key(暂时用不到)
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("百度收到气象信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

大平台-新浪(消费者)

public class Sina {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);
        //queueBind用于将队列与交换机绑定
        //参数1:队列名 参数2:交互机名  参数三:路由key(暂时用不到)
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("新浪收到气象信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

气象台(生产者)

public class WeatherBureau {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        String message = "上海明天天气,晴转多云!";
        Channel channel = connection.createChannel();
        channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, message.getBytes());
        channel.close();
        connection.close();
        System.out.println("天气发布成功");
    }
}

执行结果:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

路由Routinig模式

  • 路由模式是在发布订阅模式基础上的变种。
  • 发布订阅是无条件将所有消息分发给所有消费者队列。
  • 路由模式则是交换机根据Routing Key有条件的将数据筛选后发给消费者队列。
  • 路由模式下交换机的类型被称为direct

在这里插入图片描述

下面还是通过一个例子来测试下:

手动创建一个带路由件的交换器:
在这里插入图片描述

大平台-百度(消费者)

public class Baidu {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
        //queueBind用于将队列与交换机绑定
        //参数1:队列名 参数2:交互机名  参数三:路由key
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.jiangsu.xuzhou.20210311");
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.jiangsu.shuqian.20210311");
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.jiangsu.lianyungang.20210311");
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.jiangsu.huaian.20210311");
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.jiangsu.yancheng.20210311");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("百度收到气象信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

大平台-新浪(消费者)

public class Sina {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);

        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.jiangsu.nanjing.20210311");
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.jiangsu.yangzhou.20210311");
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.jiangsu.taizhou.20210311");
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.jiangsu.nantong.20210311");
        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.jiangsu.zhenjiang.20210311");

        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("新浪收到气象信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}

气象台(生产者)

public class WeatherBureau {

    public static void main(String[] args) throws IOException, TimeoutException {
        Map<String, String> area = new LinkedHashMap<>();
        // 苏北五虎
        area.put("china.jiangsu.xuzhou.20210311", "中国江苏<<徐州>>2021年3月11日天气数据");
        area.put("china.jiangsu.shuqian.20210311", "中国江苏<<宿迁>>2021年3月11日天气数据");
        area.put("china.jiangsu.lianyungang.20210311", "中国江苏<<连云港>>2021年3月11日天气数据");
        area.put("china.jiangsu.huaian.20210311", "中国江苏<<淮安>>2021年3月11日天气数据");
        area.put("china.jiangsu.yancheng.20210311", "中国江苏<<盐城>>2021年3月11日天气数据");

        // 苏中
        area.put("china.jiangsu.nanjing.20210311", "中国江苏<<南京>>2021年3月11日天气数据");
        area.put("china.jiangsu.yangzhou.20210311", "中国江苏<<扬州>>2021年3月11日天气数据");
        area.put("china.jiangsu.taizhou.20210311", "中国江苏<<泰州>>2021年3月11日天气数据");
        area.put("china.jiangsu.nantong.20210311", "中国江苏<<南通>>2021年3月11日天气数据");
        area.put("china.jiangsu.zhenjiang.20210311", "中国江苏<<镇江>>2021年3月11日天气数据");

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        for (Map.Entry<String, String> me : area.entrySet()) {
            //Routing key 第二个参数相当于数据筛选的条件
            channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, me.getKey(), null, me.getValue().getBytes());
        }

        channel.close();
        connection.close();
    }
}

运行代码,先启动消费者,再启动生产者,可以清楚看到消费者根据路由件去匹配获取的数据,百度消费的是苏北的天气情况,新浪消费的是苏中的天气情况。
在这里插入图片描述
在这里插入图片描述

主题Topic模式

  • 主题模式是在Routing模式基础上,提供了对Route Key模式匹配的功能,可以简化程序的编写。
  • 主题模式下,模糊匹配表达式规则为
    * 匹配单个关键字
    # 匹配所有关键字
  • 主题模式下交换机的类型为topic
    在这里插入图片描述

手动新建一个主题模式的交换器:
在这里插入图片描述
生产者:

public class WeatherBureau {

    public static void main(String[] args) throws IOException, TimeoutException {
        Map<String, String> area = new LinkedHashMap<>();

        // 苏北
        area.put("china.jiangsu.xuzhou.20210311", "中国江苏<<徐州>>2021年3月11日天气数据");
        area.put("china.jiangsu.shuqian.20210311", "中国江苏<<宿迁>>2021年3月11日天气数据");
        area.put("china.jiangsu.lianyungang.20210311", "中国江苏<<连云港>>2021年3月11日天气数据");
        area.put("china.jiangsu.huaian.20210311", "中国江苏<<淮安>>2021年3月11日天气数据");
        area.put("china.jiangsu.yancheng.20210311", "中国江苏<<盐城>>2021年3月11日天气数据");

        // 上海·
        area.put("china.shanghai.songjiangqu.20210311", "中国上海<<松江区>>2021年3月11日天气数据");
        area.put("china.shanghai.jinganqu.20210312", "中国上海<<静安区>>2021年3月12日天气数据");
        area.put("china.shanghai.xuhuiqu.20210312", "中国上海<<徐汇区>>2021年3月12日天气数据");

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
        while (itr.hasNext()) {
            Map.Entry<String, String> me = itr.next();
            //Routing key 第二个参数相当于数据筛选的条件
            channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,me.getKey() , null , me.getValue().getBytes());
        }

        channel.close();
        connection.close();
        System.out.println("天气数据发布成功");
    }
}

消费者:

public class Baidu {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
        //queueBind用于将队列与交换机绑定
        //参数1:队列名 参数2:交互机名  参数三:路由key
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.jiangsu.#");

        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("百度收到气象信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}
public class Sina {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);

        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20210312");

        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("新浪收到气象信息:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

结果:
在这里插入图片描述
在这里插入图片描述

RabbitMQ消息确认机制

  • RabbitMQ在传递消息的过程中充当了代理人(Broker)的角色,那生产者(Producer)怎样知道消息被正确投递到Broker了呢?
  • RabbitMQ提供了监听器(Listener)来接收消息投递的状态。
  • 消息确认涉及两种状态:Confirm和Return。

Confirm和Return

  • Confirm 代表生产者将消息送到Broker时产生的状态,后续会出现两种情况:
    ack 代表Broker已经将数据接收
    nack 代表Broker拒收消息。原因可能是:队列已满,限流,IO异常等等
  • Return 代表消息被Broker正常接收(ack)后,但Broker没有对应的队列进行投递时产生的状态,消息被退给生产者。
  • 注意:上面两种状态只代表生产者与Broker之间消息投递的情况。与消费者是否接收/确认消息无关。

消费者:

public class Baidu {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);
        //queueBind用于将队列与交换机绑定
        //参数1:队列名 参数2:交互机名  参数三:路由key
        channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.jiangsu.#");
        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_BAIDU, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("百度收到气象信息:" + new String(body) + ", DeliveryTag: " + envelope.getDeliveryTag());
                if ("china.jiangsu.lianyungang.20210311".equals(envelope.getRoutingKey())) {
                    channel.basicNack(envelope.getDeliveryTag(), false, true);
                } else {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        });
    }
}

在百度队列中有五条消息,当消费到第三条时,我做了一个不确认操作,把此消息再重新放回队列,所以就看到下面这种结果(只成功消费了两条):
在这里插入图片描述
这第三条一直在返回队列,取出来消费做无限循环操作,导致后面两条消息也没机会消费。

如果把上面的代码:channel.basicNack(envelope.getDeliveryTag(), false, true);
改成channel.basicNack(envelope.getDeliveryTag(), false, false); 第三个参数改成了false,那么此消息就相当于被丢弃了,不会重新放入队列。

太平台-新浪(消费者)

public class Sina {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);

        channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20210312");

        channel.basicQos(1);
        channel.basicConsume(RabbitConstant.QUEUE_SINA, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("新浪收到气象信息:" + new String(body) + ", DeliveryTag: " + envelope.getDeliveryTag());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

在这里插入图片描述
生产者

public class WeatherBureau {

    public static void main(String[] args) throws IOException, TimeoutException {
        Map<String, String> area = new LinkedHashMap<>();

        // 苏北
        area.put("china.jiangsu.xuzhou.20210311", "中国江苏<<徐州>>2021年3月11日天气数据");
        area.put("china.jiangsu.shuqian.20210311", "中国江苏<<宿迁>>2021年3月11日天气数据");
        area.put("china.jiangsu.lianyungang.20210311", "中国江苏<<连云港>>2021年3月11日天气数据");
        area.put("china.jiangsu.huaian.20210311", "中国江苏<<淮安>>2021年3月11日天气数据");
        area.put("china.jiangsu.yancheng.20210311", "中国江苏<<盐城>>2021年3月11日天气数据");

        // 上海·
        area.put("china.shanghai.songjiangqu.20210311", "中国上海<<松江区>>2021年3月11日天气数据");
        area.put("china.shanghai.jinganqu.20210312", "中国上海<<静安区>>2021年3月12日天气数据");
        area.put("china.shanghai.xuhuiqu.20210312", "中国上海<<徐汇区>>2021年3月12日天气数据");

        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        //开启confirm监听模式
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                //第二个参数代表接收的数据是否为批量接收,一般我们用不到。
                System.out.println("消息已被Broker 接收, deliveryTag: " + deliveryTag + ", multiple: " + multiple);
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息已被Broker 拒收, deliveryTag: " + deliveryTag + ", multiple: " + multiple);
            }
        });
        channel.addReturnListener(new ReturnCallback() {
            @Override
            public void handle(Return r) {
                System.err.println("===========================");
                System.err.println("Return编码:" + r.getReplyCode() + ", Return描述:" + r.getReplyText());
                System.err.println("交换器:" + r.getExchange() + ", 路由key:" + r.getRoutingKey());
                System.err.println("Return主题:" + new String(r.getBody()));
                System.err.println("===========================");
            }
        });
        for (Map.Entry<String, String> me : area.entrySet()) {
            //Routing key 第二个参数相当于数据筛选的条件
            //第三个参数为:mandatory true代表如果消息无法正常投递则return回生产者,如果false,则直接将消息放弃。
            channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, me.getKey(), true, null, me.getValue().getBytes());
        }

        /*channel.close();
        connection.close();*/
    }
}

在生产者中开启了Confirm和Return 的监听,看下生产者的执行结果
在这里插入图片描述
可以看到上海松江区那条消息找不到队列给退回生产者了。

RabbitMQ集群架构模式

四种架构模式:

  • 主备模式(Warren)
  • 镜像模式(Mirror)
  • 远程模式(Shovel)
  • 多活模式(Federation)

主备模式(Warren)

实现RabbitMQ的高可用集群,一般在并发和数据量不高的情况下,这种模型非常好用且简单。

镜像模式(Mirror)

集群模式非常经典的就是镜像模式,保证100%数据不丢失,在实际工作中是使用最多的。并且实现集群非常的简单,一般大公司都会使用这种镜像模式。

远程模式(Shovel)

远程模式可以实现双活的一种模式,简称Shovel模式,所谓Shovel就是我们可以把消息进行不同数据中心的复制工作,可以跨地域的让两个mq集群互联。

多活模式(Federation)

多活模式是实现异地数据复制的主流模式,因为Shovel模式配置比较复杂,所以一般来说实现异地集群都是使用这种双活或多活模型来实现的。这种模型需要依赖rabbitmq的federation插件,可以实现持续的可靠的AMQP数据通信,多活模式在实际配置与应用非常的简单。