服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - Springboot Websocket Stomp 消息订阅推送

Springboot Websocket Stomp 消息订阅推送

2021-09-30 11:01代码大师麦克劳瑞 Java教程

本文主要介绍了Springboot Websocket Stomp 消息订阅推送,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

需求背景

闲话不扯,直奔主题。需要和web前端建立长链接,互相实时通讯,因此想到了websocket,后面随着需求的变更,需要用户订阅主题,实现消息的精准推送,发布订阅等,则想到了STOMP(Simple Text-Orientated Messaging Protocol) 面向消息的简单文本协议。

websocket协议

想到了之前写的一个websocket长链接的demo,也贴上代码供大家参考。

pom文件
直接引入spring-boot-starter-websocket即可。

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-websocket</artifactId>
  4. </dependency>

声明websocket endpoint

  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.web.socket.server.standard.ServerEndpointExporter;
  4.  
  5. /**
  6. * @ClassName WebSocketConfig
  7. * @Author scott
  8. * @Date 2021/6/16
  9. * @Version V1.0
  10. **/
  11. @Configuration
  12. public class WebSocketConfig {
  13.  
  14. /**
  15. * 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint
  16. */
  17. @Bean
  18. public ServerEndpointExporter serverEndpointExporter() {
  19. return new ServerEndpointExporter();
  20. }
  21.  
  22. }

websocket实现类,其中通过注解监听了各种事件,实现了推送消息等相关逻辑

  1. import com.google.common.cache.Cache;
  2. import com.google.common.cache.CacheBuilder;
  3. import com.ruoyi.common.core.domain.AjaxResult;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.stereotype.Component;
  7.  
  8. import javax.websocket.*;
  9. import javax.websocket.server.PathParam;
  10. import javax.websocket.server.ServerEndpoint;
  11. import java.util.Objects;
  12. import java.util.Set;
  13. import java.util.concurrent.ConcurrentMap;
  14. import java.util.concurrent.TimeUnit;
  15. import java.util.concurrent.atomic.AtomicInteger;
  16.  
  17. /**
  18. * @ClassName: DataTypePushWebSocket
  19. * @Author: scott
  20. * @Date: 2021/6/16
  21. **/
  22. @ServerEndpoint(value = "/ws/dataType/push/{token}")
  23. @Component
  24. public class DataTypePushWebSocket {
  25.  
  26. private static final Logger log = LoggerFactory.getLogger(DataTypePushWebSocket.class);
  27.  
  28. /**
  29. * 记录当前在线连接数
  30. */
  31. private static AtomicInteger onlineCount = new AtomicInteger(0);
  32.  
  33. private static Cache<String, Session> SESSION_CACHE = CacheBuilder.newBuilder()
  34. .initialCapacity(10)
  35. .maximumSize(300)
  36. .expireAfterWrite(10, TimeUnit.MINUTES)
  37. .build();
  38.  
  39. /**
  40. * 连接建立成功调用的方法
  41. */
  42. @OnOpen
  43. public void onOpen(Session session, @PathParam("token")String token) {
  44. String sessionId = session.getId();
  45. onlineCount.incrementAndGet(); // 在线数加1
  46. this.sendMessage("sessionId:" + sessionId +",已经和server建立连接", session);
  47. SESSION_CACHE.put(sessionId,session);
  48. log.info("有新连接加入:{},当前在线连接数为:{}", session.getId(), onlineCount.get());
  49. }
  50.  
  51. /**
  52. * 连接关闭调用的方法
  53. */
  54. @OnClose
  55. public void onClose(Session session,@PathParam("token")String token) {
  56. onlineCount.decrementAndGet(); // 在线数减1
  57. SESSION_CACHE.invalidate(session.getId());
  58. log.info("有一连接关闭:{},当前在线连接数为:{}", session.getId(), onlineCount.get());
  59. }
  60.  
  61. /**
  62. * 收到客户端消息后调用的方法
  63. *
  64. * @param message 客户端发送过来的消息
  65. */
  66. @OnMessage
  67. public void onMessage(String message, Session session,@PathParam("token")String token) {
  68. log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
  69. this.sendMessage("服务端已收到推送消息:" + message, session);
  70. }
  71.  
  72. @OnError
  73. public void onError(Session session, Throwable error) {
  74. log.error("发生错误");
  75. error.printStackTrace();
  76. }
  77.  
  78. /**
  79. * 服务端发送消息给客户端
  80. */
  81. private static void sendMessage(String message, Session toSession) {
  82. try {
  83. log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
  84. toSession.getBasicRemote().sendText(message);
  85. } catch (Exception e) {
  86. log.error("服务端发送消息给客户端失败:{}", e);
  87. }
  88. }
  89.  
  90. public static AjaxResult sendMessage(String message, String sessionId){
  91. Session session = SESSION_CACHE.getIfPresent(sessionId);
  92. if(Objects.isNull(session)){
  93. return AjaxResult.error("token已失效");
  94. }
  95. sendMessage(message,session);
  96. return AjaxResult.success();
  97. }
  98.  
  99. public static AjaxResult sendBroadcast(String message){
  100. long size = SESSION_CACHE.size();
  101. if(size <=0){
  102. return AjaxResult.error("当前没有在线客户端,无法推送消息");
  103. }
  104. ConcurrentMap<String, Session> sessionConcurrentMap = SESSION_CACHE.asMap();
  105. Set<String> keys = sessionConcurrentMap.keySet();
  106. for (String key : keys) {
  107. Session session = SESSION_CACHE.getIfPresent(key);
  108. DataTypePushWebSocket.sendMessage(message,session);
  109. }
  110.  
  111. return AjaxResult.success();
  112.  
  113. }
  114.  
  115. }

至此websocket服务端代码已经完成。

stomp协议

前端代码.这个是在某个vue工程中写的js,各位大佬自己动手改改即可。其中Settings.wsPath是后端定义的ws地址例如ws://localhost:9003/ws

  1. import Stomp from 'stompjs'
  2. import Settings from '@/settings.js'
  3.  
  4. export default {
  5. // 是否启用日志 默认启用
  6. debug:true,
  7. // 客户端连接信息
  8. stompClient:{},
  9. // 初始化
  10. init(callBack){
  11. this.stompClient = Stomp.client(Settings.wsPath)
  12. this.stompClient.hasDebug = this.debug
  13. this.stompClient.connect({},suce =>{
  14. this.console("连接成功,信息如下 ↓")
  15. this.console(this.stompClient)
  16. if(callBack){
  17. callBack()
  18. }
  19. },err => {
  20. if(err) {
  21. this.console("连接失败,信息如下 ↓")
  22. this.console(err)
  23. }
  24. })
  25. },
  26. // 订阅
  27. sub(address,callBack){
  28. if(!this.stompClient.connected){
  29. this.console("没有连接,无法订阅")
  30. return
  31. }
  32. // 生成 id
  33. let timestamp= new Date().getTime() + address
  34. this.console("订阅成功 -> "+address)
  35. this.stompClient.subscribe(address,message => {
  36. this.console(address+" 订阅消息通知,信息如下 ↓")
  37. this.console(message)
  38. let data = message.body
  39. callBack(data)
  40. },{
  41. id: timestamp
  42. })
  43. },
  44. unSub(address){
  45. if(!this.stompClient.connected){
  46. this.console("没有连接,无法取消订阅 -> "+address)
  47. return
  48. }
  49. let id = ""
  50. for(let item in this.stompClient.subscriptions){
  51. if(item.endsWith(address)){
  52. id = item
  53. break
  54. }
  55. }
  56. this.stompClient.unsubscribe(id)
  57. this.console("取消订阅成功 -> id:"+ id + " address:"+address)
  58. },
  59. // 断开连接
  60. disconnect(callBack){
  61. if(!this.stompClient.connected){
  62. this.console("没有连接,无法断开连接")
  63. return
  64. }
  65. this.stompClient.disconnect(() =>{
  66. console.log("断开成功")
  67. if(callBack){
  68. callBack()
  69. }
  70. })
  71. },
  72. // 单位 秒
  73. reconnect(time){
  74. setInterval(() =>{
  75. if(!this.stompClient.connected){
  76. this.console("重新连接中...")
  77. this.init()
  78. }
  79. },time * 1000)
  80. },
  81. console(msg){
  82. if(this.debug){
  83. console.log(msg)
  84. }
  85. },
  86. // 向订阅发送消息
  87. send(address,msg) {
  88. this.stompClient.send(address,{},msg)
  89. }
  90. }

后端stomp config,里面都有注释,写的很详细,并且我加入了和前端的心跳ping pong。

  1. package com.cn.scott.config;
  2.  
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.messaging.simp.config.MessageBrokerRegistry;
  5. import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
  6. import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
  7. import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
  8. import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
  9.  
  10. /**
  11. * @ClassName: WebSocketStompConfig
  12. * @Author: scott
  13. * @Date: 2021/7/8
  14. **/
  15. @Configuration
  16. @EnableWebSocketMessageBroker
  17. public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
  18.  
  19. private static long HEART_BEAT=10000;
  20.  
  21. @Override
  22. public void registerStompEndpoints(StompEndpointRegistry registry) {
  23. //允许使用socketJs方式访问,访问点为webSocket,允许跨域
  24. //在网页上我们就可以通过这个链接
  25. //ws://127.0.0.1:port/ws来和服务器的WebSocket连接
  26. registry.addEndpoint("/ws").setAllowedOrigins("*");
  27. }
  28.  
  29. @Override
  30. public void configureMessageBroker(MessageBrokerRegistry registry) {
  31. ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler();
  32. te.setPoolSize(1);
  33. te.setThreadNamePrefix("wss-heartbeat-thread-");
  34. te.initialize();
  35. //基于内存的STOMP消息代理来代替mq的消息代理
  36. //订阅Broker名称,/user代表点对点即发指定用户,/topic代表发布广播即群发
  37. //setHeartbeatValue 设置心跳及心跳时间
  38. registry.enableSimpleBroker("/user", "/topic").setHeartbeatValue(new long[]{HEART_BEAT,HEART_BEAT}).setTaskScheduler(te);
  39. //点对点使用的订阅前缀,不设置的话,默认也是/user/
  40. registry.setUserDestinationPrefix("/user/");
  41. }
  42. }

后端stomp协议接受、订阅等动作通知

  1. package com.cn.scott.ws;
  2.  
  3. import com.alibaba.fastjson.JSON;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.messaging.handler.annotation.DestinationVariable;
  6. import org.springframework.messaging.handler.annotation.MessageMapping;
  7. import org.springframework.messaging.simp.SimpMessagingTemplate;
  8. import org.springframework.messaging.simp.annotation.SubscribeMapping;
  9. import org.springframework.web.bind.annotation.RestController;
  10.  
  11. /**
  12. * @ClassName StompSocketHandler
  13. * @Author scott
  14. * @Date 2021/6/30
  15. * @Version V1.0
  16. **/
  17. @RestController
  18. public class StompSocketHandler {
  19.  
  20. @Autowired
  21. private SimpMessagingTemplate simpMessagingTemplate;
  22.  
  23. /**
  24. * @MethodName: subscribeMapping
  25. * @Description: 订阅成功通知
  26. * @Param: [id]
  27. * @Return: void
  28. * @Author: scott
  29. * @Date: 2021/6/30
  30. **/
  31. @SubscribeMapping("/user/{id}/listener")
  32. public void subscribeMapping(@DestinationVariable("id") final long id) {
  33. System.out.println(">>>>>>用户:"+id +",已订阅");
  34. SubscribeMsg param = new SubscribeMsg(id,String.format("用户【%s】已订阅成功", id));
  35. sendToUser(param);
  36. }
  37.  
  38. /**
  39. * @MethodName: test
  40. * @Description: 接收订阅topic消息
  41. * @Param: [id, msg]
  42. * @Return: void
  43. * @Author: scott
  44. * @Date: 2021/6/30
  45. **/
  46. @MessageMapping(value = "/user/{id}/listener")
  47. public void UserSubListener(@DestinationVariable long id, String msg) {
  48. System.out.println("收到客户端:" +id+",的消息");
  49. SubscribeMsg param = new SubscribeMsg(id,String.format("已收到用户【%s】发送消息【%s】", id,msg));
  50. sendToUser(param);
  51. }
  52.  
  53. @GetMapping("/refresh/{userId}")
  54. public void refresh(@PathVariable Long userId, String msg) {
  55. StompSocketHandler.SubscribeMsg param = new StompSocketHandler.SubscribeMsg(userId,String.format("服务端向用户【%s】发送消息【%s】", userId,msg));
  56. sendToUser(param);
  57. }
  58.  
  59. /**
  60. * @MethodName: sendToUser
  61. * @Description: 推送消息给订阅用户
  62. * @Param: [userId]
  63. * @Return: void
  64. * @Author: scott
  65. * @Date: 2021/6/30
  66. **/
  67. public void sendToUser(SubscribeMsg screenChangeMsg){
  68. //这里可以控制权限等。。。
  69. simpMessagingTemplate.convertAndSendToUser(screenChangeMsg.getUserId().toString(),"/listener", JSON.toJSONString(screenChangeMsg));
  70. }
  71.  
  72. /**
  73. * @MethodName: sendBroadCast
  74. * @Description: 发送广播,需要用户事先订阅广播
  75. * @Param: [topic, msg]
  76. * @Return: void
  77. * @Author: scott
  78. * @Date: 2021/6/30
  79. **/
  80. public void sendBroadCast(String topic,String msg){
  81. simpMessagingTemplate.convertAndSend(topic,msg);
  82. }
  83.  
  84. /**
  85. * @ClassName: SubMsg
  86. * @Author: scott
  87. * @Date: 2021/6/30
  88. **/
  89. public static class SubscribeMsg {
  90. private Long userId;
  91. private String msg;
  92. public SubscribeMsg(Long UserId, String msg){
  93. this.userId = UserId;
  94. this.msg = msg;
  95. }
  96. public Long getUserId() {
  97. return userId;
  98. }
  99. public String getMsg() {
  100. return msg;
  101. }
  102. }
  103. }

连接展示

建立连接成功,这里可以看出是基于websocket协议

Springboot Websocket Stomp 消息订阅推送

连接信息

Springboot Websocket Stomp 消息订阅推送

ping pong

Springboot Websocket Stomp 消息订阅推送

调用接口向订阅用户1发送消息,http://localhost:9003/refresh/1?msg=HelloStomp,可以在客户端控制台查看已经收到了消息。这个时候不同用户通过自己的userId可以区分订阅的主题,可以做到通过userId精准的往客户端推送消息。

Springboot Websocket Stomp 消息订阅推送

还记得我们在后端配置的时候还指定了广播的订阅主题/topic,这时我们前端通过js只要订阅了这个主题,那么后端在像这个主题推送消息时,所有订阅的客户端都能收到,感兴趣的小伙伴可以自己试试,api我都写好了。

Springboot Websocket Stomp 消息订阅推送

至此,实战完毕,喜欢的小伙伴麻烦关注加点赞。

springboot + stomp后端源码地址:https://gitee.com/ErGouGeSiBaKe/stomp-server

到此这篇关于Springboot Websocket Stomp 消息订阅推送的文章就介绍到这了,更多相关Springboot Websocket Stomp 消息订阅推送内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/u010786653/article/details/118578178

延伸 · 阅读

精彩推荐
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    这篇文章主要介绍了Java使用SAX解析xml的示例,帮助大家更好的理解和学习使用Java,感兴趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程Java8中Stream使用的一个注意事项

    Java8中Stream使用的一个注意事项

    最近在工作中发现了对于集合操作转换的神器,java8新特性 stream,但在使用中遇到了一个非常重要的注意点,所以这篇文章主要给大家介绍了关于Java8中S...

    阿杜7472021-02-04
  • Java教程xml与Java对象的转换详解

    xml与Java对象的转换详解

    这篇文章主要介绍了xml与Java对象的转换详解的相关资料,需要的朋友可以参考下...

    Java教程网2942020-09-17
  • Java教程小米推送Java代码

    小米推送Java代码

    今天小编就为大家分享一篇关于小米推送Java代码,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧...

    富贵稳中求8032021-07-12
  • Java教程20个非常实用的Java程序代码片段

    20个非常实用的Java程序代码片段

    这篇文章主要为大家分享了20个非常实用的Java程序片段,对java开发项目有所帮助,感兴趣的小伙伴们可以参考一下 ...

    lijiao5352020-04-06
  • Java教程Java BufferWriter写文件写不进去或缺失数据的解决

    Java BufferWriter写文件写不进去或缺失数据的解决

    这篇文章主要介绍了Java BufferWriter写文件写不进去或缺失数据的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望...

    spcoder14552021-10-18
  • Java教程Java实现抢红包功能

    Java实现抢红包功能

    这篇文章主要为大家详细介绍了Java实现抢红包功能,采用多线程模拟多人同时抢红包,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙...

    littleschemer13532021-05-16
  • Java教程升级IDEA后Lombok不能使用的解决方法

    升级IDEA后Lombok不能使用的解决方法

    最近看到提示IDEA提示升级,寻思已经有好久没有升过级了。升级完毕重启之后,突然发现好多错误,本文就来介绍一下如何解决,感兴趣的可以了解一下...

    程序猿DD9332021-10-08