先来个简单的DEMO
1、创建一个MAVEN项目,引入依赖
<dependencies>
<!-- RabbitMQ Java client; supplies the com.rabbitmq.client classes used by the examples below -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!-- slf4j-nop binding for SLF4J; presumably added to silence the client's logging output (confirm) -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.29</version>
</dependency>
</dependencies>
2、写一个生产者(发送者)类Send
package helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * Hello-world sender: connects to the RabbitMQ server, publishes one message
 * to the {@code hello} queue and exits.
 */
public class Send {
    private final static String QUEUE_NAME = "hello";

    /**
     * Declares the queue, publishes a single text message and releases the connection.
     *
     * @param args unused
     * @throws IOException if connecting, declaring the queue, publishing or closing fails
     * @throws TimeoutException if opening or closing the connection/channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Connection settings (host and credentials are masked in this article)
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("***");
        factory.setUsername("***");
        factory.setPassword("******");

        // try-with-resources closes the channel and then the connection,
        // even when declaring or publishing throws.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Arguments: queue name, durable, exclusive, autoDelete, extra arguments
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            String message = "hello world!";
            // Empty exchange name: the routing key is used as the queue name
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送了消息" + message);
        }
    }
}
3、写一个接收者recv
package helloworld;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Hello-world receiver: subscribes to the {@code hello} queue and prints every
 * message it gets. The connection is never closed, so the program keeps running.
 */
public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Connection settings (host and credentials are masked in this article)
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("***");
        connectionFactory.setUsername("***");
        connectionFactory.setPassword("******");

        Connection conn = connectionFactory.newConnection();
        Channel ch = conn.createChannel();

        // Same declaration as the sender, so either side may be started first
        ch.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Callback that decodes each delivery as UTF-8 text and prints it
        DefaultConsumer printer = new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String text = new String(body, "UTF-8");
                System.out.println("收到消息:" + text);
            }
        };

        // Second argument true = automatic acknowledgement
        ch.basicConsume(QUEUE_NAME, true, printer);
    }
}
到此,就基本完成了一个rabbitMQ基础的demo。生产者发送消息,接收者持续执行,监听队列中的消息并处理。
再来一个多消费者模式
1、创建一个生产者newtask
package workqueues;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Work-queue producer: publishes ten tasks to {@code task_queue}.
 * Even-numbered tasks carry three trailing dots, odd-numbered tasks carry none.
 */
public class NewTask {
    private final static String TASK_QUEUE_NAME = "task_queue";

    /**
     * Declares the task queue, publishes messages 0..9 and releases the connection.
     *
     * @param args unused
     * @throws IOException if connecting, declaring the queue, publishing or closing fails
     * @throws TimeoutException if opening or closing the connection/channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Connection settings (host and credentials are masked in this article)
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("***");
        factory.setUsername("***");
        factory.setPassword("******");

        // try-with-resources closes the channel and then the connection,
        // even when a publish fails halfway through the loop.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Second argument true = durable queue
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

            // NOTE(review): the queue is durable but messages are published with null
            // properties, so they are presumably not marked persistent - confirm whether
            // MessageProperties.PERSISTENT_TEXT_PLAIN is wanted here.
            for (int i = 0; i < 10; i++) {
                String message = (i % 2 == 0) ? i + "..." : String.valueOf(i);
                channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println("发送了消息" + message);
            }
        }
    }
}
2、可以多次启动的worker类
package workqueues;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Work-queue consumer: takes tasks from {@code task_queue} one at a time,
 * simulates the work and acknowledges each task manually. Several instances
 * can be started side by side.
 */
public class Worker {
    private final static String TASK_QUEUE_NAME = "task_queue";

    /**
     * Connects, declares the task queue and registers a manually-acknowledging consumer.
     * The connection is left open so the worker keeps receiving.
     *
     * @param args unused
     * @throws IOException if connecting, declaring the queue or subscribing fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Connection settings (host and credentials are masked in this article)
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("***");
        factory.setUsername("******");
        factory.setPassword("******");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Must match the producer's declaration (durable queue)
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        System.out.println("开始接收消息");
        // Prefetch of 1: do not receive another task until the current one is acknowledged
        channel.basicQos(1);
        // Second argument false = manual acknowledgement
        channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到了消息:" + message);
                try {
                    doWork(message);
                } finally {
                    System.out.println("完成了消息处理");
                    // Acknowledge only this delivery (multiple = false)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        });
    }

    /**
     * Simulates work by sleeping one second for every '.' in the task text.
     * If the thread is interrupted, the interrupt flag is restored and the
     * remaining simulated work is skipped instead of being silently ignored.
     *
     * @param task the task text received from the queue
     */
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch != '.') {
                continue;
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Preserve the interruption for the caller and stop working
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
至此,就可以平均分配给多个worker去处理。
以下是使用交换机的场景,首先是fanout交换机(扇形交换机)
fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,所以此时routing key是不起作用的。 1、一个发布日志消息的生产者EmitLog
package fanout;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Fanout producer: publishes one log message to the {@code logs} fanout exchange.
 */
public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";

    /**
     * Declares the fanout exchange, publishes a single message and releases the connection.
     *
     * @param args unused
     * @throws IOException if connecting, declaring the exchange, publishing or closing fails
     * @throws TimeoutException if opening or closing the connection/channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Connection settings (host and credentials are masked in this article)
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("******");
        factory.setUsername("***");
        factory.setPassword("******");

        // try-with-resources closes the channel and then the connection,
        // even when declaring or publishing throws.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

            String message = "info: Hello World";
            // The message is published with an empty routing key
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println("发送了消息" + message);
        }
    }
}
2、一个接收日志消息并处理的消费者
package fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Fanout consumer: binds a server-named queue to the {@code logs} exchange
 * and prints every message delivered to it.
 */
public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Connection settings (host and credentials are masked in this article)
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("***");
        connectionFactory.setUsername("***");
        connectionFactory.setPassword("******");

        Channel ch = connectionFactory.newConnection().createChannel();
        ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        // No-argument queueDeclare: the server picks the queue name
        String tempQueue = ch.queueDeclare().getQueue();
        ch.queueBind(tempQueue, EXCHANGE_NAME, "");

        System.out.println("开始接收消息");
        // Second argument true = automatic acknowledgement
        ch.basicConsume(tempQueue, true, new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("收到了消息" + new String(body, "UTF-8"));
            }
        });
    }
}
然后是direct交换机(直连交换机) emitLogDirect
package direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Direct-exchange producer: publishes the same message to {@code direct_logs}
 * three times, once for each of the routing keys info, warning and error.
 */
public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    /**
     * Declares the direct exchange, publishes one message per severity and
     * releases the connection.
     *
     * @param args unused
     * @throws IOException if connecting, declaring the exchange, publishing or closing fails
     * @throws TimeoutException if opening or closing the connection/channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Connection settings (host and credentials are masked in this article)
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("***");
        factory.setUsername("***");
        factory.setPassword("******");

        // try-with-resources closes the channel and then the connection,
        // even when one of the publishes throws.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            String message = "Hello World";
            byte[] payload = message.getBytes("UTF-8");
            // The severity doubles as the routing key
            String[] severities = {"info", "warning", "error"};
            for (String severity : severities) {
                channel.basicPublish(EXCHANGE_NAME, severity, null, payload);
                System.out.println("发送了消息" + ",等级为" + severity + ",消息内容为:" + message);
            }
        }
    }
}
一个接收所有消息的消费者
package direct;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Direct-exchange consumer that receives all three log levels
 * (info, warning and error) and prints them.
 */
public class ReceiveLogsDirect1 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Connection settings (host and credentials are masked in this article)
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("***");
        connectionFactory.setUsername("***");
        connectionFactory.setPassword("***");

        Connection conn = connectionFactory.newConnection();
        Channel ch = conn.createChannel();
        ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // Temporary, server-named queue
        String tempQueue = ch.queueDeclare().getQueue();

        // One binding per severity, in the same order as before
        for (String severity : new String[] {"info", "warning", "error"}) {
            ch.queueBind(tempQueue, EXCHANGE_NAME, severity);
        }

        System.out.println("开始接收消息");
        // Second argument true = automatic acknowledgement
        ch.basicConsume(tempQueue, true, new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("收到了消息" + new String(body, "UTF-8"));
            }
        });
    }
}
一个接收错误消息的消费者
package direct;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Direct-exchange consumer that receives only the error level:
 * its queue is bound with the single routing key {@code error}.
 */
public class ReceiveLogsDirect2 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Connection settings (host and credentials are masked in this article)
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("***");
        connectionFactory.setUsername("***");
        connectionFactory.setPassword("******");

        Connection conn = connectionFactory.newConnection();
        Channel ch = conn.createChannel();
        ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // Temporary, server-named queue bound to the error routing key only
        String tempQueue = ch.queueDeclare().getQueue();
        ch.queueBind(tempQueue, EXCHANGE_NAME, "error");

        System.out.println("开始接收消息");
        // Second argument true = automatic acknowledgement
        ch.basicConsume(tempQueue, true, new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("收到了消息" + new String(body, "UTF-8"));
            }
        });
    }
}
这样就可以将所有的日志都打印在控制台且错误日志放到文件或进行其他处理。
然后是topic交换机(主题交换机) emitLogTopic
package topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Topic-exchange producer: publishes the same message to {@code topic_logs}
 * once for each of nine sample routing keys.
 */
public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    /**
     * Declares the topic exchange, publishes one message per routing key and
     * releases the connection.
     *
     * @param args unused
     * @throws IOException if connecting, declaring the exchange, publishing or closing fails
     * @throws TimeoutException if opening or closing the connection/channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Connection settings (host and credentials are masked in this article)
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("***");
        factory.setUsername("***");
        factory.setPassword("*****");

        // try-with-resources closes the channel and then the connection,
        // even when one of the publishes throws.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            String message = "animal world";
            byte[] payload = message.getBytes("UTF-8");
            // Sample keys with one, three and four dot-separated words
            String[] routingKeys = {
                "quick.orange.rabbit",
                "lazy.orange.elephant",
                "quick.orange.fox",
                "lazy.brown.fox",
                "lazy.pink.rabbit",
                "quick.brown.fox",
                "orange",
                "quick.orange.male.rabbit",
                "lazy.orange.male.rabbit"
            };
            for (String routingKey : routingKeys) {
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, payload);
                System.out.println("发送了:" + message + " routingKey:" + routingKey);
            }
        }
    }
}
然后是其中一个消费者,接收橘黄色的所有信息
package topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Topic-exchange consumer bound with the pattern {@code *.orange.*};
 * prints each message together with the routing key it arrived with.
 */
public class ReceiveLogsTopic1 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Connection settings (host and credentials are masked in this article)
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("***");
        connectionFactory.setUsername("***");
        connectionFactory.setPassword("***");

        Connection conn = connectionFactory.newConnection();
        Channel ch = conn.createChannel();
        ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        // Temporary, server-named queue bound with a single pattern
        String tempQueue = ch.queueDeclare().getQueue();
        ch.queueBind(tempQueue, EXCHANGE_NAME, "*.orange.*");

        System.out.println("开始接收消息");
        // Second argument true = automatic acknowledgement
        ch.basicConsume(tempQueue, true, new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String text = new String(body, "UTF-8");
                String key = envelope.getRoutingKey();
                System.out.println("收到了消息" + text + "routingKey:" + key);
            }
        });
    }
}
另一个消费者,接收所有关于兔子(*.*.rabbit)或者以lazy开头(lazy.#)的消息
package topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Topic-exchange consumer bound with two patterns, {@code *.*.rabbit} and
 * {@code lazy.#}; prints each message together with the routing key it arrived with.
 */
public class ReceiveLogsTopic2 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Connection settings (host and credentials are masked in this article)
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("***");
        connectionFactory.setUsername("***");
        connectionFactory.setPassword("***");

        Connection conn = connectionFactory.newConnection();
        Channel ch = conn.createChannel();
        ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        // Temporary, server-named queue
        String tempQueue = ch.queueDeclare().getQueue();

        // Two bindings on the same queue, added in the same order as before
        for (String pattern : new String[] {"*.*.rabbit", "lazy.#"}) {
            ch.queueBind(tempQueue, EXCHANGE_NAME, pattern);
        }

        System.out.println("开始接收消息");
        // Second argument true = automatic acknowledgement
        ch.basicConsume(tempQueue, true, new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String text = new String(body, "UTF-8");
                String key = envelope.getRoutingKey();
                System.out.println("收到了消息" + text + "routingKey:" + key);
            }
        });
    }
}
可以说,topic交换机模式就很自由且灵活了。这种模式可以实现模糊匹配规则:routing key由以“.”分隔的单词组成,*恰好匹配一个单词,#匹配零个、一个或多个单词。
评论