服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - spring-cloud-stream结合kafka使用详解

spring-cloud-stream结合kafka使用详解

2020-08-19 23:45KyleYaoKeepGoing Java教程

这篇文章主要介绍了spring-cloud-stream结合kafka使用详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

1.pom文件导入依赖

?
1
2
3
4
5
<!-- kafka -->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

2.application.yml文件配置

?
1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
 cloud:
  stream:
   kafka:
    binder:
     brokers: xxx.xxx.xxx.xx:xxxx // Kafka的消息中间件服务器地址
   bindings:
    xxx_output: // 通道名称
     destination: xxx // 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的
     // 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json)
    xxx_input:
     destination: xxx // 消息发往的目的地,对应topic
     group: xxx // 对应kafka的group

3.创建消息发送者

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@EnableBinding(Source.class) // @EnableBinding 是绑定通道的,Soure.class是spring 提供的,表示这是一个可绑定的发布通道
@Service
public class MqService {
 
  @Resource(name = KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
  private MessageChannel oesWorkbenchChannel;
 
  /**
   * 发送一条kafka消息
   */
  public boolean sendLifeData(Object object) {
    return MqUtils.send(oesWorkbenchChannel, object, KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT);
  }
}
 
// 发布通道
public interface Source {
  @Output(KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
  MessageChannel oesWorkbenchLifeDataOutput(); // 发布通道用MessageChannel
}

4.创建消息监听者

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Slf4j
@EnableBinding(Sink.class)
public class WorkbenchStreamListener {
 
  @Resource
  private FileService fileService;
 
  @StreamListener(KafkaConstants.xxx_input) // 监听接受通道
  public void receiveData(MoveMessage moveMessage) {
  }
}
 
// 接受通道
public interface Sink {
  @Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT)
  SubscribableChannel oesWorkbenchMoveInput(); // 接受通道用SubscribableChannel
}

接下来就可以愉快的发送监听消息了

到此这篇关于spring-cloud-stream结合kafka使用详解的文章就介绍到这了,更多相关spring-cloud-stream整合kafka内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/oWanShiKaiTouNan/article/details/108056417

延伸 · 阅读

精彩推荐