Spring Batch ItemReaders and ItemWriters Example
This article is a tutorial about the various item readers and item writers in Spring Batch. We will use Spring Boot to speed up our development process.
1. Introduction
Spring Batch is a lightweight, scalable, and comprehensive batch framework for handling data at massive scale. It builds on the Spring Framework to provide intuitive and easy configuration for executing batch applications. It provides reusable functions essential for processing large volumes of records, including cross-cutting concerns such as logging/tracing, transaction management, job processing statistics, job restart, skip, and resource management.
Spring Batch has a layered architecture consisting of three components:
- Application – Contains custom code written by developers.
- Batch Core – Classes to launch and control batch jobs.
- Batch Infrastructure – Reusable code for common functionalities needed by the core and the application.
Let us dive into Spring Batch with examples of item readers and item writers.
2. Technologies Used
- Java 1.8.0_101 (any 1.8.x will do fine)
- Gradle 4.4.1 (4.x will do fine)
- IntelliJ Idea (Any Java IDE would work)
- The rest will be part of the Gradle configuration.
3. Spring Batch Project
Spring Boot provides more than 30 starters to ease the dependency management for your project. The easiest way to generate a Spring Boot project is via the Spring Initializr tool with the steps below:
- Navigate to https://start.spring.io/.
- Select Gradle Project with Java and Spring Boot version 2.0.1.
- Add Batch and HSqlDB in the “search for dependencies”.
- Enter the group name as com.jcg and the artifact as sprbatch.
- Click the Generate Project button.
A Gradle project will be generated. If you prefer Maven, select Maven instead of Gradle before generating the project. Import the project into your Java IDE.
3.1 Gradle File
We will look at the generated Gradle file for our project. It details the compile-time and runtime dependencies for our project.
build.gradle
buildscript {
    ext {
        springBootVersion = '2.0.1.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.jcg'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile('org.springframework.boot:spring-boot-starter-batch')
    compile('org.springframework.boot:spring-boot-starter-data-jpa')
    compile('org.springframework.boot:spring-boot-starter-jdbc')
    compile('org.mybatis.spring.boot:mybatis-spring-boot-starter:1.3.2')
    runtime('org.hsqldb:hsqldb')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('org.springframework.batch:spring-batch-test')
}
- We have provided Maven Central as the repository for all our dependencies.
- The Spring Boot Batch starter dependency is applied to enable the batch nature of our project.
- HSQL DB is provided as a runtime dependency to save the Spring Batch job status in embedded mode. Spring Batch needs to track job executions and their results in a reliable manner that survives job restarts and abnormal terminations. To ensure this, they are generally stored in a database, and we use HSQL DB in embedded mode for the same.
- HSQL DB is also used for demonstrating our database readers and writers.
- The two testCompile dependencies provide the test configuration.
Below is the Person model which will be used as the data model in our application.
Person
package com.jcg.sprbatch.model;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;

@Entity
@XmlRootElement(name = "Person")
public class Person {

    @Id
    @GeneratedValue
    private int id;

    private String firstName;

    private String lastName;

    @XmlElement(name = "FirstName")
    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    @XmlElement(name = "LastName")
    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @Override
    public String toString() {
        return "Person{" +
                "lastName='" + lastName + '\'' +
                ", firstName='" + firstName + '\'' +
                '}';
    }
}
- The @Entity annotation specifies that the Person class is a JPA entity, while @XmlRootElement specifies that it is the root element of the XML.
- The id field is the primary key and is auto-generated by Hibernate, as specified by the @Id and @GeneratedValue annotations.
- The @XmlElement annotations specify that FirstName and LastName are to be used as XML tags when marshalling and unmarshalling by the object-XML mapper.
In the next section, we will cover a couple of readers in combination with different writers, in detail and with examples.
4. Item Readers and Writers
4.1 FlatFileItemReader and JpaItemWriter
In this example, we will read from a CSV file and write the records into our HSQL database. Below is the batch configuration to achieve this.
FlatFile and JPA Configuration
package com.jcg.sprbatch.config;

import com.jcg.sprbatch.model.Person;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.xml.StaxEventItemReader;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;

import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;

@Configuration
@EnableBatchProcessing
public class SpringBatchConfig {

    @Autowired
    EntityManagerFactory emf;

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    @Autowired
    DataSource dataSource;

    @Bean
    @StepScope
    public FlatFileItemReader<Person> flatFileItemReader() {
        FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("people.csv"));
        reader.setLinesToSkip(1);
        DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("firstName", "lastName");
        BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Person.class);
        lineMapper.setFieldSetMapper(fieldSetMapper);
        lineMapper.setLineTokenizer(tokenizer);
        reader.setLineMapper(lineMapper);
        return reader;
    }

    @Bean
    public JpaItemWriter<Person> jpaItemWriter() {
        JpaItemWriter<Person> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(emf);
        return writer;
    }

    // Step referenced by flowJob below; the wiring is reconstructed from the
    // article's description (chunk size mirrors the later steps).
    private Step flatFileJpaWriterStep() {
        return stepBuilderFactory.get("flatFileJpaWriterStep")
                .<Person, Person>chunk(5)
                .reader(flatFileItemReader())
                .writer(jpaItemWriter())
                .build();
    }

    @Bean
    public Job flowJob() {
        return jobBuilderFactory.get("flowJob")
                .incrementer(new RunIdIncrementer())
                .start(flatFileJpaWriterStep())
                .build();
    }
}
- The @Configuration and @EnableBatchProcessing annotations specify that this is a configuration class and enable batch processing for our project.
- We configure a FlatFileItemReader as follows:
- We provide the file name people.csv and mention that it is available on our classpath.
- We specify that the first line has to be skipped, as it is the header.
- We split each line by comma, as it is a comma-separated file; the DelimitedLineTokenizer splits the line into tokens based on our delimiter.
- We map each of the tokens onto our model class using BeanWrapperFieldSetMapper.
- jpaItemWriter configures a JpaItemWriter which persists the Person model into the database. JpaItemWriter uses the auto-configured EntityManagerFactory to persist the model.
- flowJob specifies a simple job where we configure a single step, flatFileJpaWriterStep, which executes our reader and writer.
- We provide a RunIdIncrementer to ensure that each execution of the job gets a unique instance. This helps Spring differentiate multiple executions of the same job even if the rest of the job parameters are identical.
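The article does not show the Spring Boot entry point that bootstraps this job, so here is a minimal sketch; the class name is an assumption based on the artifact id sprbatch. With spring-boot-starter-batch on the classpath, Spring Boot launches all configured jobs at application startup by default.

package com.jcg.sprbatch;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Hypothetical entry point (class name assumed): Spring Boot runs all
// configured Jobs on startup when spring-boot-starter-batch is present.
@SpringBootApplication
public class SprbatchApplication {

    public static void main(String[] args) {
        SpringApplication.run(SprbatchApplication.class, args);
    }
}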
Running the above example ensures that the records from the person file below are saved into the Person table with the fields id, first_name, and last_name. The SQL queries that run are also visible because of the JPA configuration in application.yml.
Person.csv
FirstName,LastName
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe
The CSV file contains 5 records, each with a first name and last name for one person.
application.yml
spring:
  jpa:
    show-sql: true
    hibernate:
      ddl-auto: create
- show-sql: true specifies that the SQL queries run must be visible on the console.
- ddl-auto: create ensures that the tables are created afresh, with previous data destroyed, on each run of the application.
When we run the application, we can see the Hibernate queries in the console log.
4.2 JdbcCursorItemReader and StaxEventItemWriter
In this section, we will configure a JdbcCursorItemReader to read the Person records saved to the database in the previous section, and write them to an XML file.
Jdbc and Stax Writer Configuration
@Bean
public Job flowJob() {
    return jobBuilderFactory.get("flowJob")
            .incrementer(new RunIdIncrementer())
            .start(flatFileJpaWriterStep())
            .next(jdbcStaxWriterStep())
            .build();
}

private Step jdbcStaxWriterStep() {
    return stepBuilderFactory.get("jdbcStaxWriterStep")
            .<Person, Person>chunk(5)
            .reader(jdbcCursorItemReader())
            .writer(personStaxEventItemWriter())
            .build();
}

@Bean
public JdbcCursorItemReader<Person> jdbcCursorItemReader() {
    JdbcCursorItemReader<Person> personJdbcCursorItemReader = new JdbcCursorItemReader<>();
    personJdbcCursorItemReader.setSql("select first_name,last_name from person");
    personJdbcCursorItemReader.setDataSource(dataSource);
    personJdbcCursorItemReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
    return personJdbcCursorItemReader;
}

@Bean
public StaxEventItemWriter<Person> personStaxEventItemWriter() {
    StaxEventItemWriter<Person> staxEventItemWriter = new StaxEventItemWriter<>();
    staxEventItemWriter.setResource(new FileSystemResource("src/main/resources/people.xml"));
    staxEventItemWriter.setRootTagName("People");
    Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
    marshaller.setClassesToBeBound(Person.class);
    staxEventItemWriter.setMarshaller(marshaller);
    return staxEventItemWriter;
}
- We configure the new jdbcStaxWriterStep as the next step of the previously created job.
- jdbcStaxWriterStep contains the reader jdbcCursorItemReader and the writer personStaxEventItemWriter, which run in sequence.
- We configure the JdbcCursorItemReader to read from the auto-configured datasource.
- We provide the SQL select first_name,last_name from person to fetch the results from the database.
- We specify BeanPropertyRowMapper to set the values on the Person class.
- Spring Batch provides stream processing of database records to the XML file.
- We specify the output resource as src/main/resources/people.xml.
- The root tag is specified as People and an instance of Jaxb2Marshaller is provided.
- We specify that Jaxb2Marshaller should use our Person model as the class for marshalling to XML.
The below XML file is the result of running the above job.
people.xml
<?xml version="1.0" encoding="UTF-8"?>
<People>
    <Person>
        <FirstName>Jill</FirstName>
        <LastName>Doe</LastName>
    </Person>
    <Person>
        <FirstName>Joe</FirstName>
        <LastName>Doe</LastName>
    </Person>
    <Person>
        <FirstName>Justin</FirstName>
        <LastName>Doe</LastName>
    </Person>
    <Person>
        <FirstName>Jane</FirstName>
        <LastName>Doe</LastName>
    </Person>
    <Person>
        <FirstName>John</FirstName>
        <LastName>Doe</LastName>
    </Person>
</People>
1.0 and UTF-8 are the default values for version and encoding respectively. These values can be changed by using the corresponding setter methods on StaxEventItemWriter, as sketched below. We can verify that the values match our original CSV file.
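Both setters do exist on StaxEventItemWriter; the encoding value below is purely illustrative and not part of the article's configuration.

// Inside personStaxEventItemWriter(), before returning the writer:
staxEventItemWriter.setVersion("1.0");          // default, shown explicitly
staxEventItemWriter.setEncoding("ISO-8859-1");  // illustrative non-default encoding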
4.3 StaxEventItemReader and FlatFileItemWriter
In this section, we will process the XML file from the previous step and convert it into a text file delimited by a colon (:).
@Bean
public Job flowJob() {
    return jobBuilderFactory.get("flowJob")
            .incrementer(new RunIdIncrementer())
            .start(flatFileJpaWriterStep())
            .next(jdbcStaxWriterStep())
            .next(staxFileWriterStep())
            .build();
}

private Step staxFileWriterStep() {
    return stepBuilderFactory.get("staxFileWriterStep")
            .<Person, Person>chunk(5)
            .reader(personStaxEventItemReader())
            .writer(flatFileItemWriter())
            .build();
}

@Bean
public StaxEventItemReader<Person> personStaxEventItemReader() {
    StaxEventItemReader<Person> staxEventItemReader = new StaxEventItemReader<>();
    staxEventItemReader.setResource(new FileSystemResource("src/main/resources/people.xml"));
    staxEventItemReader.setFragmentRootElementName("Person");
    Jaxb2Marshaller unMarshaller = new Jaxb2Marshaller();
    unMarshaller.setClassesToBeBound(Person.class);
    staxEventItemReader.setUnmarshaller(unMarshaller);
    return staxEventItemReader;
}

@Bean
@StepScope
public FlatFileItemWriter<Person> flatFileItemWriter() {
    FlatFileItemWriter<Person> flatFileItemWriter = new FlatFileItemWriter<>();
    flatFileItemWriter.setShouldDeleteIfExists(true);
    flatFileItemWriter.setResource(new FileSystemResource("src/main/resources/modified_people.txt"));
    flatFileItemWriter.setLineAggregator((person) -> {
        return person.getFirstName() + ":" + person.getLastName();
    });
    return flatFileItemWriter;
}
- We add staxFileWriterStep as the next step in our flow job.
- We configure staxFileWriterStep to use a StaxEventItemReader and a FlatFileItemWriter with a chunk size of 5.
- The StaxEventItemReader is configured to read from the file src/main/resources/people.xml.
- Person is provided as the tag element (XML fragment root) to read, while the Person model is provided as the destination class for Jaxb2Marshaller.
- The FlatFileItemWriter is provided the destination location of src/main/resources/modified_people.txt.
- We use a lambda for the LineAggregator to transform each person into the string firstName:lastName; an equivalent configuration using built-in classes is sketched after this list.
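As an aside, the same output could be produced without a lambda by using Spring Batch's built-in DelimitedLineAggregator and BeanWrapperFieldExtractor. This is a sketch under the assumption that the colon delimiter and the two property names stay the same; it is not the article's configuration.

import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;

@Bean
@StepScope
public FlatFileItemWriter<Person> flatFileItemWriter() {
    FlatFileItemWriter<Person> writer = new FlatFileItemWriter<>();
    writer.setShouldDeleteIfExists(true);
    writer.setResource(new FileSystemResource("src/main/resources/modified_people.txt"));
    // Equivalent to the lambda: extract firstName and lastName from each
    // Person and join them with ":".
    DelimitedLineAggregator<Person> aggregator = new DelimitedLineAggregator<>();
    aggregator.setDelimiter(":");
    BeanWrapperFieldExtractor<Person> extractor = new BeanWrapperFieldExtractor<>();
    extractor.setNames(new String[]{"firstName", "lastName"});
    aggregator.setFieldExtractor(extractor);
    writer.setLineAggregator(aggregator);
    return writer;
}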
Running the above configuration produces the below output:
Jill:Doe
Joe:Doe
Justin:Doe
Jane:Doe
John:Doe
This completes our tour of chaining some of the Spring Batch item readers and writers. In the next section, we will take a look under the hood of the Spring Batch reader and writer interfaces.
5. Unboxing the Item Reader and Writer
We will cover the two basic interfaces which can be extended to roll out our own custom readers and writers when the preconfigured Spring readers and writers do not suffice for our needs.
ItemReader has a single method, read. The reader accepts a generic type and can read items of any type. The read method should return null when there is nothing more to read from the input source. Implementations are expected to be stateful and should take care of maintaining their state.
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
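As a sketch of this contract (a hypothetical example, not part of the article's project), a minimal stateful reader over an in-memory list could look like this:

import org.springframework.batch.item.ItemReader;

import java.util.Iterator;
import java.util.List;

// Hypothetical custom reader: serves items from an in-memory list and
// signals end of input by returning null, as the contract requires.
public class InMemoryItemReader<T> implements ItemReader<T> {

    private final Iterator<T> iterator;

    public InMemoryItemReader(List<T> items) {
        this.iterator = items.iterator();
    }

    @Override
    public T read() {
        // null tells Spring Batch there is nothing more to read.
        return iterator.hasNext() ? iterator.next() : null;
    }
}

Spring Batch in fact ships a similar ListItemReader in org.springframework.batch.item.support.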
void write(List<? extends T> items) throws Exception;
The writer declares a write method which takes a list of items of generic type T. Spring Batch expects implementations of Reader and Writer to execute a step.
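For instance (again a hypothetical sketch, not from the article), a custom writer that simply prints each chunk could look like this:

import org.springframework.batch.item.ItemWriter;

import java.util.List;

// Hypothetical custom writer: invoked once per chunk with up to
// chunk-size items; here we simply print them.
public class ConsoleItemWriter<T> implements ItemWriter<T> {

    @Override
    public void write(List<? extends T> items) {
        items.forEach(item -> System.out.println("Writing item: " + item));
    }
}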
There is also an ItemStream interface, which is combined with Reader and Writer to roll out ItemStreamReader/Writer. The main purpose of ItemStream is to save state and restore from that state in case an error occurs. For this purpose, an item stream utilizes three methods which must be implemented.
void open(ExecutionContext executionContext) throws ItemStreamException;
void update(ExecutionContext executionContext) throws ItemStreamException;
void close() throws ItemStreamException;
open is passed the previous execution context in the case of a restart, or a fresh execution context in the case of a first run. update records the current state into the executionContext passed to the open method so that it can be persisted. close is used to release any held resources. FlatFileItemReader is an implementation of ItemStreamReader, as it must handle restarts gracefully.
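To make the state-saving concrete, here is a minimal restartable reader sketch (hypothetical, not from the article): the current index is stored in the ExecutionContext on update() and restored in open() after a restart.

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;

import java.util.List;

// Hypothetical restartable reader over an in-memory list.
public class RestartableListReader<T> implements ItemStreamReader<T> {

    private static final String INDEX_KEY = "reader.index";

    private final List<T> items;
    private int index;

    public RestartableListReader(List<T> items) {
        this.items = items;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        // Restore state on restart; start from 0 on a fresh run.
        index = executionContext.containsKey(INDEX_KEY)
                ? executionContext.getInt(INDEX_KEY) : 0;
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        // Record the current position so it is persisted at each chunk boundary.
        executionContext.putInt(INDEX_KEY, index);
    }

    @Override
    public void close() throws ItemStreamException {
        // No resources to release in this in-memory example.
    }

    @Override
    public T read() {
        return index < items.size() ? items.get(index++) : null;
    }
}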
6. Summary
In this tutorial, we saw the following item readers and writers:
- StaxEventItemReader and StaxEventItemWriter
- FlatFileItemReader and FlatFileItemWriter
- JdbcCursorItemReader and JpaItemWriter
Spring Batch also has support for iBATIS, JdbcPagingItemReader, and a Hibernate reader without the abstraction of JPA. We also peeked under the hood of the reader and writer interfaces, which will help us roll out our own custom readers and writers.
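As an illustrative sketch of one of these, a JdbcPagingItemReader for the same person table might be configured as below; the bean placement, page size, and sort key are assumptions, not the article's code.

import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.jdbc.core.BeanPropertyRowMapper;

import javax.sql.DataSource;

@Bean
public JdbcPagingItemReader<Person> jdbcPagingItemReader(DataSource dataSource) throws Exception {
    // The factory bean builds a database-specific paging query provider.
    SqlPagingQueryProviderFactoryBean providerFactory = new SqlPagingQueryProviderFactoryBean();
    providerFactory.setDataSource(dataSource);
    providerFactory.setSelectClause("select id, first_name, last_name");
    providerFactory.setFromClause("from person");
    providerFactory.setSortKey("id"); // paging requires a sort key

    JdbcPagingItemReader<Person> reader = new JdbcPagingItemReader<>();
    reader.setDataSource(dataSource);
    reader.setQueryProvider(providerFactory.getObject());
    reader.setPageSize(5);
    reader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
    return reader;
}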
Spring also provides other implementations such as:
- MultiResourceItemReader – reads from multiple resources sequentially.
- MultiResourceItemWriter – writes to a new file when the configured threshold is exceeded.
- JmsItemReader and JmsItemWriter – read from and write to a JMS queue.
- MongoItemReader and MongoItemWriter – read from and write to MongoDB.
- AmqpItemReader and AmqpItemWriter – read from and write to queues with the AMQP protocol.
7. Download the Source Code
You can download the full source code of this example here: Item Readers and Writers
Did you face this error while running it?
Caused by: org.springframework.jdbc.BadSqlGrammarException: PreparedStatementCallback; bad SQL grammar [SELECT JOB_INSTANCE_ID, JOB_NAME from BATCH_JOB_INSTANCE where JOB_NAME = ? order by JOB_INSTANCE_ID desc]; nested exception is java.sql.SQLSyntaxErrorException: user lacks privilege or object not found: BATCH_JOB_INSTANCE
Nope. But it means that the HSQLDB instance does not have the proper permissions.
Yes, the DB user that connects to the database should either have create-table privileges in Oracle, or the tables should be created upfront with the necessary access to update them. Here are the tables that have to be created if the user does not have create privileges:
https://github.com/spring-projects/spring-batch/blob/master/spring-batch-core/src/main/resources/org/springframework/batch/core/schema-oracle10g.sql
How do I read multiple MySQL (or H2) tables (some with joins) and write them to a CSV file in a Spring Boot batch application? Could you provide some examples?