rabbitmq学习

Springboot集成Rabbitmq

1.基于Spring-Rabbit操作Rabbitmq

使用spring-boot-starter-amqp会自动添加spring-rabbit依赖
代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
<!-- Spring Boot AMQP starter (the accompanying text notes it brings in spring-rabbit). -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Spring Boot test starter, used by the @SpringBootTest example. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!-- Spring Boot logging starter. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-logging</artifactId>
</dependency>

2.配置

1.配置application.yml文件

连接rabbitmq

1
2
3
4
5
6
7
8
9
10
11
# HTTP port of this application.
server:
  port: 44000
spring:
  application:
    name: test-rabbitmq-producer
  # Connection settings for the RabbitMQ broker.
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtualHost: /

2.定义RabbitmqConfig类，配置Exchange、Queue，并将队列绑定到交换机。

以配置topics交换机为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

// Queue names; they are also used as the bean names of the two Queue beans.
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
// Exchange name; public, and referenced by the sender test as RabbitmqConfig.EXCHANGE_TOPICS_INFORM.
public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
// Routing-key patterns handed to the bindings' with(...) calls.
private static final String ROUTINGKEY_EMAIL="inform.#.email.#";
private static final String ROUTINGKEY_SMS="inform.#.sms.#";

/**
 * Declares the topic exchange named by the {@code EXCHANGE_TOPICS_INFORM} constant.
 * The bean is registered under that same name so it can be injected by qualifier.
 *
 * @return the topic exchange, declared as durable
 */
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange EXCHANGE_TOPICS_INFORM() {
    // durable(true): the exchange still exists after the message broker restarts.
    // Set explicitly so the declaration does not depend on the builder's default.
    return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}

/**
 * Declares the e-mail notification queue.
 * The bean name equals the queue name held in {@code QUEUE_INFORM_EMAIL}.
 *
 * @return a queue created with the single-argument {@code Queue} constructor
 */
@Bean(QUEUE_INFORM_EMAIL)
public Queue QUEUE_INFORM_EMAIL() {
    final Queue emailQueue = new Queue(QUEUE_INFORM_EMAIL);
    return emailQueue;
}
/**
 * Declares the SMS notification queue.
 * The bean name equals the queue name held in {@code QUEUE_INFORM_SMS}.
 *
 * @return a queue created with the single-argument {@code Queue} constructor
 */
@Bean(QUEUE_INFORM_SMS)
public Queue QUEUE_INFORM_SMS() {
    final Queue smsQueue = new Queue(QUEUE_INFORM_SMS);
    return smsQueue;
}

/**
 * Binds the e-mail queue to the topic exchange using {@code ROUTINGKEY_EMAIL}.
 *
 * @param queue    the queue bean qualified as {@code QUEUE_INFORM_EMAIL}
 * @param exchange the exchange bean qualified as {@code EXCHANGE_TOPICS_INFORM}
 * @return the binding, with no extra arguments
 */
@Bean
public Binding BINGDING_QUEUE_INFORM_EMAIL(
        @Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
        @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
    final Binding emailBinding = BindingBuilder
            .bind(queue)
            .to(exchange)
            .with(ROUTINGKEY_EMAIL)
            .noargs();
    return emailBinding;
}

/**
 * Binds the SMS queue to the topic exchange using {@code ROUTINGKEY_SMS}.
 *
 * @param queue    the queue bean qualified as {@code QUEUE_INFORM_SMS}
 * @param exchange the exchange bean qualified as {@code EXCHANGE_TOPICS_INFORM}
 * @return the binding, with no extra arguments
 */
@Bean
public Binding BINGDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
        @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
    // The qualifier must select the SMS queue; with the e-mail qualifier the SMS
    // routing key would be attached to the e-mail queue and the SMS queue left unbound.
    return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}

3.SpringBootTest单元测试

代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Spring Boot test that publishes a single message to the topic exchange
 * through an injected {@code RabbitTemplate}.
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class Producer04_topics_springboot {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Sends one text message with routing key {@code inform.email} and then
     * prints the message to standard output.
     */
    @Test
    public void testSendEmail() {
        final String exchangeName = RabbitmqConfig.EXCHANGE_TOPICS_INFORM;
        final String routingKey = "inform.email";
        final String payload = "send email to user";

        // Arguments in order: exchange name, routing key, message content.
        rabbitTemplate.convertAndSend(exchangeName, routingKey, payload);

        System.out.println("Send Message is:'" + payload + "'");
    }
}

报错

由于此前已经声明并持久化了一个同名的交换机“exchange_topics_inform”，Spring Boot启动时抛出了IO异常（很可能是本次声明的交换机属性与已存在的交换机不一致所致，有待确认）：

1
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(Ra

解决办法是登录到rabbitmq中把该交换机删掉就行了。