本地jvm执行flink带web ui
使用
1
|
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); |
可以获取flink执行环境。但是本地jvm执行的时候是不带web ui的。有时候出于监控的考虑,需要带着监控页面查看。任务运行状况,可以使用下面方式获取flink本地执行环境,并带有web ui。
1
2
3
|
Configuration config = new Configuration(); config.setInteger(RestOptions.PORT, 9998 ); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config); |
Flink 本地执行入门
一、maven依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
< properties > < project.build.sourceEncoding >UTF-8</ project.build.sourceEncoding > < flink.version >1.6.3</ flink.version > < java.version >1.8</ java.version > < scala.version >2.11.8</ scala.version > < hbase.version >1.2.4</ hbase.version > < scala.binary.version >2.11</ scala.binary.version > < maven.compiler.source >${java.version}</ maven.compiler.source > < maven.compiler.target >${java.version}</ maven.compiler.target > </ properties > < dependency > < groupId >org.apache.flink</ groupId > < artifactId >flink-clients_${scala.binary.version}</ artifactId > < version >${flink.version}</ version > </ dependency > |
二、本地执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.java.ExecutionEnvironment; public class FlinkReadTextFile { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); DataSet<String> data = env.readTextFile( "file:///Users/***/Documents/test.txt" ); data.filter( new FilterFunction<String>() { @Override public boolean filter(String value) throws Exception { return value.startsWith( "五芳斋美" ); } }) .writeAsText( "file:///Users/***/Documents/test01.txt" ); JobExecutionResult res = env.execute(); } } |
三、实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.scala._ object SocketWindowWordCount { /** Main program method */ def main(args: Array[String]): Unit ={ // the port to connect to // val port: Int = try { // ParameterTool.fromArgs(args).getInt("port") // } catch { // case e: Exception => { // System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'") // return // } // } // get the execution environment val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // get input data by connecting to the socket val text = env.socketTextStream( "localhost" , 9000 , '\n' ) // parse the data, group it, window it, and aggregate the counts val windowCounts = text .flatMap { w => w.split( "\\s" ) } .map { w => WordWithCount(w, 1 ) } .keyBy( "word" ) .timeWindow(Time.seconds( 5 ), Time.seconds( 1 )) .sum( "count" ) // print the results with a single thread, rather than in parallel windowCounts.print().setParallelism( 1 ) env.execute( "Socket Window WordCount" ) } // Data type for words with count case class WordWithCount(word: String, count: Long) } |
以上为个人经验,希望能给大家一个参考,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/Vector97/article/details/118182173