1.添加依赖
在idea项目的pom.xml中添加依赖。
1
2
3
4
5
6
|
<!--spark sql依赖,注意版本号--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2. 12 </artifactId> <version> 3.0 . 0 </version> </dependency> |
2.案例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
package com.zf.bigdata.spark.sql import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} object Spark01_SparkSql_Basic { def main(args: Array[String]): Unit = { //创建上下文环境配置对象 val sparkConf = new SparkConf().setMaster( "local[*]" ).setAppName( "sparkSql" ) //创建 SparkSession 对象 val spark = SparkSession.builder().config(sparkConf).getOrCreate() // DataFrame val df: DataFrame = spark.read.json( "datas/user.json" ) //df.show() // DataFrame => Sql //df.createOrReplaceTempView("user") //spark.sql("select * from user").show() //spark.sql("select age from user").show() //spark.sql("select avg(age) from user").show() //DataFrame => Dsl //如果涉及到转换操作,转换需要引入隐式转换规则,否则无法转换,比如使用$提取数据的值 //spark 不是包名,是上下文环境对象名 import spark.implicits._ //df.select("age","username").show() //df.select($"age"+1).show() //df.select('age+1).show() // DataSet //val seq = Seq(1,2,3,4) //val ds: Dataset[Int] = seq.toDS() // ds.show() // RDD <=> DataFrame val rdd = spark.sparkContext.makeRDD(List(( 1 , "张三" , 10 ),( 2 , "李四" , 20 ))) val df1: DataFrame = rdd.toDF( "id" , "name" , "age" ) val rdd1: RDD[Row] = df1.rdd // DataFrame <=> DataSet val ds: Dataset[User] = df1.as[User] val df2: DataFrame = ds.toDF() // RDD <=> DataSet val ds1: Dataset[User] = rdd.map { case (id, name, age) => { User(id, name = name, age = age) } }.toDS() val rdd2: RDD[User] = ds1.rdd spark.stop() } case class User(id:Int,name:String,age:Int) } |
PS:下面看下在IDEA中开发Spark SQL程序
IDEA 中程序的打包和运行方式都和 SparkCore 类似,Maven 依赖中需要添加新的依赖项:
1
2
3
4
5
|
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2. 11 </artifactId> <version> 2.1 . 1 </version> </dependency> |
一、指定Schema格式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.StringType import org.apache.spark.sql.Row object Demo1 { def main(args: Array[String]): Unit = { //使用Spark Session 创建表 val spark = SparkSession.builder().master( "local" ).appName( "UnderstandSparkSession" ).getOrCreate() //从指定地址创建RDD val personRDD = spark.sparkContext.textFile( "D:\\tmp_files\\student.txt" ).map(_.split( "\t" )) //通过StructType声明Schema val schema = StructType( List( StructField( "id" , IntegerType), StructField( "name" , StringType), StructField( "age" , IntegerType))) //把RDD映射到rowRDD val rowRDD = personRDD.map(p=>Row(p( 0 ).toInt,p( 1 ),p( 2 ).toInt)) val personDF = spark.createDataFrame(rowRDD, schema) //注册表 personDF.createOrReplaceTempView( "t_person" ) //执行SQL val df = spark.sql( "select * from t_person order by age desc limit 4" ) df.show() spark.stop() } } |
二、使用case class
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
import org.apache.spark.sql.SparkSession //使用case class object Demo2 { def main(args: Array[String]): Unit = { //创建SparkSession val spark = SparkSession.builder().master( "local" ).appName( "CaseClassDemo" ).getOrCreate() //从指定的文件中读取数据,生成对应的RDD val lineRDD = spark.sparkContext.textFile( "D:\\tmp_files\\student.txt" ).map(_.split( "\t" )) //将RDD和case class 关联 val studentRDD = lineRDD.map( x => Student(x( 0 ).toInt,x( 1 ),x( 2 ).toInt)) //生成 DataFrame,通过RDD 生成DF,导入隐式转换 import spark.sqlContext.implicits._ val studentDF = studentRDD.toDF //注册表 视图 studentDF.createOrReplaceTempView( "student" ) //执行SQL spark.sql( "select * from student" ).show() spark.stop() } } //case class 一定放在外面 case class Student(stuID:Int,stuName:String,stuAge:Int) |
三、把数据保存到数据库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.StringType import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.Row import java.util.Properties object Demo3 { def main(args: Array[String]): Unit = { //使用Spark Session 创建表 val spark = SparkSession.builder().master( "local" ).appName( "UnderstandSparkSession" ).getOrCreate() //从指定地址创建RDD val personRDD = spark.sparkContext.textFile( "D:\\tmp_files\\student.txt" ).map(_.split( "\t" )) //通过StructType声明Schema val schema = StructType( List( StructField( "id" , IntegerType), StructField( "name" , StringType), StructField( "age" , IntegerType))) //把RDD映射到rowRDD val rowRDD = personRDD.map(p => Row(p( 0 ).toInt, p( 1 ), p( 2 ).toInt)) val personDF = spark.createDataFrame(rowRDD, schema) //注册表 personDF.createOrReplaceTempView( "person" ) //执行SQL val df = spark.sql( "select * from person " ) //查看SqL内容 //df.show() //将结果保存到mysql中 val props = new Properties() props.setProperty( "user" , "root" ) props.setProperty( "password" , "123456" ) props.setProperty( "driver" , "com.mysql.jdbc.Driver" ) df.write.mode( "overwrite" ).jdbc( "jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8" , "student" , props) spark.close() } } |
以上内容转自:
https://blog.csdn.net/weixin_43520450/article/details/106093582
作者:故明所以
到此这篇关于IDEA 开发配置SparkSQL及简单使用案例代码的文章就介绍到这了,更多相关IDEA 开发 SparkSQL内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!