
Spring Batch ItemReaders and ItemWriters Example

This article is a tutorial about the various item readers and item writers in Spring Batch. We will use Spring Boot to speed up our development process.

1. Introduction

Spring Batch is a lightweight, scalable, and comprehensive batch framework for handling data at massive scale. It builds upon the Spring Framework to provide intuitive and easy configuration for executing batch applications. It provides reusable functions essential for processing large volumes of records, including cross-cutting concerns such as logging/tracing, transaction management, job processing statistics, job restart, skip, and resource management.

Spring Batch has a layered architecture consisting of three components:

  • Application – Contains custom code written by developers.
  • Batch Core – Classes to launch and control batch jobs.
  • Batch Infrastructure – Reusable code for common functionalities needed by Core and Application.

Let us dive into Spring Batch with examples of item readers and item writers.

2. Technologies Used

  • Java 1.8.101 (1.8.x will do fine)
  • Gradle 4.4.1 (4.x will do fine)
  • IntelliJ IDEA (any Java IDE would work)
  • The rest will be part of the Gradle configuration.

3. Spring Batch Project

Spring Boot provides more than 30 starters to ease dependency management for your project. The easiest way to generate a Spring Boot project is via the Spring Initializr tool with the steps below:

    • Navigate to https://start.spring.io/.
    • Select Gradle Project with Java and Spring Boot version 2.0.1.
    • Add Batch and HSQLDB in the “search for dependencies” box.
    • Enter the group name as com.jcg and artifact as sprbatch.
    • Click the Generate Project button.

A Gradle project will be generated. If you prefer Maven, select Maven instead of Gradle before generating the project. Import the project into your Java IDE.
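
Spring Initializr also generates the standard Spring Boot entry point for the project. It should look like the sketch below, with the class name derived from the sprbatch artifact:

SprbatchApplication

package com.jcg.sprbatch;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SprbatchApplication {

    public static void main(String[] args) {
        // Bootstraps the Spring context; Spring Boot launches the configured
        // batch jobs at startup by default.
        SpringApplication.run(SprbatchApplication.class, args);
    }
}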

3.1 Gradle File

We will look at the generated Gradle file for our project. It has detailed configuration outlining the compile-time and runtime dependencies for our project.

build.gradle

buildscript {
    ext {
        springBootVersion = '2.0.1.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.jcg'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}


dependencies {
    compile('org.springframework.boot:spring-boot-starter-batch')
    compile('org.springframework.boot:spring-boot-starter-data-jpa')
    compile('org.springframework.boot:spring-boot-starter-jdbc')
    compile('org.mybatis.spring.boot:mybatis-spring-boot-starter:1.3.2')
    runtime('org.hsqldb:hsqldb')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('org.springframework.batch:spring-batch-test')
}
  • Maven Central is the repository for all our dependencies.
  • The Spring Boot Batch starter dependency enables the batch nature of our project.
  • HSQLDB is provided as a runtime dependency to save the Spring Batch job status in embedded mode. Spring Batch needs to track job executions and results reliably so that they survive job restarts and abnormal terminations; for this, they are generally stored in a database, and we use HSQLDB in embedded mode for the same.
  • HSQLDB is also used for demonstrating our database readers and writers.
  • The two testCompile dependencies provide the test configuration.

Below is the Person model which will be used as the data structure in our application.

Person

package com.jcg.sprbatch.model;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;

@Entity
@XmlRootElement(name = "Person")
public class Person {
    @Override
    public String toString() {
        return "Person{" +
                "lastName='" + lastName + '\'' +
                ", firstName='" + firstName + '\'' +
                '}';
    }

    @Id
    @GeneratedValue
    private int id;

    @XmlElement(name = "LastName")
    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @XmlElement(name = "FirstName")
    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    private String lastName;

    private String firstName;

}
  • The @Entity annotation specifies that the Person class is a JPA entity, while @XmlRootElement specifies that it is the root element of the XML.
  • The id value is the primary key and is auto-generated by Hibernate, as specified by @Id and @GeneratedValue.
  • The @XmlElement annotations on the getters specify that FirstName and LastName are to be used as XML tags when marshalling and unmarshalling by the object-XML mapper.

We will cover a few readers in combination with different variants of writers, in detail and with examples, in the next section.

4. Item Readers and Writers

4.1 FlatFileItemReader and JpaItemWriter

In this example, we will read from a CSV file and write the records into our HSQL database. Below is the batch configuration to achieve this.

FlatFile and JPA Configuration

package com.jcg.sprbatch.config;

import com.jcg.sprbatch.model.Person;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.xml.StaxEventItemReader;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;

import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;

@Configuration
@EnableBatchProcessing
public class SpringBatchConfig {

    @Autowired
    EntityManagerFactory emf;

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    @Autowired
    DataSource dataSource;


    @Bean
    @StepScope
    public FlatFileItemReader<Person> flatFileItemReader() {
        FlatFileItemReader<Person> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("people.csv"));
        reader.setLinesToSkip(1);

        DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames("firstName", "lastName");

        BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(Person.class);

        lineMapper.setFieldSetMapper(fieldSetMapper);
        lineMapper.setLineTokenizer(tokenizer);
        reader.setLineMapper(lineMapper);

        return reader;
    }

    @Bean
    public JpaItemWriter<Person> jpaItemWriter() {
        JpaItemWriter<Person> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(emf);
        return writer;
    }


    @Bean
    public Job flowJob() {
        return jobBuilderFactory.get("flowJob")
                .incrementer(new RunIdIncrementer())
                .start(flatFileJpaWriterStep())
                .build();
    }
}
  • The @Configuration and @EnableBatchProcessing annotations specify that this is a configuration class and enable batch processing for our project.
  • We configure a FlatFileItemReader as follows:
  • We provide the filename people.csv and mention that it is available on our classpath.
  • We specify that the first line has to be skipped, as the first line is the header.
  • We split each line by comma, as it is a comma-separated file; the DelimitedLineTokenizer splits the line into tokens based on our delimiter.
  • We map each of the tokens onto our model class with a BeanWrapperFieldSetMapper.
  • The JpaItemWriter persists the Person model into the database.
  • JpaItemWriter uses the auto-configured EntityManagerFactory to persist the model.
  • flowJob is a simple job where we configure a single step, flatFileJpaWriterStep, which executes our reader and writer; a sketch of this step follows the list.
  • We provide a RunIdIncrementer to ensure that each execution of the job gets a unique instance. This helps Spring differentiate multiple executions of the same job even if the rest of the job parameters are the same.
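
The flatFileJpaWriterStep referenced by flowJob was omitted from the listing above. A minimal sketch of it, assuming a chunk size of 5 to match the steps configured later:

    private Step flatFileJpaWriterStep() {
        return stepBuilderFactory.get("flatFileJpaWriterStep")
                .<Person, Person>chunk(5)
                .reader(flatFileItemReader())
                .writer(jpaItemWriter())
                .build();
    }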

Running the above example ensures that records from the below person file are saved into the Person table with the fields id, first_name and last_name. The SQL queries that run are also visible on the console because of the JPA configuration in application.yml.

people.csv

FirstName,LastName
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe

We specify the CSV file with 5 records containing a first name and last name for each person.

application.yml

spring:
  jpa:
    show-sql: true
    hibernate:
      ddl-auto: create
  • We specify that the SQL queries that run must be visible on the console.
  • This ensures that tables are created afresh, with previous data destroyed, during each run of the application.

Below we can see the Hibernate queries running when we run our application:

JPA Queries

4.2 JdbcCursorItemReader and StaxEventItemWriter

In this section, we will configure a JdbcCursorItemReader to read the Person records saved to the database in the previous section and write them to an XML file.

Jdbc and Stax Writer Configuration

@Bean
public Job flowJob() {
    return jobBuilderFactory.get("flowJob")
            .incrementer(new RunIdIncrementer())
            .start(flatFileJpaWriterStep())
            .next(jdbcStaxWriterStep())
            .build();
}

private Step jdbcStaxWriterStep() {
    return stepBuilderFactory.get("jdbcStaxWriterStep")
            .<Person, Person>chunk(5)
            .reader(jdbcCursorItemReader())
            .writer(personStaxEventItemWriter())
            .build();
}

@Bean
public JdbcCursorItemReader<Person> jdbcCursorItemReader() {
    JdbcCursorItemReader<Person> personJdbcCursorItemReader = new JdbcCursorItemReader<>();
    personJdbcCursorItemReader.setSql("select first_name,last_name from person");
    personJdbcCursorItemReader.setDataSource(dataSource);
    personJdbcCursorItemReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
    return personJdbcCursorItemReader;
}

@Bean
public StaxEventItemWriter<Person> personStaxEventItemWriter() {
    StaxEventItemWriter<Person> staxEventItemWriter = new StaxEventItemWriter<>();
    staxEventItemWriter.setResource(new FileSystemResource("src/main/resources/people.xml"));
    staxEventItemWriter.setRootTagName("People");
    Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
    marshaller.setClassesToBeBound(Person.class);
    staxEventItemWriter.setMarshaller(marshaller);
    return staxEventItemWriter;
}
  • We configure the new jdbcStaxWriterStep as the next step in the previously created job.
  • jdbcStaxWriterStep contains the reader jdbcCursorItemReader and the writer personStaxEventItemWriter, which run in sequence.
  • We configure the JdbcCursorItemReader to read from the auto-configured datasource.
  • We provide the SQL select first_name,last_name from person to fetch the result from the database.
  • We specify BeanPropertyRowMapper to set the values on the Person class.
  • Spring Batch provides streaming of database records to the XML file.
  • We specify the output resource as src/main/resources/people.xml.
  • The root tag is specified as People and an instance of Jaxb2Marshaller is provided.
  • We specify that the Jaxb2Marshaller uses our Person model as the class to be marshalled to XML.

The below XML file is the result of running the above job.

people.xml

<?xml version="1.0" encoding="UTF-8"?>
<People>
 <Person>
 <FirstName>Jill</FirstName>
 <LastName>Doe</LastName>
 </Person>
 <Person>
 <FirstName>Joe</FirstName>
 <LastName>Doe</LastName>
 </Person>
 <Person>
 <FirstName>Justin</FirstName>
 <LastName>Doe</LastName>
 </Person>
 <Person>
 <FirstName>Jane</FirstName>
 <LastName>Doe</LastName>
 </Person>
 <Person>
 <FirstName>John</FirstName>
 <LastName>Doe</LastName>
 </Person>
</People>

1.0 and UTF-8 are the default values for the XML version and encoding respectively. These values can be changed using the corresponding setter methods on StaxEventItemWriter, as shown below. We can verify the data against our original CSV file.
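
For example, both defaults could be overridden on the writer bean before it is used (illustrative values, not part of the example project):

    // Emit an XML 1.1 declaration with ISO-8859-1 encoding instead of the defaults.
    staxEventItemWriter.setVersion("1.1");
    staxEventItemWriter.setEncoding("ISO-8859-1");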

4.3 StaxEventItemReader and FlatFileItemWriter

In this section, we will process the XML file from the previous section and convert it into a text file delimited by a colon (:).

Stax Reader and FlatFile Writer Configuration

@Bean
public Job flowJob() {
    return jobBuilderFactory.get("flowJob")
            .incrementer(new RunIdIncrementer())
            .start(flatFileJpaWriterStep())
            .next(jdbcStaxWriterStep())
            .next(staxFileWriterStep())
            .build();
}

private Step staxFileWriterStep() {
    return stepBuilderFactory.get("staxFileWriterStep")
            .<Person, Person>chunk(5)
            .reader(personStaxEventItemReader())
            .writer(flatFileItemWriter())
            .build();
}

@Bean
public StaxEventItemReader<Person> personStaxEventItemReader() {
    StaxEventItemReader<Person> staxEventItemReader = new StaxEventItemReader<>();
    staxEventItemReader.setResource(new FileSystemResource("src/main/resources/people.xml"));
    staxEventItemReader.setFragmentRootElementName("Person");
    Jaxb2Marshaller unMarshaller = new Jaxb2Marshaller();
    unMarshaller.setClassesToBeBound(Person.class);
    staxEventItemReader.setUnmarshaller(unMarshaller);
    return staxEventItemReader;
}

@Bean
@StepScope
public FlatFileItemWriter<Person> flatFileItemWriter() {
    FlatFileItemWriter<Person> flatFileItemWriter = new FlatFileItemWriter<>();
    flatFileItemWriter.setShouldDeleteIfExists(true);
    flatFileItemWriter.setResource(new FileSystemResource("src/main/resources/modified_people.txt"));
    flatFileItemWriter.setLineAggregator(person -> person.getFirstName() + ":" + person.getLastName());
    return flatFileItemWriter;
}
  • We add staxFileWriterStep as the next step in our flowJob.
  • We configure staxFileWriterStep to use a StaxEventItemReader and a FlatFileItemWriter with a chunk size of 5.
  • The StaxEventItemReader is configured to read from the file src/main/resources/people.xml.
  • Person is provided as the XML fragment root element to read, while the Person model is provided as the destination class for the Jaxb2Marshaller.
  • The FlatFileItemWriter is given the destination location src/main/resources/modified_people.txt.
  • We use a lambda as the LineAggregator to transform each person into the string firstName:lastName; an equivalent configuration using Spring's built-in aggregators is sketched after the output below.

Running the above configuration produces the below output:

Jill:Doe
Joe:Doe
Justin:Doe
Jane:Doe
John:Doe
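
As mentioned above, the lambda can be swapped for Spring Batch's built-in field extractor and line aggregator inside the flatFileItemWriter() bean method. A minimal sketch, assuming the same Person property names:

    DelimitedLineAggregator<Person> aggregator = new DelimitedLineAggregator<>();
    // Use ":" instead of the default comma delimiter.
    aggregator.setDelimiter(":");
    BeanWrapperFieldExtractor<Person> extractor = new BeanWrapperFieldExtractor<>();
    // Extract the bean properties in the order they should appear on each line.
    extractor.setNames(new String[]{"firstName", "lastName"});
    aggregator.setFieldExtractor(extractor);
    flatFileItemWriter.setLineAggregator(aggregator);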

This completes a tour of chaining some of the Spring Batch item readers and writers. In the next section, we will take a look under the hood of the Spring Batch Reader and Writer interfaces.

5. Unboxing Item Reader and Writer

We will cover the two basic interfaces which can be extended to roll out a custom reader or writer when the preconfigured Spring readers and writers do not meet our needs.

ItemReader has a single method, read. The reader is parameterized with a generic type and can read items of any type. The read method should return null when there is nothing more to read from the input source. Implementations should be stateful and should take care of maintaining their own state.

T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
void write(List<? extends T> items) throws Exception;

The writer declares a write method which takes a list of items of generic type T. Spring Batch expects implementations of Reader and Writer to execute a step. There is also an ItemStream interface which is combined with Reader and Writer to roll out ItemStreamReader/Writer.
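
For instance, a minimal custom writer only needs to implement write. The sketch below is a hypothetical LoggingItemWriter (not part of the example project) that prints each chunk to the console:

import java.util.List;

import org.springframework.batch.item.ItemWriter;

import com.jcg.sprbatch.model.Person;

public class LoggingItemWriter implements ItemWriter<Person> {

    @Override
    public void write(List<? extends Person> items) throws Exception {
        // Called once per chunk with the items accumulated by the reader.
        for (Person person : items) {
            System.out.println(person);
        }
    }
}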

The main purpose of ItemStream is to save state and restore from that state in case an error occurs. For this purpose, Item Stream utilizes three methods which must be implemented.

void open(ExecutionContext executionContext) throws ItemStreamException;
void update(ExecutionContext executionContext) throws ItemStreamException;
void close() throws ItemStreamException;

open is passed the previous execution context in case of a restart, or a fresh execution context in case of a first run. update saves the current state into the execution context passed to it. close is used to release any held resources. FlatFileItemReader is an implementation of ItemStreamReader, as it has to handle restarts gracefully.
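
Putting these methods together, below is a minimal sketch of a restartable reader. The PersonListItemReader name and its in-memory list source are hypothetical; a real implementation would wrap a file or database cursor:

import java.util.List;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamReader;

import com.jcg.sprbatch.model.Person;

public class PersonListItemReader implements ItemStreamReader<Person> {

    private static final String INDEX_KEY = "personListItemReader.index";

    private final List<Person> people;
    private int index;

    public PersonListItemReader(List<Person> people) {
        this.people = people;
    }

    @Override
    public Person read() {
        // Returning null signals that the input is exhausted.
        return index < people.size() ? people.get(index++) : null;
    }

    @Override
    public void open(ExecutionContext executionContext) {
        // Restore the saved position on restart; start at zero on a fresh run.
        index = executionContext.containsKey(INDEX_KEY)
                ? executionContext.getInt(INDEX_KEY) : 0;
    }

    @Override
    public void update(ExecutionContext executionContext) {
        // Invoked before each chunk commit so the position survives failures.
        executionContext.putInt(INDEX_KEY, index);
    }

    @Override
    public void close() {
        // Nothing to release for an in-memory source.
    }
}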

6. Summary

In this tutorial, we saw the following item readers and writers:

  • StaxEventItemReader and Writer
  • FlatFileItemReader and Writer
  • JdbcCursorItemReader and JpaItemWriter

Spring Batch also has support for iBATIS, JdbcPagingItemReader, and a Hibernate reader without the abstraction of JPA. We also peeked under the hood of the Reader and Writer interfaces, which will help us roll out our own custom readers and writers.

Spring also provides other implementations such as:

  • MultiResourceItemReader – Reads from multiple resources sequentially.
  • MultiResourceItemWriter – Writes to a new file when the configured item count threshold is exceeded.
  • JmsItemReader and Writer – Read from and write to a JMS queue.
  • MongoItemReader and Writer – Read from and write to MongoDB.
  • AmqpItemReader and Writer – Read from and write to queues using the AMQP protocol.

7. Download the Source Code

You can download the full source code of this example here: Item Readers and Writers

Rajagopal ParthaSarathi

Rajagopal works in the software industry solving enterprise-scale problems for customers across geographies, specializing in distributed platforms. He holds a master's in computer science with a focus on cloud computing from Illinois Institute of Technology. His current interests include data science and distributed computing.
Comments
Praveen (5 years ago):

Did you face this error while running it?
Caused by: org.springframework.jdbc.BadSqlGrammarException: PreparedStatementCallback; bad SQL grammar [SELECT JOB_INSTANCE_ID, JOB_NAME from BATCH_JOB_INSTANCE where JOB_NAME = ? order by JOB_INSTANCE_ID desc]; nested exception is java.sql.SQLSyntaxErrorException: user lacks privilege or object not found: BATCH_JOB_INSTANCE

siv (4 years ago, replying to Praveen):

Yes, the DB user that connects to the DB should either have create-table privileges, or the tables should be created upfront with the necessary access to update them. Here are the tables that have to be created if the user does not have create privileges:

https://github.com/spring-projects/spring-batch/blob/master/spring-batch-core/src/main/resources/org/springframework/batch/core/schema-oracle10g.sql

Dul (4 years ago):

How do you read multiple MySQL (H2) tables (some with joins) and write to a CSV file in Spring Boot Batch? Could you provide some examples?
