服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - Spring Cloud Stream简单用法

Spring Cloud Stream简单用法

2021-10-21 10:24微瞰技术 Java教程

Spring cloud stream是为构建微服务消息驱动而产生的一种框架。Spring Cloud Stream基于Spring boot的基础上,可创建独立的、生产级别的Spring应用,并采用Spring Integration来连接消息中间件提供消息事件驱动,一起看看吧

Spring Cloud Stream对Spring Cloud体系中的Mq进⾏了很好的上层抽象,可以让我们与具体消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差异,就像Hibernate屏蔽掉了具体数据库(Mysql/Oracle⼀样)。如此⼀来,我们学习、开发、维护MQ都会变得轻松。⽬前Spring Cloud Stream原生⽀持RabbitMQ和Kafka,阿里在这个基础上提供了RocketMQ的支持

简单使用Spring Cloud Stream 构建基于RocketMQ的生产者和消费者

生产者

pom文件中加入依赖

   org.springframework.cloudspring-cloud-starter-netflix-eureka-client com.alibaba.cloudspring-cloud-starter-stream-rocketmq2.1.0.RELEASE

配置文件中增加关于Spring Cloud Stream binder和bindings的配置

 spring: application: name: zhao-cloud-stream-producer cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: output: producer: group: test sync: true bindings: output: destination: stream-test-topic content-type: text/plain # 内容格式。这里使用 JSON

其中destination代表生产的数据发送到的topic 然后定义一个channel用于数据发送

 import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; public interface TestChannel { @Output("output") MessageChannel output(); }

最后构造数据发送的接口

 @Controller public class SendMessageController { @Resource private TestChannel testChannel; @ResponseBody @RequestMapping(value = "send", method = RequestMethod.GET) public String sendMessage() { String messageId = UUID.randomUUID().toString(); Message message = MessageBuilder .withPayload("this is a test:" + messageId) .setHeader(MessageConst.PROPERTY_TAGS, "test") .build(); try { testChannel.output().send(message); return messageId + "发送成功"; } catch (Exception e) { return messageId + "发送失败,原因:" + e.getMessage(); } } }

消费者

消费者的pom引入与生产者相同,在此不再赘述,配置时需要将stream的output修改为input并修改对应属性

 spring: application: name: zhao-cloud-stream-consumer cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: input: consumer: tags: test bindings: input: destination: stream-test-topic content-type: text/plain # 内容格式。这里使用 JSON group: test

另外关于channel的构造也要做同样的修改

 import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface TestChannel { @Input("input") SubscribableChannel input(); }

最后我在启动类中对收到的消息进行了监听

 @StreamListener("input") public void receiveInput(@Payload Message message) throws ValidationException { System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo")); }

测试结果

Spring Cloud Stream简单用法

Spring Cloud Stream简单用法

Stream其他特性

消息发送失败的处理

消息发送失败后悔发送到默认的一个“topic.errors"的channel中(topic是配置的destination)。要配置消息发送失败的处理,需要将错误消息的channel打开 消费者配置如下

 spring: application: name: zhao-cloud-stream-producer cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: output: producer: group: test sync: true bindings: output: destination: stream-test-topic content-type: text/plain # 内容格式。这里使用 JSON producer: errorChannelEnabled: true

在启动类中配置错误消息的Channel信息

 @Bean("stream-test-topic.errors") MessageChannel testoutPutErrorChannel(){ return new PublishSubscribeChannel(); }

新建异常处理service

 import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; @Service public class ErrorProducerService { @ServiceActivator(inputChannel = "stream-test-topic.errors") public void receiveProducerError(Message message){ System.out.println("receive error msg :"+message); } }

当发生异常时,由于测试类中已经将异常捕获,处理发送异常主要是在这里进行。模拟,应用与rocketMq断开的场景。可见

Spring Cloud Stream简单用法Spring Cloud Stream简单用法

消费者错误处理

首先增加配置为

 spring: application: name: zhao-cloud-stream-producer cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: output: producer: group: test sync: true bindings: output: destination: stream-test-topic content-type: text/plain # 内容格式。这里使用 JSON producer: errorChannelEnabled: true

增加相应的模拟异常的操作

 @StreamListener("input") public void receiveInput(@Payload Message message) throws ValidationException { //System.out.println("input1 receive: " + message.getPayload() + ", foo header: " + message.getHeaders().get("foo")); throw new RuntimeException("oops"); } @ServiceActivator(inputChannel = "stream-test-topic.test.errors") public void receiveConsumeError(Message message){ System.out.println("receive error msg"+message.getPayload()); }

Spring Cloud Stream简单用法

代码地址https://github.com/zhendiao/deme-code/tree/main/zp

到此这篇关于Spring Cloud Stream简单用法的文章就介绍到这了,更多相关Spring Cloud Stream使用内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/u011342403/article/details/119030043

延伸 · 阅读

精彩推荐
  • Java教程Spring AOP基本概念

    Spring AOP基本概念

    这篇文章主要为大家详细介绍了spring基础概念AOP与动态代理,具有一定的参考价值,感兴趣的小伙伴们可以参考一下,希望能给你带来帮助...

    宁在春8222021-10-13
  • Java教程Java ThreadPoolExecutor 线程池的使用介绍

    Java ThreadPoolExecutor 线程池的使用介绍

    Executors 是一个Java中的工具类. 提供工厂方法来创建不同类型的线程池,这篇文章主要介绍了Java ThreadPoolExecutor 线程池的使用介绍,文中通过示例代码介绍的...

    sc_ik5392021-07-28
  • Java教程详解spring boot集成RabbitMQ

    详解spring boot集成RabbitMQ

    RabbitMQ作为AMQP的代表性产品,在项目中大量使用。结合现在主流的spring boot,极大简化了开发过程中所涉及到的消息通信问题。...

    SamHxm1732020-09-03
  • Java教程kafka生产实践(详解)

    kafka生产实践(详解)

    下面小编就为大家带来一篇kafka生产实践(详解)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧...

    Java教程网2842020-12-09
  • Java教程详解Java的Struts框架中栈值和OGNL的使用

    详解Java的Struts框架中栈值和OGNL的使用

    这篇文章主要介绍了Java的Struts框架中栈值和OGNL的使用,Struts框架是Java的SSH三大web开发框架之一,需要的朋友可以参考下 ...

    goldensun3532020-03-03
  • Java教程详解Java引用类型的参数也是值传递

    详解Java引用类型的参数也是值传递

    这篇文章主要介绍了Java引用类型的参数也是值传递,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下...

    码途有约11642021-07-25
  • Java教程RxJava+Retrofit+Mvp实现购物车

    RxJava+Retrofit+Mvp实现购物车

    这篇文章主要为大家详细介绍了RxJava+Retrofit+Mvp实现购物车功能,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    Cyq_09278602021-04-28
  • Java教程Mybatis高级映射、动态SQL及获得自增主键的解析

    Mybatis高级映射、动态SQL及获得自增主键的解析

    MyBatis 本是apache的一个开源项目iBatis, 2010年这个项目由apache software foundation 迁移到了google code,并且改名为MyBatis。这篇文章主要介绍了Mybatis高级映射、动态...

    java教程网5182020-06-30