spring batch 是一个开源的批处理框架.执行一系列的任务. 在 spring batch 中 一个job 是由许多 step 组成的。而每一个 step 又是由 read-process-write task或者 单个 task 组成。
1. "read-process-write" 处理,根据字面意思理解就可以:
- read 就是从资源文件里面读取数据,比如从xml文件,csv文件,数据库中读取数据.
- process 就是处理读取的数据
- write 就是将处理过的数据写入到其他资源文件中去,可以是xml,csv,或者数据库.
比如:从csv文件中 读取数据,经过处理之后,保存到数据库. spring batch 提供了很多类去处理这方面的东西。
2.单个task, 也就是处理单个任务。比如在一个step 开始之前或者完成之后清除资源文件等.
3.许多个step 组成在一起,就组成了一个job. 所以他们之间的关系,就如同下面的描述:
一个 job = 很多steps
一个step = 一个read-process-write 或者 一个task.
同样一个job = step1 -->step2--step3 这样链表形式的组成.
spring batch 例子
考虑如下一个批处理的例子,看起来有点啰嗦,只是为了说明用途:
1. step1 : 从 a 文件夹中读取csv 文件,处理之后,写入到b文件夹中(read-process-write)
2. step2 : 从 b 文件夹中读取csv文件 ,处理之后, 存储到数据库中(read-process-write).
3. step3 : 删除b文件夹下的csv文件。(用到单个task)
4. step4 : 从数据库读取数据,处理之后,生成xml报表文件(read-process-write).
5. step5 : 读取xml报表,并发送email给管理员(用到单个task)
用spring batch 我们可以如下定义这个job:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
<job id= "abcjob" xmlns= "http://www.springframework.org/schema/batch" > <step id= "step1" next= "step2" > <tasklet> <chunk reader= "cvsitemreader" writer= "cvsitemwriter" processor= "itemprocesser" commit-interval= "1" /> </tasklet> </step> <step id= "step2" next= "step3" > <tasklet> <chunk reader= "cvsitemreader" writer= "databaseitemwriter" processor= "itemprocesser" commit-interval= "1" /> </tasklet> </step> <step id= "step3" next= "step4" > <tasklet ref= "filedeletingtasklet" /> </step> <step id= "step4" next= "step5" > <tasklet> <chunk reader= "databaseitemreader" writer= "xmlitemwriter" processor= "itemprocesser" commit-interval= "1" /> </tasklet> </step> <step id= "step5" > <tasklet ref= "sendingemailtasklet" /> </step> </job> |
整个 job 的执行是存储在数据库中的,所以即使是某一个step出错失败,也不需要全部从头开始执行这个job.下面是一个真正的入门教程例子.
采用 jar包如下:
spring-batch-2.2.3 以上版本,但是我在2.2.3版本中发现 org/springframework/batch/core/schema-mysql.sql 里面的的mysql 创建表的语句是有问题的,也就是少了“," 号导致的问题( not null, 后面几个创建表的语句not null 后面少了逗号),当然你可以自己修改后再执行,执行完毕后有如下几个表:
xstream-1.3.jar 必须的。
jettison-1.3.3.jar也是必须的, 否则会出现
java.lang.noclassdeffounderror: org/codehaus/jettison/mapped/mappedxmloutputfactory错误。
另外我用的spring 是 3.1 版本的,可以下载相关jar包,还有apache common 相关jar包就可以了。
mysql-connect-java-5.1.jar 连接mysql 数据库用的。
假设要将如下 csv 文件读取出来处理之后,写入到一个xml文件之中.
1
2
3
|
, "213,100" , 980 , "mkyong" , 29 / 7 / 2013 , "320,200" , 1080 , "staff 1" , 30 / 7 / 2013 , "342,197" , 1200 , "staff 2" , 31 / 7 / 2013 |
用 flatfileitemreader 去读取csv 文件, 用 itemprocessor 去处理数据,用 staxeventitemwriter 去写数据
job 的定义如下(job-hello-world.xml):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
<beans xmlns= "http://www.springframework.org/schema/beans" xmlns:batch= "http://www.springframework.org/schema/batch" xmlns:xsi= "http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http: //www.springframework.org/schema/batch http: //www.springframework.org/schema/batch/spring-batch-2.2.xsd http: //www.springframework.org/schema/beans http: //www.springframework.org/schema/beans/spring-beans-3.1.xsd "> < import resource= "../config/context.xml" /> < import resource= "../config/database.xml" /> <bean id= "report" class = "yihaomen.model.report" scope= "prototype" /> <bean id= "itemprocessor" class = "yihaomen.customitemprocessor" /> <batch:job id= "helloworldjob" > <batch:step id= "step1" > <batch:tasklet> <batch:chunk reader= "cvsfileitemreader" writer= "xmlitemwriter" processor= "itemprocessor" commit-interval= "10" > </batch:chunk> </batch:tasklet> </batch:step> </batch:job> <bean id= "cvsfileitemreader" class = "org.springframework.batch.item.file.flatfileitemreader" > <property name= "resource" value= "classpath:cvs/input/report.csv" /> <property name= "linemapper" > <bean class = "org.springframework.batch.item.file.mapping.defaultlinemapper" > <property name= "linetokenizer" > <bean class = "org.springframework.batch.item.file.transform.delimitedlinetokenizer" > <property name= "names" value= "id,sales,qty,staffname,date" /> </bean> </property> <property name= "fieldsetmapper" > <bean class = "yihaomen.reportfieldsetmapper" /> <!-- if no data type conversion, use beanwrapperfieldsetmapper to map by name <bean class = "org.springframework.batch.item.file.mapping.beanwrapperfieldsetmapper" > <property name= "prototypebeanname" value= "report" /> </bean> --> </property> </bean> </property> </bean> <bean id= "xmlitemwriter" class = "org.springframework.batch.item.xml.staxeventitemwriter" > <property name= "resource" value= "file:xml/outputs/report.xml" /> <property name= "marshaller" ref= "reportmarshaller" /> <property name= "roottagname" value= "report" /> </bean> <bean id= "reportmarshaller" class = "org.springframework.oxm.jaxb.jaxb2marshaller" > <property name= "classestobebound" > <list> <value>yihaomen.model.report</value> </list> </property> </bean> </beans> |
映射csv文件到 report 对象并写xml文件 (通过 jaxb annotations).
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
package yihaomen.model; import java.math.bigdecimal; import java.util.date; import javax.xml.bind.annotation.xmlattribute; import javax.xml.bind.annotation.xmlelement; import javax.xml.bind.annotation.xmlrootelement; @xmlrootelement (name = "record" ) public class report { private int id; private bigdecimal sales; private int qty; private string staffname; private date date; @xmlattribute (name = "id" ) public int getid() { return id; } public void setid( int id) { this .id = id; } @xmlelement (name = "sales" ) public bigdecimal getsales() { return sales; } public void setsales(bigdecimal sales) { this .sales = sales; } @xmlelement (name = "qty" ) public int getqty() { return qty; } public void setqty( int qty) { this .qty = qty; } @xmlelement (name = "staffname" ) public string getstaffname() { return staffname; } public void setstaffname(string staffname) { this .staffname = staffname; } public date getdate() { return date; } public void setdate(date date) { this .date = date; } @override public string tostring() { return "report [id=" + id + ", sales=" + sales + ", qty=" + qty + ", staffname=" + staffname + "]" ; } } |
为了转换日期,用了自定义的 fieldsetmapper. 如果没有数据需要转换, beanwrapperfieldsetmapper 通过名称name 去自动映射值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
package yihaomen; import java.text.parseexception; import java.text.simpledateformat; import org.springframework.batch.item.file.mapping.fieldsetmapper; import org.springframework.batch.item.file.transform.fieldset; import org.springframework.validation.bindexception; import yihaomen.model.report; public class reportfieldsetmapper implements fieldsetmapper<report> { private simpledateformat dateformat = new simpledateformat( "yyyy-mm-dd" ); @override public report mapfieldset(fieldset fieldset) throws bindexception { report report = new report(); report.setid(fieldset.readint( 0 )); report.setsales(fieldset.readbigdecimal( 1 )); report.setqty(fieldset.readint( 2 )); report.setstaffname(fieldset.readstring( 3 )); //default format yyyy-mm-dd //fieldset.readdate(4); string date = fieldset.readstring( 4 ); try { report.setdate(dateformat.parse(date)); } catch (parseexception e) { e.printstacktrace(); } return report; } } |
在写入数据之前调用itemprocessor 处理数据
1
2
3
4
5
6
7
8
9
10
11
|
package yihaomen; import org.springframework.batch.item.itemprocessor; import yihaomen.model.report; public class customitemprocessor implements itemprocessor<report, report> { @override public report process(report item) throws exception { system.out.println( "processing..." + item); return item; } } |
spring 配置文件和数据库配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
<beans xmlns= "http://www.springframework.org/schema/beans" xmlns:xsi= "http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation=" http: //www.springframework.org/schema/beans http: //www.springframework.org/schema/beans/spring-beans-3.2.xsd"> <!-- stored job-meta in memory --> <!-- <bean id= "jobrepository" class = "org.springframework.batch.core.repository.support.mapjobrepositoryfactorybean" > <property name= "transactionmanager" ref= "transactionmanager" /> </bean> --> <!-- stored job-meta in database --> <bean id= "jobrepository" class = "org.springframework.batch.core.repository.support.jobrepositoryfactorybean" > <property name= "datasource" ref= "datasource" /> <property name= "transactionmanager" ref= "transactionmanager" /> <property name= "databasetype" value= "mysql" /> </bean> <bean id= "transactionmanager" class = "org.springframework.batch.support.transaction.resourcelesstransactionmanager" /> <bean id= "joblauncher" class = "org.springframework.batch.core.launch.support.simplejoblauncher" > <property name= "jobrepository" ref= "jobrepository" /> </bean> </beans> |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
<beans xmlns= "http://www.springframework.org/schema/beans" xmlns:jdbc= "http://www.springframework.org/schema/jdbc" xmlns:xsi= "http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http: //www.springframework.org/schema/beans http: //www.springframework.org/schema/beans/spring-beans-3.2.xsd http: //www.springframework.org/schema/jdbc http: //www.springframework.org/schema/jdbc/spring-jdbc-3.2.xsd"> <!-- connect to mysql database --> <bean id= "datasource" class = "org.springframework.jdbc.datasource.drivermanagerdatasource" > <property name= "driverclassname" value= "com.mysql.jdbc.driver" /> <property name= "url" value= "jdbc:mysql://localhost:3306/test" /> <property name= "username" value= "root" /> <property name= "password" value= "" /> </bean> <bean id= "transactionmanager" class = "org.springframework.batch.support.transaction.resourcelesstransactionmanager" /> <!-- create job-meta tables automatically --> <jdbc:initialize-database data-source= "datasource" > <jdbc:script location= "org/springframework/batch/core/schema-drop-mysql.sql" /> <jdbc:script location= "org/springframework/batch/core/schema-mysql.sql" /> </jdbc:initialize-database> </beans> |
运行程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
package yihaomen; import org.springframework.batch.core.job; import org.springframework.batch.core.jobexecution; import org.springframework.batch.core.jobparameters; import org.springframework.batch.core.launch.joblauncher; import org.springframework.context.applicationcontext; import org.springframework.context.support.classpathxmlapplicationcontext; public class app { public static void main(string[] args) { string[] springconfig = { "spring/batch/jobs/job-hello-world.xml" }; applicationcontext context = new classpathxmlapplicationcontext(springconfig); joblauncher joblauncher = (joblauncher) context.getbean( "joblauncher" ); job job = (job) context.getbean( "helloworldjob" ); try { jobexecution execution = joblauncher.run(job, new jobparameters()); system.out.println( "exit status : " + execution.getstatus()); } catch (exception e) { e.printstacktrace(); } system.out.println( "done" ); } } |
运行结果 :
1
2
3
4
5
6
7
8
9
10
11
|
十二月 03 , 2013 8 : 56 : 24 下午 org.springframework.batch.core.launch.support.simplejoblauncher$ 1 run info: job: [flowjob: [name=helloworldjob]] launched with the following parameters: [{}] 十二月 03 , 2013 8 : 56 : 24 下午 org.springframework.batch.core.job.simplestephandler handlestep info: executing step: [step1] processing...report [id= 1001 , sales= 213100 , qty= 980 , staffname=yihaomen] processing...report [id= 1002 , sales= 320200 , qty= 1080 , staffname=staff 1 ] processing...report [id= 1003 , sales= 342197 , qty= 1200 , staffname=staff 2 ] 十二月 03 , 2013 8 : 56 : 25 下午 org.springframework.batch.core.launch.support.simplejoblauncher$ 1 run info: job: [flowjob: [name=helloworldjob]] completed with the following parameters: [{}] and the following status: [completed] exit status : completed done |
结果生成了output.xml 在你工程目录的 xml 目录下。
整个源代码,除去jar包之后下载:spring batch 入门教程下载
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://blog.csdn.net/achuo/article/details/50982298