Part 1: Prepare the data source
Create a student.txt file under the project with the following content:
1 ,zhangsan, 20
2 ,lisi, 21
3 ,wanger, 19
4 ,fangliu, 18
Part 2: Implementation
Java version:
1. First, create a Student bean class that implements Serializable and overrides toString(). The code is as follows:
package com.cxd.sql;

import java.io.Serializable;

@SuppressWarnings("serial")
public class Student implements Serializable {
    String sid;
    String sname;
    int sage;

    public String getSid() {
        return sid;
    }

    public void setSid(String sid) {
        this.sid = sid;
    }

    public String getSname() {
        return sname;
    }

    public void setSname(String sname) {
        this.sname = sname;
    }

    public int getSage() {
        return sage;
    }

    public void setSage(int sage) {
        this.sage = sage;
    }

    @Override
    public String toString() {
        return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
    }
}
2. Perform the conversion. The code is as follows:
package com.cxd.sql;

import java.util.ArrayList;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class TxtToParquetDemo {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        reflectTransform(spark);  // via Java reflection
        dynamicTransform(spark);  // dynamic conversion
    }

    /**
     * Conversion via Java reflection (bean class).
     * @param spark
     */
    private static void reflectTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("stuinfo.txt").javaRDD();

        JavaRDD<Student> rowRDD = source.map(line -> {
            String parts[] = line.split(",");
            Student stu = new Student();
            stu.setSid(parts[0]);
            stu.setSname(parts[1]);
            stu.setSage(Integer.valueOf(parts[2]));
            return stu;
        });

        Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
        df.select("sid", "sname", "sage")
          .coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
    }

    /**
     * Dynamic conversion (programmatic schema).
     * @param spark
     */
    private static void dynamicTransform(SparkSession spark) {
        JavaRDD<String> source = spark.read().textFile("stuinfo.txt").javaRDD();

        JavaRDD<Row> rowRDD = source.map(line -> {
            String[] parts = line.split(",");
            String sid = parts[0];
            String sname = parts[1];
            int sage = Integer.parseInt(parts[2]);

            return RowFactory.create(sid, sname, sage);
        });

        ArrayList<StructField> fields = new ArrayList<StructField>();
        StructField field = null;
        field = DataTypes.createStructField("sid", DataTypes.StringType, true);
        fields.add(field);
        field = DataTypes.createStructField("sname", DataTypes.StringType, true);
        fields.add(field);
        field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);
        fields.add(field);

        StructType schema = DataTypes.createStructType(fields);

        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
        df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
    }
}
Scala version:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType

object RDD2Dataset {

  case class Student(id: Int, name: String, age: Int)

  def main(args: Array[String]) {
    val spark = SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
    import spark.implicits._
    reflectCreate(spark)
    dynamicCreate(spark)
  }

  /**
   * Conversion via reflection (case class).
   * @param spark
   */
  private def reflectCreate(spark: SparkSession): Unit = {
    import spark.implicits._
    val stuRDD = spark.sparkContext.textFile("student2.txt")
    // toDF() is an implicit conversion
    val stuDf = stuRDD.map(_.split(","))
      .map(parts => Student(parts(0).trim.toInt, parts(1), parts(2).trim.toInt))
      .toDF()
    // stuDf.select("id","name","age").write.text("result")  // specify column names for the written file
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age<20")
    // nameDf.write.text("result")  // write the query result to a file
    nameDf.show()
  }

  /**
   * Dynamic conversion (programmatic schema).
   * @param spark
   */
  private def dynamicCreate(spark: SparkSession): Unit = {
    val stuRDD = spark.sparkContext.textFile("student.txt")
    import spark.implicits._
    val schemaString = "id,name,age"
    val fields = schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    val rowRDD = stuRDD.map(_.split(",")).map(parts => Row(parts(0), parts(1), parts(2)))
    val stuDf = spark.createDataFrame(rowRDD, schema)
    stuDf.printSchema()
    val tmpView = stuDf.createOrReplaceTempView("student")
    val nameDf = spark.sql("select name from student where age<20")
    // nameDf.write.text("result")  // write the query result to a file
    nameDf.show()
  }
}
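Note that dynamicCreate above declares every field as StringType, so the resulting DataFrame contains only string columns. Below is a minimal variation, not from the original article (the object name TypedDynamicCreate is my own), that declares typed fields and converts the parsed strings to match, so the schema reflects the actual data types:

// A possible typed variant of the dynamic-schema approach (assumption: the
// object name and this whole file are illustrative, not part of the original).
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object TypedDynamicCreate {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("TypedDynamicCreate").getOrCreate()

    // Declare id and age as integers instead of strings
    val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = true),
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true)))

    // Convert the parsed strings so each Row matches the declared types
    val rowRDD = spark.sparkContext.textFile("student.txt")
      .map(_.split(","))
      .map(parts => Row(parts(0).trim.toInt, parts(1), parts(2).trim.toInt))

    val stuDf = spark.createDataFrame(rowRDD, schema)
    stuDf.printSchema()
    stuDf.show()

    spark.stop()
  }
}

With typed fields, a filter such as age < 20 operates on integer values directly rather than relying on implicit casts.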
Notes:
1. All of the code above has been tested; the test environment is Spark 2.1.0 with JDK 1.8.
2. This code does not work with Spark versions earlier than 2.0.
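A quick way to confirm that the Parquet output is readable is to load it back and inspect the schema and rows. The sketch below is not part of the original article (the object name ParquetReadCheck is my own); it reads the parquet.res and parquet.res1 directories written by the Java example above:

// Minimal sanity check (assumption: illustrative helper, not from the original article)
import org.apache.spark.sql.SparkSession

object ParquetReadCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("ParquetReadCheck").getOrCreate()

    // Output directory written by reflectTransform
    val reflectDf = spark.read.parquet("parquet.res")
    reflectDf.printSchema()
    reflectDf.show()

    // Output directory written by dynamicTransform
    val dynamicDf = spark.read.parquet("parquet.res1")
    dynamicDf.printSchema()
    dynamicDf.show()

    spark.stop()
  }
}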
This summary of two ways to convert a Spark RDD to a DataFrame in Java and Scala is all the content shared here. I hope it gives you a useful reference, and I hope you will continue to support 服务器之家.
Original article: https://blog.csdn.net/u010592112/article/details/73730796