服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - 如何使用IDEA开发Spark SQL程序(一文搞懂)

如何使用IDEA开发Spark SQL程序(一文搞懂)

2021-11-12 14:18小哪吒的BD Java教程

Spark SQL 是一个用来处理结构化数据的spark组件。它提供了一个叫做DataFrames的可编程抽象数据模型,并且可被视为一个分布式的SQL查询引擎。这篇文章主要介绍了如何使用IDEA开发Spark SQL程序(一文搞懂),需要的朋友可以参考下

前言

大家好,我是DJ丶小哪吒,我又来跟你们分享知识了。对软件开发有着浓厚的兴趣。喜欢与人分享知识。做博客的目的就是为了能与 他 人知识共享。由于水平有限。博客中难免会有一些错误。如有 纰漏 之处,欢迎大家在留言区指正。小编也会及时改正。

DJ丶小哪吒又来与各位分享知识了。今天我们不飙车,今天就静静的坐下来,我们来聊一聊关于sparkSQL。准备好茶水,听老朽与你娓娓道来。

Spark SQL是什么

Spark SQL 是一个用来处理结构化数据的spark组件。它提供了一个叫做DataFrames的可编程抽象数据模型,并且可被视为一个分布式的SQL查询引擎。

1、使用IDEA开发Spark SQL

Spark会根据文件信息尝试着去推断DataFrame/DataSet的Schema,当然我们也可以手动指定,手动指定的方式有以下几种:

  • 第1种:指定列名添加Schema
  • 第2种:通过StructType指定Schema
  • 第3种:编写样例类,利用反射机制推断Schema

 1.1、指定列名添加Schema

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package cn.itcast.sql
 
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
 
 
object CreateDFDS {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val rowRDD: RDD[(Int, String, Int)] = linesRDD.map(line =>(line(0).toInt,line(1),line(2).toInt))
    //3.将RDD转成DF
    //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
    import spark.implicits._
    val personDF: DataFrame = rowRDD.toDF("id","name","age")
    personDF.show(10)
    personDF.printSchema()
    sc.stop()
    spark.stop()
  }
}

1.2、通过StructType指定Schema

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package cn.itcast.sql
 
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 
 
object CreateDFDS2 {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val rowRDD: RDD[Row] = linesRDD.map(line =>Row(line(0).toInt,line(1),line(2).toInt))
    //3.将RDD转成DF
    //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
    //import spark.implicits._
    val schema: StructType = StructType(Seq(
      StructField("id", IntegerType, true),//允许为空
      StructField("name", StringType, true),
      StructField("age", IntegerType, true))
    )
    val personDF: DataFrame = spark.createDataFrame(rowRDD,schema)
    personDF.show(10)
    personDF.printSchema()
    sc.stop()
    spark.stop()
  }
}

1.3、反射推断Schema–掌握

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package cn.itcast.sql
 
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
 
 
object CreateDFDS3 {
case class Person(id:Int,name:String,age:Int)
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")
.getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
    //3.将RDD转成DF
    //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
    import spark.implicits._
    //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
    //所以SparkSQL可以通过反射自动获取到并添加给DF
    val personDF: DataFrame = rowRDD.toDF
    personDF.show(10)
    personDF.printSchema()
    sc.stop()
    spark.stop()
  }
}

1.4、花式查询

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package cn.itcast.sql
 
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
 
 
object QueryDemo {
case class Person(id:Int,name:String,age:Int)
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")
.getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
    //3.将RDD转成DF
    //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
    import spark.implicits._
    //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
    //所以SparkSQL可以通过反射自动获取到并添加给DF
    val personDF: DataFrame = rowRDD.toDF
    personDF.show(10)
    personDF.printSchema()
    //=======================SQL方式查询=======================
    //0.注册表
    personDF.createOrReplaceTempView("t_person")
    //1.查询所有数据
    spark.sql("select * from t_person").show()
    //2.查询age+1
    spark.sql("select age,age+1 from t_person").show()
    //3.查询age最大的两人
    spark.sql("select name,age from t_person order by age desc limit 2").show()
    //4.查询各个年龄的人数
    spark.sql("select age,count(*) from t_person group by age").show()
    //5.查询年龄大于30的
    spark.sql("select * from t_person where age > 30").show()
 
    //=======================DSL方式查询=======================
    //1.查询所有数据
    personDF.select("name","age")
    //2.查询age+1
    personDF.select($"name",$"age" + 1)
    //3.查询age最大的两人
    personDF.sort($"age".desc).show(2)
    //4.查询各个年龄的人数
    personDF.groupBy("age").count().show()
    //5.查询年龄大于30的
    personDF.filter($"age" > 30).show()
 
    sc.stop()
    spark.stop()
  }
  }

1.5、 相互转化

RDD、DF、DS之间的相互转换有很多(6种),但是我们实际操作就只有2类:
1)使用RDD算子操作
2)使用DSL/SQL对表操作

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package cn.itcast.sql
 
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 
object TransformDemo {
case class Person(id:Int,name:String,age:Int)
 
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val personRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
    //3.将RDD转成DF
    //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
    import spark.implicits._
    //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
    //所以SparkSQL可以通过反射自动获取到并添加给DF
    //=========================相互转换======================
    //1.RDD-->DF
    val personDF: DataFrame = personRDD.toDF
    //2.DF-->RDD
    val rdd: RDD[Row] = personDF.rdd
    //3.RDD-->DS
    val DS: Dataset[Person] = personRDD.toDS()
    //4.DS-->RDD
    val rdd2: RDD[Person] = DS.rdd
    //5.DF-->DS
    val DS2: Dataset[Person] = personDF.as[Person]
    //6.DS-->DF
    val DF: DataFrame = DS2.toDF()
 
    sc.stop()
    spark.stop()
  }
  }

1.6、Spark SQL完成WordCount(案例)

1.6.1、SQL风格

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package cn.itcast.sql
 
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 
 
object WordCount {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")
    val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")
    //fileDF.show()
    //fileDS.show()
    //3.对每一行按照空格进行切分并压平
    //fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是String
    import spark.implicits._
    val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String
    //wordDS.show()
    /*
    +-----+
    |value|
    +-----+
    |hello|
    |   me|
    |hello|
    |  you|
      ...
     */
    //4.对上面的数据进行WordCount
    wordDS.createOrReplaceTempView("t_word")
    val sql =
      """
        |select value ,count(value) as count
        |from t_word
        |group by value
        |order by count desc
      """.stripMargin
    spark.sql(sql).show()
 
    sc.stop()
    spark.stop()
  }
}

1.6.2、DQL风格

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package cn.itcast.sql
 
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 
 
object WordCount2 {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")
    val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")
    //fileDF.show()
    //fileDS.show()
    //3.对每一行按照空格进行切分并压平
    //fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是String
    import spark.implicits._
    val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String
    //wordDS.show()
    /*
    +-----+
    |value|
    +-----+
    |hello|
    |   me|
    |hello|
    |  you|
      ...
     */
    //4.对上面的数据进行WordCount
    wordDS.groupBy("value").count().orderBy($"count".desc).show()
 
    sc.stop()
    spark.stop()
  }
}

好了,以上内容就到这里了。你学到了吗。

到此这篇关于如何使用IDEA开发Spark SQL程序(一文搞懂)的文章就介绍到这了,更多相关IDEA开发Spark SQL内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/mr_yang888/article/details/105820316

延伸 · 阅读

精彩推荐
  • Java教程详解Spring Boot实战之单元测试

    详解Spring Boot实战之单元测试

    本篇文章主要介绍了详解Spring Boot实战之单元测试,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧...

    sun_t892602020-12-02
  • Java教程IntelliJ IDEA 如何彻底删除项目的步骤

    IntelliJ IDEA 如何彻底删除项目的步骤

    本篇文章主要介绍了IntelliJ IDEA 如何彻底删除项目的步骤,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧...

    WJJPro16272021-02-24
  • Java教程java反射之获取类的信息方法(推荐)

    java反射之获取类的信息方法(推荐)

    下面小编就为大家带来一篇java反射之获取类的信息方法(推荐)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧...

    Java之家5132020-11-06
  • Java教程Java中的随机数详解

    Java中的随机数详解

    这篇文章主要介绍了Java中的随机数,需要的朋友可以参考下 ...

    Java教程网5452019-11-08
  • Java教程SpringCloud 如何提取公共配置

    SpringCloud 如何提取公共配置

    这篇文章主要介绍了SpringCloud 提取公共配置的操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教...

    指尖凉10152021-10-14
  • Java教程java HashMap内部实现原理详解

    java HashMap内部实现原理详解

    这篇文章主要介绍了java HashMap内部实现原理详解的相关资料,需要的朋友可以参考下 ...

    u0129269245082020-08-02
  • Java教程Spring依赖注入的三种方式小结

    Spring依赖注入的三种方式小结

    本篇文章主要介绍了Spring依赖注入的三种方式小结,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧...

    飞扬小初2552020-12-16
  • Java教程Java 继承原理与用法实例分析

    Java 继承原理与用法实例分析

    这篇文章主要介绍了Java 继承原理与用法,结合实例形式分析了java面向对象程序设计中继承的概念、原理、用法及操作注意事项,需要的朋友可以参考下...

    longzhoufeng4082019-06-26