1.介绍
当系统有大量数据需要从数据库导入Elasticsearch时,使用Spring Batch可以提高导入的效率。Spring Batch使用ItemReader分页读取数据,使用ItemWriter批量写入数据。由于Spring Batch没有提供Elasticsearch的ItemWriter和ItemReader,本示例自定义了ElasticsearchItemWriter(以及仅供参考的ElasticsearchItemReader),用于批量导入。
2.示例
2.1 pom.xml
本文使用Spring Data Jest连接ES(也可以使用Spring Data Elasticsearch连接ES),ES版本为5.5.3。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- NOTE(review): groupId "com.hfcsbc.estl" differs from the Java package "com.hfcsbc.esetl";
         harmless for the build, but confirm which spelling is intended. -->
    <groupId>com.hfcsbc.estl</groupId>
    <artifactId>es-etl</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>es-etl</name>
    <description>Demo project for Spring Boot</description>

    <!-- 2.0.0.M7 is a Spring Boot milestone; it is resolved from the Spring milestone
         repository declared further down, not from Maven Central. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.M7</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- Source side: JPA entities read from PostgreSQL. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>

        <!-- Chunk-oriented reading/writing (ItemReader / ItemWriter). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>

        <!-- Target side: Spring Data Elasticsearch repositories over the Jest HTTP client. -->
        <dependency>
            <groupId>com.github.vanroy</groupId>
            <artifactId>spring-boot-starter-data-jest</artifactId>
            <version>3.0.0.RELEASE</version>
        </dependency>
        <!-- Jest client pinned explicitly; NOTE(review): keep its major version aligned with the
             Elasticsearch server (5.x in this article). -->
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.2</version>
        </dependency>

        <!-- Generates accessors for the @Data entities. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <!-- Snapshot/milestone repositories are required because the parent is a milestone release. -->
    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>
2.2 实体类及repository
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
package com.hfcsbc.esetl.domain; import lombok.Data; import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.annotations.Field; import org.springframework.data.elasticsearch.annotations.FieldType; import javax.persistence.Entity; import javax.persistence.Id; import javax.persistence.OneToOne; /** * Create by pengchao on 2018/2/23 */ @Document (indexName = "person" , type = "person" , shards = 1 , replicas = 0 , refreshInterval = "-1" ) @Entity @Data public class Person { @Id private Long id; private String name; @OneToOne @Field (type = FieldType.Nested) private Address address; } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
package com.hfcsbc.esetl.domain; import lombok.Data; import javax.persistence.Entity; import javax.persistence.Id; /** * Create by pengchao on 2018/2/23 */ @Entity @Data public class Address { @Id private Long id; private String name; } |
1
2
3
4
5
6
7
8
|
package com.hfcsbc.esetl.repository.jpa; import com.hfcsbc.esetl.domain.Person; import org.springframework.data.jpa.repository.JpaRepository; /** * Create by pengchao on 2018/2/23 */ public interface PersonRepository extends JpaRepository<Person, Long> { } |
1
2
3
4
5
6
7
8
|
package com.hfcsbc.esetl.repository.es; import com.hfcsbc.esetl.domain.Person; import org.springframework.data.elasticsearch.repository.ElasticsearchRepository; /** * Create by pengchao on 2018/2/23 */ public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> { } |
2.3 配置ElasticsearchItemWriter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
package com.hfcsbc.esetl.itemWriter; import com.hfcsbc.esetl.repository.es.EsPersonRepository; import com.hfcsbc.esetl.domain.Person; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.ItemWriteListener; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.StepExecutionListener; import org.springframework.batch.item.ItemWriter; import java.util.List; /** * Create by pengchao on 2018/2/23 */ public class ElasticsearchItemWriter implements ItemWriter<Person>, ItemWriteListener<Person>, StepExecutionListener { private EsPersonRepository personRepository; public ElasticsearchItemWriter(EsPersonRepository personRepository) { this .personRepository = personRepository; } @Override public void beforeWrite(List<? extends Person> items) { } @Override public void afterWrite(List<? extends Person> items) { } @Override public void onWriteError(Exception exception, List<? extends Person> items) { } @Override public void beforeStep(StepExecution stepExecution) { } @Override public ExitStatus afterStep(StepExecution stepExecution) { return null ; } @Override public void write(List<? extends Person> items) throws Exception { //实现类AbstractElasticsearchRepository的saveAll方法调用的是elasticsearchOperations.bulkIndex(queries),为批量索引 personRepository.saveAll(items); } } |
2.4 配置ElasticsearchItemReader(本示例未使用,仅供参考)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
package com.hfcsbc.esetl.itemReader; import org.springframework.batch.item.data.AbstractPaginatedDataItemReader; import org.springframework.beans.factory.InitializingBean; import org.springframework.data.elasticsearch.core.ElasticsearchOperations; import org.springframework.data.elasticsearch.core.query.SearchQuery; import java.util.Iterator; /** * Create by pengchao on 2018/2/24 */ public class ElasticsearchItemReader<Person> extends AbstractPaginatedDataItemReader<Person> implements InitializingBean { private final ElasticsearchOperations elasticsearchOperations; private final SearchQuery query; private final Class<? extends Person> targetType; public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class<? extends Person> targetType) { this .elasticsearchOperations = elasticsearchOperations; this .query = query; this .targetType = targetType; } @Override protected Iterator<Person> doPageRead() { return (Iterator<Person>)elasticsearchOperations.queryForList(query, targetType).iterator(); } @Override public void afterPropertiesSet() throws Exception { } } |
2.5 Spring Batch相关配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
package com.hfcsbc.esetl.config; import com.hfcsbc.esetl.itemWriter.ElasticsearchItemWriter; import com.hfcsbc.esetl.repository.es.EsPersonRepository; import com.hfcsbc.esetl.domain.Person; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.database.JpaPagingItemReader; import org.springframework.batch.item.database.orm.JpaNativeQueryProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.PlatformTransactionManager; import javax.persistence.EntityManagerFactory; import javax.sql.DataSource; /** * Create by pengchao on 2018/2/23 */ @Configuration @EnableBatchProcessing public class BatchConfig { @Autowired private EsPersonRepository personRepository; @Bean public ItemReader<Person> orderItemReader(EntityManagerFactory entityManagerFactory){ JpaPagingItemReader<Person> reader = new JpaPagingItemReader<Person>(); String sqlQuery = "select * from person" ; try { JpaNativeQueryProvider<Person> queryProvider = new JpaNativeQueryProvider<Person>(); queryProvider.setSqlQuery(sqlQuery); queryProvider.setEntityClass(Person. 
class ); queryProvider.afterPropertiesSet(); reader.setEntityManagerFactory(entityManagerFactory); reader.setPageSize( 10000 ); reader.setQueryProvider(queryProvider); reader.afterPropertiesSet(); reader.setSaveState( true ); } catch (Exception e) { e.printStackTrace(); } return reader; } @Bean public ElasticsearchItemWriter itemWriter(){ return new ElasticsearchItemWriter(personRepository); } @Bean public Step step(StepBuilderFactory stepBuilderFactory, ItemReader itemReader, ItemWriter itemWriter){ return stepBuilderFactory .get( "step1" ) .chunk( 10000 ) .reader(itemReader) .writer(itemWriter) .build(); } @Bean public Job job(JobBuilderFactory jobBuilderFactory, Step step){ return jobBuilderFactory .get( "importJob" ) .incrementer( new RunIdIncrementer()) .flow(step) .end() .build(); } /** * spring batch执行时会创建一些自身需要的表,这里指定表创建的位置:dataSource * @param dataSource * @param manager * @return */ @Bean public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager manager){ JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDataSource(dataSource); jobRepositoryFactoryBean.setTransactionManager(manager); jobRepositoryFactoryBean.setDatabaseType( "postgres" ); try { return jobRepositoryFactoryBean.getObject(); } catch (Exception e) { e.printStackTrace(); } return null ; } } |
2.6 配置数据库及ES的连接地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
spring:
  redis:
    # NOTE(review): the pom declares no Redis dependency; this entry appears unused by the example.
    host: 192.168.1.222
  data:
    jest:
      # Elasticsearch HTTP endpoint used by spring-boot-starter-data-jest.
      uri: http://192.168.1.222:9200
      # NOTE(review): demo credentials are hard-coded; externalise them for real deployments.
      username: elastic
      password: changeme
  jpa:
    database: POSTGRESQL
    show-sql: true
    hibernate:
      # Hibernate creates/updates the tables for the JPA entities on startup.
      ddl-auto: update
  datasource:
    platform: postgres
    url: jdbc:postgresql://192.168.1.222:5433/person
    username: hfcb
    password: hfcb
    driver-class-name: org.postgresql.Driver
    # NOTE(review): Spring Boot 2 uses HikariCP by default and does not appear to bind this key
    # (the Hikari equivalent is spring.datasource.hikari.maximum-pool-size). Check before relying
    # on it: a pool of 2 may be too small when the paging reader holds its own transaction.
    max-active: 2
  batch:
    # Always create the Spring Batch metadata tables on startup.
    initialize-schema: always
2.7 配置入口类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
package com.hfcsbc.esetl; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration; import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration; import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories; import org.springframework.data.jpa.repository.config.EnableJpaRepositories; @SpringBootApplication (exclude = {ElasticsearchAutoConfiguration. class , ElasticsearchDataAutoConfiguration. class }) @EnableElasticsearchRepositories (basePackages = "com.hfcsbc.esetl.repository" ) @EnableJpaRepositories (basePackages = "com.hfcsbc.esetl.repository.jpa" ) public class EsEtlApplication { public static void main(String[] args) { SpringApplication.run(EsEtlApplication. class , args); } } |
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://www.wisely.top/2018/02/24/spring-batch-elasticsearch