Home » Enterprise Java » spring » Batch » Spring Batch Parallel Processing Example

About Rajagopal ParthaSarathi

Rajagopal works in software industry solving enterprise-scale problems for customers across geographies specializing in open source distributed platforms. He currently holds masters in computer science with focus on cloud computing from Illinois Institute of Technology. His current interests include but not limited to machine learning and big data engineering.

Spring Batch Parallel Processing Example

This article is a tutorial about parallel processing in Spring Batch. We will use Spring Boot to speed our development process.

1. Introduction

Spring Batch is a lightweight, scale-able and comprehensive batch framework to handle data at massive scale. Spring Batch builds upon the spring framework to provide intuitive and easy configuration for executing batch applications. Spring Batch provides reusable functions essential for processing large volumes of records, including cross-cutting concerns such as logging/tracing, transaction management, job processing statistics, job restart, skip and resource management.

Spring Batch has a layered architecture consisting of three components:

  • Application – Contains custom code written by developers.
  • Batch Core – Classes to launch and control batch job.
  • Batch Infrastructure – Reusable code for common functionalities needed by core and Application.

Let us dive into parallel processing of spring batch with examples of partitioning and parallel jobs.

Want to master Spring Framework ?

Subscribe to our newsletter and download the Spring Framework Cookbook right now!

In order to help you master the leading and innovative Java framework, we have compiled a kick-ass guide with all its major features and use cases! Besides studying them online you may download the eBook in PDF format!

 

2. Technologies Used

  • Java 1.8.101 (1.8.x will do fine)
  • Gradle 4.4.1 (4.x will do fine)
  • IntelliJ Idea (Any Java IDE would work)
  • Rest will be part of the Gradle configuration.

3. Spring Batch Project

Spring Boot Starters provides more than 30 starters to ease the dependency management for your project. The easiest way to generate a Spring Boot project is via Spring starter tool with the steps below:

    • Navigate to https://start.spring.io/.
    • Select Gradle Project with Java and Spring Boot version 2.0.1.
    • Add Batch and HSqlDB in the “search for dependencies”.
    • Enter the group name as com.jcg and artifact as springBatchParallel.
    • Click the Generate Project button.

A Gradle Project will be generated. If you prefer Maven, use Maven instead of Gradle before generating the project. Import the project into your Java IDE.

3.1 Gradle File

We will look at the generated gradle file for our project. It has detailed configuration outlining the compile time and run time dependencies for our project.

build.gradle

buildscript {
    ext {
        springBootVersion = '2.0.1.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.jcg'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}


dependencies {
    compile('org.springframework.boot:spring-boot-starter-batch')
    runtime('org.hsqldb:hsqldb')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('org.springframework.batch:spring-batch-test')
}
  • We have provided Maven as the repository for all our dependencies.
  • Idea plugin has been applied to support Idea IDE in line 15.
  • Spring Boot Batch Starter dependency is applied to enable batch nature in our project.
  • HSQL DB is provided as runtime dependency to save spring batch job status in embedded mode. Spring batch needs to track the job execution, results in a reliable manner to survive across job restarts and abnormal terminations. To ensure this, generally they are stored in the database but since our application does not use a persistent store, Hsql DB in embedded mode is utilized for the same.
  • Lines 32,33 represent the test configuration.

4. Spring Batch Parallel Processing

We will look at an example of running multiple jobs parallelly. Here, jobs are independent of each other and finish execution in a parallel manner. Below we can look at the java configuration to enable parallel processing.

Spring Batch Parallel Flow Configuration

package com.jcg.springbatchparallel.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.UrlResource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import java.net.MalformedURLException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.IntStream;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    Logger logger = LoggerFactory.getLogger(BatchConfiguration.class);

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    private TaskletStep taskletStep(String step) {
        return stepBuilderFactory.get(step).tasklet((contribution, chunkContext) -> {
            IntStream.range(1, 100).forEach(token -> logger.info("Step:" + step + " token:" + token));
            return RepeatStatus.FINISHED;
        }).build();

    }


    @Bean
    public Job parallelStepsJob() {

        Flow masterFlow = new FlowBuilder("masterFlow").start(taskletStep("step1")).build();


        Flow flowJob1 = new FlowBuilder("flow1").start(taskletStep("step2")).build();
        Flow flowJob2 = new FlowBuilder("flow2").start(taskletStep("step3")).build();
        Flow flowJob3 = new FlowBuilder("flow3").start(taskletStep("step4")).build();

        Flow slaveFlow = new FlowBuilder("slaveFlow")
                .split(new SimpleAsyncTaskExecutor()).add(flowJob1, flowJob2, flowJob3).build();

        return (jobBuilderFactory.get("parallelFlowJob")
                .incrementer(new RunIdIncrementer())
                .start(masterFlow)
                .next(slaveFlow)
                .build()).build();

    }

}
  • In Line 36, We have configured a simple TaskletStep. The step includes a Tasklet which iterates from numbers 1 to 100 and prints to the console. In the tasklet, we return RepeatStatus.FINISHED to indicate successful execution.
  • In Lines 56-76, We are parallelizing multiple jobs. For our example, each job is going to use the simple Tasklet we configured earlier in line 36.
  • Masterflow is configured using FlowBuilder and this holds the Tasklet configured as step1. FlowBuilder is used to construct flow of steps that can be executed as a job or part of a job. Here, we are constructing a flow as part of our example.
  • We create three different Flow with reference to taskletsteps as step2, step3 and step4.
  • A simple SlaveFlow is configured to hold all three flow jobs. We configure the SlaveFlow with a SimpleAsyncTaskExecutor that executes multiple threads parallelly. We have not defined a thread pool, so Spring will keep spawning threads to match the jobs provided. This ensures the parallel execution of jobs configured. There are multiple TaskExecutor implementations available, but AsyncTaskExecutor ensures that the tasks are executed in parallel. AsyncTaskExecutor has a concurrencyLimit property which can be used to throttle the number of threads executing parallelly.
  • We build a job which starts with masterflow and then SlaveFlow. The entire configuration creates a FlowJobBuilder from which we can create a Job.
  • Spring Batch Infrastructure will run the job when the application is started.

Let us dive ahead and run the code in our Java IDE to observe the results.

Spring Batch Sequential Step 1

 

Spring Batch Parallel Steps -> 2,3,4

  • We can observe that MasterStep has completed execution sequentially.
  • Next we see the parallelization happening between the steps Step2, Step3 and Step4.
  • All the Jobs part of SlaveFlow are running in parallel.
  • We have configured three jobs for parallel execution and dual core machines will produce the effect similar to the logs above.

Use case above is used in places where a set of jobs are dependent on an initial job for completion after which they can be completely parallelized. An initial job can be a tasklet doing minimal processing to provide a baseline while the slave jobs execute the actual logic in parallel. Spring batch waits for all the jobs in SlaveFlow to provide aggregated exit status.

5. Spring Batch Partitioning

There is another use case of parallel processing in Spring which is via partitioning. Let us consider the scenario with the example of a huge file. Multiple threads reading the same file will not ensure increased performance as the I/O resource is still one and may even lead to performance degradation. In such cases, we split a single file into multiple files and each file can be processed in the same thread. In our example, a single file person.txt containing 50 records has been split into 10 files each containing 5 records. This can be achieved by using the split command

split -l 5 person.txt person

The above command creates files with names like personaa, personab etc. We will then configure Spring Batch to process these files parallelly for faster execution. Below is the batch configuration for the same.

Spring Batch Partitioning Configuration

 @Bean
    public Job partitioningJob() throws Exception {
        return jobBuilderFactory.get("parallelJob")
                .incrementer(new RunIdIncrementer())
                .flow(masterStep())
                .end()
                .build();
    }

    @Bean
    public Step masterStep() throws Exception {
        return stepBuilderFactory.get("masterStep")
                .partitioner(slaveStep())
                .partitioner("partition", partitioner())
                .gridSize(10)
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }

    @Bean
    public Partitioner partitioner() throws Exception {
        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        partitioner.setResources(resolver.getResources("file://persona*"));
        return partitioner;

    }

    @Bean
    public Step slaveStep() throws Exception {
        return stepBuilderFactory.get("slaveStep")
                .<Map<String, String>, Map<String, String>>chunk(1)
                .reader(reader(null))
                .writer(writer())
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Map<String, String>> reader(@Value("#{stepExecutionContext['fileName']}") String file) throws MalformedURLException {
        FlatFileItemReader<Map<String, String>> reader = new FlatFileItemReader<>();
        reader.setResource(new UrlResource(file));

        DefaultLineMapper<Map<String, String>> lineMapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(":");
        tokenizer.setNames("key", "value");

        lineMapper.setFieldSetMapper((fieldSet) -> {
            Map<String, String> map = new LinkedHashMap<>();
            map.put(fieldSet.readString("key"), fieldSet.readString("value"));
            return map;
        });
        lineMapper.setLineTokenizer(tokenizer);
        reader.setLineMapper(lineMapper);

        return reader;
    }

    @Bean
    public ItemWriter<Map<String, String>> writer() {
        return (items) -> items.forEach(item -> {
            item.entrySet().forEach(entry -> {
                logger.info("key->[" + entry.getKey() + "] Value ->[" + entry.getValue() + "]");
            });
        });
    }
  • We are creating a Job parallelJob with a single Step masterStep.
  • MasterStep has two Partitioners – one provides the data as partitions, while another handles the partitioned data.
  • MultiResourcePartitioner is used to provide the partitioned data. It looks for files in the current directory starting with persona and returns each file as a separate partition.
  • Each partition contains a StepExecutionContext with filename stored in the key fileName.
  • gridSize is used to specify an estimate for the number of partitions to be created but number of partitions can exceed gridSize also.
  • Each partition is then fed into slaveStep which has a reader and writer.
  • chunkSizeis provided as 1 to ensure writer gets called after every record is read. Ideally, it would be better to specify a larger number as chunk of records will get processed on each pass.
  • In our example, we have used FlatFileReader with the filename provided by the Partitioner. Our file is split by : which has just a key and value. Each line is read and fed to our custom lineMapper written inline as lambda function. LineMapper transforms the read tokens into a Map with key and value.
  • This chunk of lines is fed into our custom writer which is another anonymous class implementation. Our custom writer iterates through the chunk of maps fed into it and logs out the key/value pair.
  • We have specified the executor as AsyncTaskExecutor which starts creating number of threads equal to number of partitions. If the number of threads exceed OS cores, context switching will happen and there will be concurrency.

Below is the output of running the parallelJob.

Spring Batch parallel job

We can clearly see the task executor context switching as each task executor concurrently logs people names.

6. Summary

In this example, we have demonstrated parallel processing features of Spring Batch. We saw two approaches to parallel processing with Spring Batch. Partitioning has seen widespread use in many of the applications. The former is parallelizing multiple jobs, while partitioning is parallelizing a single job. Both have its own use in applications.

7. Download the Source Code

Download
You can download the full source code of this example here: Spring Batch Parallel Processing
(No Ratings Yet)
4 Comments Views Tweet it!

Do you want to know how to develop your skillset to become a Java Rockstar?

Subscribe to our newsletter to start Rocking right now!

To get you started we give you our best selling eBooks for FREE!

 

1. JPA Mini Book

2. JVM Troubleshooting Guide

3. JUnit Tutorial for Unit Testing

4. Java Annotations Tutorial

5. Java Interview Questions

6. Spring Interview Questions

7. Android UI Design

 

and many more ....

 

Receive Java & Developer job alerts in your Area

 

4
Leave a Reply

avatar
2 Comment threads
2 Thread replies
0 Followers
 
Most reacted comment
Hottest comment thread
3 Comment authors
Rajagopal ParthaSarathiNacisyadav Recent comment authors
  Subscribe  
newest oldest most voted
Notify of
yadav
Guest
yadav

if i want transaction management for this. like parallel there are 2 steps are running and if one fails then the transaction committed in the other should get roll back.

Please help how to implement transaction management in this.

Nacis
Guest
Nacis

Hi, thanks for this helpfull tutorial. Please i want to know how to Launch this application.