服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - 编程技术 - 彻底搞清 Flink 中的 Window 机制

彻底搞清 Flink 中的 Window 机制

2021-11-02 23:27大数据老哥 编程技术

在流处理应用中,数据是连续不断的,有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。

彻底搞清 Flink 中的 Window 机制

一、 为什么需要Window

在流处理应用中,数据是连续不断的,有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。

在这种情况下,我们必须定义一个窗口(window),用来收集最近1分钟内的数据,并对这个窗口内的数据进行计算

二、Window的分类

2.1 按照time和count分类

time-window:时间窗口:根据时间划分窗口,如:每xx分钟统计最近xx分钟的数据

count-window:数量窗口:根据数量划分窗口,如:每xx个数据统计最近xx个数据

彻底搞清 Flink 中的 Window 机制

2.2 按照slide和size分类

窗口有两个重要的属性: 窗口大小size和滑动间隔slide,根据它们的大小关系可分为:

tumbling-window:滚动窗口:size=slide,如:每隔10s统计最近10s的数据

彻底搞清 Flink 中的 Window 机制

sliding-window:滑动窗口:size>slide,如:每隔5s统计最近10s数据

彻底搞清 Flink 中的 Window 机制

注意:当size

小结

按照上面窗口的分类方式进行组合,可以得出如下的窗口:

  • 基于时间的滚动窗口tumbling-time-window--用的较多
  • 基于时间的滑动窗口sliding-time-window--用的较多
  • 基于数量的滚动窗口tumbling-count-window--用的较少
  • 基于数量的滑动窗口sliding-count-window--用的较少

注意:Flink还支持一个特殊的窗口:Session会话窗口,需要设置一个会话超时时间,如30s,则表示30s内没有数据到来,则触发上个窗口的计算

三、WindowAPI

3.1 window和windowAll

彻底搞清 Flink 中的 Window 机制

使用keyby的流,应该使用window方法

未使用keyby的流,应该调用windowAll方法

区别:

Window算子:是可以设置并行度的

WindowAll 算子:并行度始终为1

3.2 WindowAssigner

Windows Assigner的作用是指定窗口的类型,定义如何将数据流分配到一个或者多个窗口,API中通过window (WindowsAssigner assigner)指定。在Flink中支持两种类型的窗口,一种是基于时间的窗口(TimeWindow),另一种是基于数量的窗口(countWindow)。窗口所表现出的类型特性取决于window assigner的定义。

彻底搞清 Flink 中的 Window 机制

Flink底层Window模型仅有TimeWindow以及GlobalWindow。

彻底搞清 Flink 中的 Window 机制

Flink提供了很多各种场景用的WindowAssigner:

彻底搞清 Flink 中的 Window 机制

如果需要自定制数据分发策略,则可以实现一个 class,继承自 WindowAssigner。

3.3 evictor

evictor 主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行

用户代码之后,更详细的描述可以参考org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter两个方法。

Flink 提供了如下三种通用的 evictor:

CountEvictor 保留指定数量的元素

TimeEvictor 设定一个阈值 interval,删除所有不再 max_ts - interval 范围内的元

素,其中 max_ts 是窗口内时间戳的最大值。

DeltaEvictor 通过执行用户给定的 DeltaFunction 以及预设的 theshold,判断是否删 除一个元素。

3.4 trigger

trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的trigger,

如果默认的 trigger 不能满足你的需求,则可以自定义一个类,继承自Trigger 即可,我们详细描述下 Trigger 的接口以及含义:

onEventTime() 当 event-time timer 被触发的时候会调用

onElement() 每次往 window 增加一个元素的时候都会触发

onMerge() 对两个 `rigger 的 state 进行 merge 操作

clear() window 销毁的时候被调用

上面的接口中前三个会返回一个 TriggerResult, TriggerResult 有如下几种可能的选 择:

  • CONTINUE 不做任何事情
  • FIRE 触发 window
  • PURGE 清空整个 window 的元素并销毁窗口
  • PURGE 清空整个 window 的元素并销毁窗口

四、WindowAPI调用案例示例

4.1 基于时间的滚动和滑动窗口

测试数据

  1. 信号灯编号和通过该信号灯的车的数量
  2. 9,3
  3. 9,2
  4. 9,7
  5. 4,9
  6. 2,6
  7. 1,5
  8. 2,3
  9. 5,7
  10. 5,4

需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口

需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口

  1. packagecom.flink.source
  2. importorg.apache.flink.api.common.functions.MapFunction
  3. importorg.apache.flink.streaming.api.scala._
  4. importorg.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows,TumblingProcessingTimeWindows}
  5. importorg.apache.flink.streaming.api.windowing.time.Time;
  6. /**
  7. *@Packagecom.flink.source
  8. *@File:WindowDemo_TimeWindow.java
  9. *@author大数据老哥
  10. *@date2021/10/2610:50
  11. *@versionV1.0
  12. */
  13. objectWindowDemo_TimeWindow{
  14. defmain(args:Array[String]):Unit={
  15. valenv=StreamExecutionEnvironment.getExecutionEnvironment
  16. valsocketData=env.socketTextStream("192.168.100.101",9999)
  17. valsocketMap=socketData.map(newMapFunction[String,CartInfo](){
  18. overridedefmap(t:String):CartInfo={
  19. valarr=t.split(",")
  20. CartInfo(arr(0),arr(1).toInt)
  21. }
  22. })
  23. //需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量
  24. valresult=socketMap.keyBy(_.sensorId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("count")
  25. //需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量
  26. valresult2=socketMap.keyBy(_.sensorId).window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(10))).sum("count")
  27. result.print()
  28. result2.print()
  29. env.execute("winds")
  30. }
  31. }
  32. caseclassCartInfo(varsensorId:String,varcount:Int)

4.2 基于数量的滚动和滑动窗口

测试数据

  1. 信号灯编号和通过该信号灯的车的数量
  2. 9,3
  3. 9,2
  4. 9,7
  5. 4,9
  6. 2,6
  7. 1,5
  8. 2,3
  9. 5,7
  10. 5,4

需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口

需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口

  1. packagecom.flink.source
  2. importorg.apache.flink.api.common.functions.MapFunction
  3. importorg.apache.flink.streaming.api.scala._
  4. /**
  5. *@Packagecom.flink.source
  6. *@File:WindosDemoo_CountWindos.java
  7. *@author大数据老哥
  8. *@date2021/10/2614:04
  9. *@versionV1.0
  10. */
  11. objectWindowDemo_CountWindow{
  12. defmain(args:Array[String]):Unit={
  13. valenv=StreamExecutionEnvironment.getExecutionEnvironment
  14. valsocketData=env.socketTextStream("192.168.100.101",9999)
  15. valsocketMap=socketData.map(newMapFunction[String,CartInfo]{
  16. overridedefmap(t:String):CartInfo={
  17. valarr=t.split(",")
  18. CartInfo(arr(0),arr(1).toInt)
  19. }
  20. })
  21. //需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计
  22. valresult=socketMap.keyBy(_.sensorId).countWindow(5L).sum("count")
  23. //需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计
  24. valresult2=socketMap.keyBy(_.sensorId).countWindow(5L,3L).sum("count")
  25. result.print("result")
  26. result2.print("result2")
  27. env.execute()
  28. }
  29. }
  30. caseclassCartInfo(varsensorId:String,varcount:Int)

case class CartInfo(var sensorId: String, var count: Int)

4.3 会话窗口

测试数据

  1. 信号灯编号和通过该信号灯的车的数量
  2. 9,3
  3. 9,2
  4. 9,7
  5. 4,9
  6. 2,6
  7. 1,5
  8. 2,3
  9. 5,7
  10. 5,4

设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算

  1. packagecom.flink.source
  2. importorg.apache.flink.api.common.functions.MapFunction
  3. importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
  4. importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment
  5. importorg.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
  6. importorg.apache.flink.streaming.api.windowing.time.Time
  7. /**
  8. *@Packagecom.flink.source
  9. *@File:WindowDemo_SessionWindow.java
  10. *@author大数据老哥
  11. *@date2021/11/116:10
  12. *@versionV1.0
  13. */
  14. objectWindowDemo_SessionWindow{
  15. defmain(args:Array[String]):Unit={
  16. valenv=StreamExecutionEnvironment.getExecutionEnvironment
  17. valsocketData=env.socketTextStream("192.168.100.101",9999)
  18. valsocketMap:SingleOutputStreamOperator[CartInfo]=socketData.map(newMapFunction[String,CartInfo](){
  19. overridedefmap(t:String):CartInfo={
  20. valarr=t.split(",")
  21. CartInfo(arr(0),arr(1).toInt)
  22. }
  23. })
  24. //设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算
  25. valresult=socketMap.keyBy(0)
  26. .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
  27. .sum("count")
  28. result.print()
  29. env.execute("winds")
  30. }
  31. }
  32. caseclassCartInfo(varsensorId:String,varcount:Int)

原文链接:https://mp.weixin.qq.com/s/GPIcjPQUT1xAKAG6fBmvtg

延伸 · 阅读

精彩推荐