RabbitMQ 七种队列模式应用场景案例分析(通俗易懂)
七种模式介绍与应用场景
简单模式(Hello World)
工作队列模式(Work queues)
订阅模式(Publish/Subscribe)
一个fanout类型交换机扇出两个个消息队列,分别为缓存消息队列、数据库消息队列 一个缓存消息队列对应着多个缓存消费者 一个数据库消息队列对应着多个数据库消费者
路由模式(Routing)
主题模式(Topics)
#
匹配一个词或多个词,*
只匹配一个词。远程过程调用(RPC)
发布者确认(Publisher Confirms)
简单模式
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Sender {
private final static String QUEUE_NAME = 'simple_queue';
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost('localhost');
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
// queue:队列名
// durable:是否持久化
// exclusive:是否排外 即只允许该channel访问该队列 一般等于true的话用于一个队列只能有一个消费者来消费的场景
// autoDelete:是否自动删除 消费完删除
// arguments:其他属性
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//消息内容
String message = 'simplest mode message';
channel.basicPublish('', QUEUE_NAME, null, message.getBytes());
System.out.println('[x]Sent '' + message + ''');
//最后关闭通关和连接
channel.close();
connection.close();
}
}
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class Receiver {
private final static String QUEUE_NAME = 'simplest_queue';
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { // 获取连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost('localhost'); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), 'UTF-8'); System.out.println(' [x] Received '' + delivery.getEnvelope().getRoutingKey() + '':'' + message + '''); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver1 {
private final static String QUEUE_NAME = 'queue_work';
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost('localhost');
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发送一条消息给消费者
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), 'UTF-8');
System.out.println(' [x] Received '' +
delivery.getEnvelope().getRoutingKey() + '':'' + message + ''');
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
}
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class Receiver2 {
private final static String QUEUE_NAME = 'queue_work';
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost('localhost'); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一时刻服务器只会发送一条消息给消费者 channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), 'UTF-8'); System.out.println(' [x] Received '' + delivery.getEnvelope().getRoutingKey() + '':'' + message + '''); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Sender {
private final static String QUEUE_NAME = 'queue_work';
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost('localhost');
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 100; i++) {
String message = 'work mode message' + i;
channel.basicPublish('', QUEUE_NAME, null, message.getBytes());
System.out.println('[x] Sent '' + message + ''');
Thread.sleep(i * 10);
}
channel.close();
connection.close();
}
}
发布订阅模式
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;
public class Receive1 {
private static final String EXCHANGE_NAME = 'logs';
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost('localhost'); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, 'fanout'); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, '');
System.out.println(' [*] Waiting for messages. To exit press CTRL+C');
// 订阅消息的回调函数 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), 'UTF-8'); System.out.println(' [x] Received '' + message + '''); };
// 消费者,有消息时出发订阅回调函数 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); }}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Receive2 {
private static final String EXCHANGE_NAME = 'logs';
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost('localhost');
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, 'fanout');
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, '');
System.out.println(' [*] Waiting for messages. To exit press CTRL+C');
// 订阅消息的回调函数
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), 'UTF-8');
System.out.println(' [x] Received2 '' + message + ''');
};
// 消费者,有消息时出发订阅回调函数
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;
public class Sender {
private static final String EXCHANGE_NAME = 'logs';
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost('localhost'); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, 'fanout');
String message = 'publish subscribe message'; channel.basicPublish(EXCHANGE_NAME, '', null, message.getBytes('UTF-8')); System.out.println(' [x] Sent '' + message + ''');
channel.close(); connection.close(); }}
路由模式
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver1 {
private final static String QUEUE_NAME = 'queue_routing';
private final static String EXCHANGE_NAME = 'exchange_direct';
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost('localhost');
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 指定路由的key,接收key和key2
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, 'key');
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, 'key2');
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), 'UTF-8');
System.out.println(' [x] Received '' +
delivery.getEnvelope().getRoutingKey() + '':'' + message + ''');
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
}
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class Receiver2 {
private final static String QUEUE_NAME = 'queue_routing2'; private final static String EXCHANGE_NAME = 'exchange_direct';
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost('localhost'); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 仅接收key2 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, 'key2');
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), 'UTF-8'); System.out.println(' [x] Received '' + delivery.getEnvelope().getRoutingKey() + '':'' + message + '''); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Sender {
private final static String EXCHANGE_NAME = 'exchange_direct';
private final static String EXCHANGE_TYPE = 'direct';
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost('localhost');
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 交换机声明
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
// 只有routingKey相同的才会消费
String message = 'routing mode message';
channel.basicPublish(EXCHANGE_NAME, 'key2', null, message.getBytes());
System.out.println('[x] Sent '' + message + ''');
// channel.basicPublish(EXCHANGE_NAME, 'key', null, message.getBytes());
// System.out.println('[x] Sent '' + message + ''');
channel.close();
connection.close();
}
}
主题模式
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class Receiver1 {
private final static String QUEUE_NAME = 'queue_topic'; private final static String EXCHANGE_NAME = 'exchange_topic';
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost('localhost'); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 可以接收key.1 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, 'key.*');
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), 'UTF-8'); System.out.println(' [x] Received '' + delivery.getEnvelope().getRoutingKey() + '':'' + message + '''); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver2 {
private final static String QUEUE_NAME = 'queue_topic2';
private final static String EXCHANGE_NAME = 'exchange_topic';
private final static String EXCHANGE_TYPE = 'topic';
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost('localhost');
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// *号代表单个单词,可以接收key.1
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, '*.*');
// #号代表多个单词,可以接收key.1.2
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, '*.#');
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), 'UTF-8');
System.out.println(' [x] Received '' +
delivery.getEnvelope().getRoutingKey() + '':'' + message + ''');
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
}
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;import java.util.concurrent.TimeoutException;
public class Sender {
private final static String EXCHANGE_NAME = 'exchange_topic'; private final static String EXCHANGE_TYPE = 'topic';
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost('localhost'); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
String message = 'topics model message with key.1'; channel.basicPublish(EXCHANGE_NAME, 'key.1', null, message.getBytes()); System.out.println('[x] Sent '' + message + '''); String message2 = 'topics model message with key.1.2'; channel.basicPublish(EXCHANGE_NAME, 'key.1.2', null, message2.getBytes()); System.out.println('[x] Sent '' + message2 + ''');
channel.close(); connection.close(); }}
四种交换机介绍
直连交换机(Direct exchange): 具有路由功能的交换机,绑定到此交换机的时候需要指定一个routing_key,交换机发送消息的时候需要routing_key,会将消息发送道对应的队列
扇形交换机(Fanout exchange): 广播消息到所有队列,没有任何处理,速度最快
主题交换机(Topic exchange): 在直连交换机基础上增加模式匹配,也就是对routing_key进行模式匹配,
*
代表一个单词,#
代表多个单词首部交换机(Headers exchange): 忽略routing_key,使用Headers信息(一个Hash的数据结构)进行匹配,优势在于可以有更多更灵活的匹配规则
总结
RabbitMQ官方教程 官方教程源码