Fork me on GitHub

SpringCloud异步消息

1. Docker Quickstart Terminal 该快捷方式所指向的项目“bash.exe”已经更改或移动

出现这个问题是由于之前装过Git,安装Docker默认生成的快捷方式中,Git的路径是C:\Program Files\Git\bin\bash.exe。而之前安装的Git并不是这个路径,所以就会报找不到应用程序的问题。此时只要把Git的路径改为自己之前安装的路径即可,比如我的就在D:\Program Files\Git\bin\bash.exe,修改后的Target为如图所示(后面的路径不用修改):

1

在docker下安装rabbitmq:https://hub.docker.com/_/rabbitmq/

2

–hostname必填, -p 15672:15672是管理员登录端口(将端口映射为本机端口),默认的用户名和密码均为guest。

然后登录你的docker的地址+ 15672就可以进入到rabbitmq的管理员界面了。(192.168.99.100:15672)

3

2.使用spring cloud stream

在springcloud中可以使用rabbitmq实现异步消息对列。springcloud中提供了一个spring cloud stream的组件实现异步消息对列的操作。但是spring cloud stream只支持rabbitmq和kafka.

spring cloud stream是对中间件的进一步封装,可以更加方便灵活的使用中间件,甚至可以做到对中间件代码层的无感知,和中间件的切换。

1

首先还是引入依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

定义一个stream的发送接口:

1
2
3
4
5
6
7
8
9
10
11
12
public interface StreamClient {

String INPUT = "myMessage";

String INPUT2 = "myMessage2";

@Input(StreamClient.INPUT)
SubscribableChannel input();

@Output(StreamClient.INPUT2)
MessageChannel output();
}

定义一个stream的接收类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
@EnableBinding(StreamClient.class) //定义的发送的接口类
@Slf4j
public class StreamReceiver {

/**
* 接收orderDTO对象 消息
* @param message
*/
@StreamListener(value = StreamClient.INPUT)
@SendTo(StreamClient.INPUT2) //这个注解其实是消息处理完之后再返回给消息对列一个消息
public String process(OrderDTO message) {
log.info("StreamReceiver: {}", message);
return "received.";
}

@StreamListener(value = StreamClient.INPUT2)
public void process2(String message) {
log.info("StreamReceiver2: {}", message);
}
}

测试一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@RestController
public class SendMessageController {

@Autowired
private StreamClient streamClient;

//发送消息
@GetMapping("/sendMessage")
public void process() {
String message = "now " + new Date();
streamClient.output().send(MessageBuilder.withPayload(message).build());
}

/**
* 发送 orderDTO对象
*/
@GetMapping("/sendMessage")
public void process() {
OrderDTO orderDTO = new OrderDTO();
orderDTO.setOrderId("123456");
streamClient.output().send(MessageBuilder.withPayload(orderDTO).build());
}
}

这样就实现了消息的发送和接收,但是还有一个问题就是当你的启动多个实例的时候,每个实例中的对列都会收到消息,其实只需要需要的那个对列获取到消息即可。要实现这样的话其实就是需要在配置文件中配置一下分组的信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
application:
name: order
cloud:
config:
discovery:
enabled: true
service-id: CONFIG
profile: test
stream:
bindings:
myMessage:
group: order
content-type: application/json

其实就是给bindings分组命名就可以了。就是把所有启动的实例分到一个组里面,这样的话一个组里的所有实例只会有一个接收到消息,其余的不会接收到消息。content-type: application/json的注解其实就是使得rabbitmq中拿到的消息是json。

消息处理完成之后返回给对列一个消息的时候怎么处理呢?

StreamReceiver端添加注解:@SendTo(StreamClient.INPUT2) //这个注解其实是消息处理完之后再返回给消息对列一个消息

本文标题:SpringCloud异步消息

文章作者:WilsonSong

发布时间:2019年01月19日 - 21:01

最后更新:2019年03月09日 - 21:03

原始链接:https://songwell1024.github.io/2019/01/19/SpringCloudQueue/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

-------------本文结束感谢您的阅读-------------