服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - Storm框架整合springboot的方法

Storm框架整合springboot的方法

2021-06-10 15:35本拉邓 Java教程

Storm框架中的每个Spout和Bolt都相当于独立的应用,Strom在启动spout和bolt时提供了一个open方法(spout)和prepare方法(bolt)。这篇文章主要介绍了Storm框架整合springboot的方法,需要的朋友可以参考下

storm:最火的流式处理框架

伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样、更加便捷,同时对于信息的时效性要求也越来越高。举个搜索场景中的例子,当一个卖家发布了一条宝贝信息时,他希望的当然是这个宝贝马上就可以被卖家搜索出来、点击、购买啦,相反,如果这个宝贝要等到第二天或者更久才可以被搜出来,估计这个大哥就要骂娘了。再举一个推荐的例子,如果用户昨天在淘宝上买了一双袜子,今天想买一副泳镜去游泳,但是却发现系统在不遗余力地给他推荐袜子、鞋子,根本对他今天寻找泳镜的行为视而不见,估计这哥们心里就会想推荐你妹呀。其实稍微了解点背景知识的码农们都知道,这是因为后台系统做的是每天一次的全量处理,而且大多是在夜深人静之时做的,那么你今天白天做的事情当然要明天才能反映出来啦。

•实现一个实时计算系统

全量数据处理使用的大多是鼎鼎大名的hadoop或者hive,作为一个批处理系统,hadoop以其吞吐量大、自动容错等优点,在海量数据处理上得到了广泛的使用。但是,hadoop不擅长实时计算,因为它天然就是为批处理而生的,这也是业界一致的共识。否则最近这两年也不会有s4,storm,puma这些实时计算系统如雨后春笋般冒出来啦。先抛开s4,storm,puma这些系统不谈,我们首先来看一下,如果让我们自己设计一个实时计算系统,我们要解决哪些问题。

1.低延迟。都说了是实时计算系统了,延迟是一定要低的。

2.高性能。性能不高就是浪费机器,浪费机器是要受批评的哦。

3.分布式。系统都是为应用场景而生的,如果你的应用场景、你的数据和计算单机就能搞定,那么不用考虑这些复杂的问题了。我们所说的是单机搞不定的情况。

4.可扩展。伴随着业务的发展,我们的数据量、计算量可能会越来越大,所以希望这个系统是可扩展的。

5.容错。这是分布式系统中通用问题。一个节点挂了不能影响我的应用。

好,如果仅仅需要解决这5个问题,可能会有无数种方案,而且各有千秋,随便举一种方案,使用消息队列+分布在各个机器上的工作进程就ok啦。我们再继续往下看。

1.容易在上面开发应用程序。亲,你设计的系统需要应用程序开发人员考虑各个处理组件的分布、消息的传递吗?如果是,那有点麻烦啊,开发人员可能会用不好,也不会想去用。

2.消息不丢失。用户发布的一个宝贝消息不能在实时处理的时候给丢了,对吧?更严格一点,如果是一个精确数据统计的应用,那么它处理的消息要不多不少才行。这个要求有点高哦。

诞 生

 在2011年storm开源之前,由于hadoop的火红,整个业界都在喋喋不休地谈论大数据。hadoop的高吞吐,海量数据处理的能力使得人们可以方便地处理海量数据。但是,hadoop的缺点也和它的优点同样鲜明——延迟大,响应缓慢,运维复杂。

有需求也就有创造,在hadoop基本奠定了大数据霸主地位的时候,很多的开源项目都是以弥补hadoop的实时性为目标而被创造出来。而在这个节骨眼上storm横空出世了。

storm带着流式计算的标签华丽丽滴出场了,看看它的一些卖点:

•分布式系统:可横向拓展,现在的项目不带个分布式特性都不好意思开源。

•运维简单:storm的部署的确简单。虽然没有mongodb的解压即用那么简单,但是它也就是多安装两个依赖库而已。

•高度容错:模块都是无状态的,随时宕机重启。

•无数据丢失:storm创新性提出的ack消息追踪框架和复杂的事务性处理,能够满足很多级别的数据处理需求。不过,越高的数据处理需求,性能下降越严重。

•多语言:实际上,storm的多语言更像是临时添加上去似的。因为,你的提交部分还是要使用java实现。

下面介绍下storm框架整合springboot的方法

我们知道storm本身是一个独立运行的分布式流式数据处理框架,springboot也是一个独立运行的web框架。那么如何在strom框架中集成springboot使得我们能够在storm开发中运用spring的ioc容器及其他如spring jpa等功能呢?我们先来了解以下概念:

•storm主要的三个component:topology、spout、bolt。topology作为主进程控制着spout、bolt线程的运行,他们相当于独立运行的容器分布于storm集群中的各个机器节点。

•springapplication:是配置spring应用上下文的起点。通过调用springapplication.run()方法它将创建applicationcontext实例,这是我们能够使用ioc容器的主要beanfactory。之后spring将会加载所有单例模式的beans,并启动后台运行的commandlinerunner beans等。

•applicationcontextaware:这是我们能够在普通java类中调用spring容器里的beans的关键接口。

Storm框架整合springboot的方法

实现原理

storm框架中的每个spout和bolt都相当于独立的应用,strom在启动spout和bolt时提供了一个open方法(spout)和prepare方法(bolt)。我们可以把初始化spring应用的操作放在这里,这样可以保证每个spout/bolt应用在后续执行过程中都能获取到spring的applicationcontext,有了applicationcontext实例对象,spring的所有功能就都能用上了。

•spout.open方法实现

?
1
2
3
4
5
6
7
8
9
@override
public void open(map map, topologycontext topologycontext, spoutoutputcollector spoutoutputcollector) {
 //启动springboot应用
 springstormapplication.run();
 
 this.map = map;
 this.topologycontext = topologycontext;
 this.spoutoutputcollector = spoutoutputcollector;
}

•bolt.prepare方法实现

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@override
public void prepare(map map, topologycontext topologycontext, outputcollector outputcollector) {
 //启动springboot应用
 springstormapplication.run();
 
 this.map = map;
 this.topologycontext = topologycontext;
 this.outputcollector = outputcollector;
}
•springstormapplication启动类
@springbootapplication
@componentscan(value = "com.xxx.storm")
public class springstormapplication {
 /**
  * 非工程启动入口,所以不用main方法
  * @param args
  */
 public static void run(string ...args) {
  springapplication app = new springapplication(springstormapplication.class);
  //我们并不需要web servlet功能,所以设置为webapplicationtype.none
  app.setwebapplicationtype(webapplicationtype.none);
  //忽略掉banner输出
  app.setbannermode(banner.mode.off);
  //忽略spring启动信息日志
  app.setlogstartupinfo(false);
  app.run(args);
 }
}

与我们传统的springboot应用启动入口稍微有点区别,主要禁用了web功能,看下正常的启动方式:

?
1
2
3
4
5
6
7
@springbootapplication
@componentscan(value = "com.xxx.web")
public class platformapplication {
 public static void main(string[] args) {
  springapplication.run(platformapplication.class, args);
 }
}

•在spout/bolt中调用了springstormapplication.run方法后,我们还需要能够拿到applicationcontext容器对象,这时候我们还需要实现applicationcontextaware接口,写个工具类beanutils:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@component
public class beanutils implements applicationcontextaware {
 private static applicationcontext applicationcontext = null;
 @override
 public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception {
  if (beanutils.applicationcontext == null) {
   beanutils.applicationcontext = applicationcontext;
  }
 }
 public static applicationcontext getapplicationcontext() {
  return applicationcontext;
 }
 public static object getbean(string name) {
  return getapplicationcontext().getbean(name);
 }
 public static <t> t getbean(class<t> clazz) {
  return getapplicationcontext().getbean(clazz);
 }
 public static <t> t getbean(string name, class<t> clazz) {
  return getapplicationcontext().getbean(name, clazz);
 }
}

通过@component注解使得spring在启动时能够扫描到该bean,因为beanutils实现了applicationcontextaware接口,spring会在启动成功时自动调用beanutils.setapplicationcontext方法,将applicationcontext对象保存到工具类的静态变量中,之后我们就可以使用beanutils.getbean()去获取spring容器中的bean了。

写个简单例子

•在filterbolt的execute方法中获取spring bean

?
1
2
3
4
5
@override
public void execute(tuple tuple) {
 filterservice filterservice = (filterservice) beanutils.getbean("filterservice");
 filterservice.deleteall();
}

•定义filterservice类,这时候我们就可以使用spring的相关注解,自动注入,spring jpa等功能了。

?
1
2
3
4
5
6
7
8
9
@service("filterservice")
public class filterservice {
 @autowired
 userrepository userrepository;
 
 public void deleteall() {
  userrepository.deleteall();
 }
}

将storm应用作为springboot工程的一个子模块

工程主目录的pom文件还是springboot相关的依赖,在storm子模块中引入storm依赖,这时候启动strom的topology应用会有一个日志包依赖冲突。

?
1
2
3
4
5
slf4j: class path contains multiple slf4j bindings.
slf4j: found binding in [jar:file:/applications/intellij%20idea.app/contents/bin/~/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.11.1/log4j-slf4j-impl-2.11.1.jar!/org/slf4j/impl/staticloggerbinder.class]
slf4j: found binding in [jar:file:/applications/intellij%20idea.app/contents/bin/~/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/staticloggerbinder.class]
slf4j: see http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
slf4j: actual binding is of type [org.apache.logging.slf4j.log4jloggerfactory]

我们需要在storm子模块的pom文件中重写org.springframework.boot:spring-boot-starter包依赖,将springboot的相关日志包排除掉,如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
<dependency>
 <groupid>org.springframework.boot</groupid>
 <artifactid>spring-boot-starter</artifactid>
 <exclusions>
  <exclusion>
   <groupid>org.apache.logging.log4j</groupid>
   <artifactid>log4j-to-slf4j2</artifactid>
  </exclusion>
  <exclusion>
   <groupid>ch.qos.logback</groupid>
   <artifactid>logback-classic2</artifactid>
  </exclusion>
 </exclusions>
</dependency>

总结

以上所述是小编给大家介绍的storm框架整合springboot的方法,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!

原文链接:https://www.cnblogs.com/gouyg/p/storm-springboot.html

延伸 · 阅读

精彩推荐