专业网站建设品牌,十四年专业建站经验,服务6000+客户--广州京杭网络
免费热线:400-683-0016      微信咨询  |  联系我们

rabbitmq官方的六种工作模式

当前位置:网站建设 > 技术支持
资料来源:网络整理       时间:2023/2/14 0:51:50       共计:3620 浏览
1.RabbitMq
1.1介绍

RabbitMQ是一个消息代理:它接受并转发消息。你可以把它当成一个邮局:当你想邮寄信件的时候,你会把信件放在投递箱中,并确信邮递员最终会将信件送到收件人的手里。在这个例子中,RabbitMQ就相当与投递箱、邮局和邮递员。

AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。
 

 
1.2六种工作模式

官网介绍:https://www.rabbitmq.com/getstarted.html

这里简单介绍下六种工作模式的主要特点:

简单模式:一个生产者,一个消费者

work模式:一个生产者,多个消费者,每个消费者获取到的消息唯一。

订阅模式:一个生产者发送的消息会被多个消费者获取。

路由模式:发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key

topic模式:将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词。
1.2.0

准备:

安装rabbitmq,并启动

引入pom文件:

    <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
     
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>3.4.1</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.7</version>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.3.2</version>
            </dependency>
     
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>1.4.0.RELEASE</version>
            </dependency>
        </dependencies>

编写连接工具类:

    package com.sakura.util;
     
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
     
    import java.io.IOException;
     
    /**
     * Created by apple on 2018/9/4.
     */
    public class ConnectionUtil {
        public static Connection getConnection() throws IOException {
            //连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            //连接5672端口  注意15672为工具界面端口  25672为集群端口
            factory.setPort(5672);
            factory.setVirtualHost("/jxd");
            factory.setUsername("jxd");
            factory.setPassword("123");
            //获取连接
            Connection connection = factory.newConnection();
     
            return connection;
     
        }
    }

这里说一下配置虚拟主机和用户名密码

为什么会有虚拟主机:

当我们在创建用户时,会指定用户能访问一个虚拟机,并且该用户只能访问该虚拟机下的队列和交换机,如果没有指定,默认的是”/”;一个rabbitmq服务器上可以运行多个vhost,以便于适用不同的业务需要,这样做既可以满足权限配置的要求,也可以避免不同业务之间队列、交换机的命名冲突问题,因为不同vhost之间是隔离的。

设置权限:

1.2.1简单模式

生产者:

    package com.sakura.simple;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.sakura.util.ConnectionUtil;
     
    import java.io.IOException;
     
    /**
     * Created by apple on 2018/9/4.
     */
    public class Sender {
        private final static String QUEUE_NAME = "simple_queue";
     
        public static void main(String[] args) throws IOException {
            //创建连接
            Connection connection = ConnectionUtil.getConnection();
            //创建通道
            Channel channel = connection.createChannel();
            //声明队列
            /**
             * 队列名
             * 是否持久化
             *  是否排外  即只允许该channel访问该队列   一般等于true的话用于一个队列只能有一个消费者来消费的场景
             *  是否自动删除  消费完删除
             *  其他属性
             *
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
     
            //消息内容
            /**
             * 交换机
             * 队列名
             * 其他属性  路由
             * 消息body
             */
            String message = "错的不是我,是这个世界~";
            channel.basicPublish("", QUEUE_NAME,null,message.getBytes());
            System.out.println("[x]Sent '"+message + "'");
     
            //最后关闭通关和连接
            channel.close();
            connection.close();
     
     
        }
    }

 

消费者:

    package com.sakura.simple;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.sakura.util.ConnectionUtil;
     
    import java.io.IOException;
     
    /**
     * Created by apple on 2018/9/4.
     */
    public class Receiver {
        private final static String QUEUE_NAME = "simple_queue";
     
        public static void main(String[] args) throws IOException, InterruptedException {
            //获取连接
            Connection connection = ConnectionUtil.getConnection();
            //获取通道
            Channel channel = connection.createChannel();
     
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE_NAME, true, consumer);
     
            while(true){
                //该方法会阻塞
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("[x] Received '"+message+"'");
     
            }
        }
    }

1.2.1work模式

生产者:

    package com.sakura.work;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.sakura.util.ConnectionUtil;
     
    import java.io.IOException;
     
    /**
     * Created by apple on 2018/9/4.
     */
    public class Sender {
        private final  static String QUEUE_NAME = "queue_work";
     
        public static void main(String[] args) throws IOException, InterruptedException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
     
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for(int i = 0; i < 100; i++){
                String message = "冬马小三" + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println("[x] Sent '"+message + "'");
                Thread.sleep(i*10);
            }
     
            channel.close();
            connection.close();
        }
    }

 

消费者:

    package com.sakura.work;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.sakura.util.ConnectionUtil;
     
    import java.io.IOException;
     
    /**
     * Created by apple on 2018/9/4.
     */
    public class Receiver1 {
        private final static  String QUEUE_NAME = "queue_work";
     
        public static void main(String[] args) throws IOException, InterruptedException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
     
            channel.queueDeclare(QUEUE_NAME, false,false, false,null);
            //同一时刻服务器只会发送一条消息给消费者
            channel.basicQos(1);
     
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //关于手工确认 待之后有时间研究下
            channel.basicConsume(QUEUE_NAME, false, consumer);
     
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("[x] Received1 '"+message+"'");
                Thread.sleep(10);
                //返回确认状态
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
     
        }
    }

    package com.sakura.work;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.sakura.util.ConnectionUtil;
     
    import java.io.IOException;
     
    /**
     * Created by apple on 2018/9/4.
     */
    public class Receiver2 {
        private final static  String QUEUE_NAME = "queue_work";
     
        public static void main(String[] args) throws IOException, InterruptedException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
     
            channel.queueDeclare(QUEUE_NAME, false,false, false,null);
            //同一时刻服务器只会发送一条消息给消费者
            channel.basicQos(1);
     
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE_NAME, false, consumer);
     
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("[x] Received2 '"+message+"'");
                Thread.sleep(1000);
                //返回确认状态
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
     
        }
    }

 

channel.basicQos启

The following basic (Java) example will receive a maximum of 10 unacknowledged messages at once:

依据官网看。是指通道channel每次能够接收的消费者最大值  https://www.rabbitmq.com/consumer-prefetch.html

若将该行代码注释,则channel无限制,消息将很快发送完毕,只不过消息阻塞在队列中
1.2.3订阅模式
1.2.4路由模式

生产者:

    package com.sakura.routing;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.sakura.util.ConnectionUtil;
     
    import java.io.IOException;
     
    /**
     * Created by apple on 2018/9/4.
     */
    public class Sender {
        private final static String EXCHANGE_NAME = "exchange_direct";
        private final static String EXCHANGE_TYPE = "direct";
     
        public static void main(String[] args) throws IOException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
     
            channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);
     
            String message = "那一定是蓝色";
            channel.basicPublish(EXCHANGE_NAME,"key2", null, message.getBytes());
            System.out.println("[x] Sent '"+message+"'");
     
            channel.close();
            connection.close();
        }
    }

 

 

消费者:

    package com.sakura.routing;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.sakura.util.ConnectionUtil;
     
    import java.io.IOException;
     
    /**
     * Created by apple on 2018/9/4.
     */
    public class Receiver1 {
        private final  static  String QUEUE_NAME = "queue_routing";
        private final static String EXCHANGE_NAME = "exchange_direct";
     
        public static void main(String[] args) throws IOException, InterruptedException {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
     
            channel.queueDeclare(QUEUE_NAME, false,false,false,null);
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key");
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key2");
     
            channel.basicQos(1);
     
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE_NAME, false, consumer);
     
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("[x] Received1 "+message);
                Thread.sleep(10);
     
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
     
     
        }
    }

 

    package com.sakura.routing;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.sakura.util.ConnectionUtil;
     
    import java.io.IOException;
     
    /**
     * Created by apple on 2018/9/4.
     */
    public class Receiver2 {
        private final  static  String QUEUE_NAME = "queue_routing2";
        private final static String EXCHANGE_NAME = "exchange_direct";
     
        public static void main(String[] args) throws IOException, InterruptedException {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
     
            channel.queueDeclare(QUEUE_NAME, false,false,false,null);
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key2");
     
            channel.basicQos(1);
     
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE_NAME, false, consumer);
     
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("[x] Received2 "+message);
                Thread.sleep(10);
     
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
     
     
        }
    }

路由模式下,一个队列可以绑定多个路由
1.2.5topic模式

生产者:

    package com.sakura.topic;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.sakura.util.ConnectionUtil;
     
    import java.io.IOException;
     
    /**
     * Created by apple on 2018/9/4.
     */
    public class Sender {
        private final static String EXCHANGE_NAME = "exchange_topic";
        private final static String EXCHANGE_TYPE = "topic";
     
        public static void main(String[] args) throws IOException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
     
            channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
     
            //消息内容
            String message = "如果真爱有颜色";
            channel.basicPublish(EXCHANGE_NAME,"key.1",null,message.getBytes());
            System.out.println("[x] Sent '"+message+"'");
     
            //关通道 关连接
            channel.close();
            connection.close();
        }
    }

 

 

消费者:

    package com.sakura.topic;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.sakura.util.ConnectionUtil;
     
    import java.io.IOException;
     
    /**
     * Created by apple on 2018/9/4.
     */
    public class Receiver1 {
        private final static String QUEUE_NAME = "queue_topic";
        private final static String EXCHANGE_NAME = "exchange_topic";
        private final static String EXCHANGE_TYPE = "topic";
     
        public static void main(String[] args) throws IOException, InterruptedException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
     
            channel.queueDeclare(QUEUE_NAME, false, false,false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");
     
            channel.basicQos(1);
     
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE_NAME, false, consumer);
     
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("[x] Received1 '"+message + "'");
                Thread.sleep(10);
     
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }

 

 

    package com.sakura.topic;
     
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.QueueingConsumer;
    import com.sakura.util.ConnectionUtil;
     
    import java.io.IOException;
     
    /**
     * Created by apple on 2018/9/4.
     */
    public class Receiver2 {
        private final static String QUEUE_NAME = "queue_topic2";
        private final static String EXCHANGE_NAME = "exchange_topic";
        private final static String EXCHANGE_TYPE = "topic";
     
        public static void main(String[] args) throws IOException, InterruptedException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
     
            channel.queueDeclare(QUEUE_NAME, false, false,false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
     
            channel.basicQos(1);
     
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE_NAME, false, consumer);
     
            while(true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("[x] Received2 '"+message + "'");
                Thread.sleep(10);
     
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }

 

 

 

 

项目地址:https://gitee.com/ecoSakuraSou/spring-rabbitmq
版权说明:
本网站凡注明“广州京杭 原创”的皆为本站原创文章,如需转载请注明出处!
本网转载皆注明出处,遵循行业规范,如发现作品内容版权或其它问题的,请与我们联系处理!
欢迎扫描右侧微信二维码与我们联系。
·上一条:RabbitMQ七种工作模式 | ·下一条:RabbitMq安装教程(超详细)

Copyright © 广州京杭网络科技有限公司 2005-2025 版权所有    粤ICP备16019765号 

广州京杭网络科技有限公司 版权所有