13. RabbitMQ - 通配符模式 案例代码
大约 4 分钟
1. 引言
代码已上传至Github,有兴趣的同学可以下载看看:https://github.com/ylw-github/RabbitMQ-Demo
前面博客讲解了RabbitMQ的五种队列形式《消息中间件系列教程(06) -RabbitMQ -五种队列形式》,主要讲解一下五种队列的代码实现。
主要分为:
- 点对点队列模式(简单)
- 工作队列模式(公平性)
- 发布订阅模式
- 路由模式Routing
- 通配符模式Topics
本文主要讲解通配符模式。
2. 配符模式
功能:此模式实在路由key模式的基础上,使用了通配符来管理消费者接收消息。生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;
符号#:匹配一个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor 符号*:只能匹配一个词lazy.* 可以匹配lazy.irs或者lazy.cor
2.1 邮件短信案例
1.新建Maven项目RabbitMQ-Demo
2.添加Maven依赖:
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
</dependencies>
3.连接工具类
package com.ylw.rabbitmq;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQConnecUtils {
public static Connection newConnection() throws IOException, TimeoutException {
// 1.定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置服务器地址
factory.setHost("127.0.0.1");
// 3.设置协议端口号
factory.setPort(5672);
// 4.设置vhost
factory.setVirtualHost("OrderHost");
// 5.设置用户名称
factory.setUsername("OrderAdmin");
// 6.设置用户密码
factory.setPassword("123456");
// 7.创建新的连接
Connection newConnection = factory.newConnection();
return newConnection;
}
}
2.1.1 生产者
public class Producer {
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建新的连接
Connection connection = RabbitMQConnecUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.绑定的交换机 参数1交互机名称 参数2 exchange类型
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = "log.info.error";
String msg = "topic_exchange_msg" + routingKey;
// 4.发送消息
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("生产者发送msg:" + msg);
// // 5.关闭通道、连接
channel.close();
connection.close();
// 注意:如果消费没有绑定交换机和队列,则消息会丢失
}
}
2.1.2 消费者
邮件消费者:
public class ConsumerEmailDirect {
private static final String QUEUE_NAME = "consumer_topic_email";
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建新的连接
Connection connection = RabbitMQConnecUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.消费者关联队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.#");
// 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("邮件消费者获取生产者消息:" + msg);
}
};
// 5.消费者监听队列消息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
短信消费者:
public class ConsumerSMSDirect {
private static final String QUEUE_NAME = "consumer_topic_sms";
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建新的连接
Connection connection = RabbitMQConnecUtils.newConnection();
// 2.创建通道
Channel channel = connection.createChannel();
// 3.消费者关联队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.*");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("短信消费者获取生产者消息:" + msg);
}
};
// 5.消费者监听队列消息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
3. 测试
启动生产者,并关闭,让其在RabbitMQ里面注册交换机,在控制台可以看出注册成功(如果不启动,可以手动注册,如下图Add a new exchange):
启动邮件消费者和短信消费者,在控制台可以看出有两个队列:*
再启动生产者,可以看到邮件消费者消费信息
而短信消费者没有消费信息:
因为生产者的RouteKey为log.info.error
,邮件消费者的匹配的RouteKey为log.#
(匹配log后面的所有),SMS消费者的匹配的RouteKey为log.*
(匹配log后面的一个)。所以只有邮件消费者能消费消息。