Spring Batch Partitioning Example

Most often, batch processing using Spring Batch is carried out in a single-threaded, single-process operation. But sometimes, more complex implementations are needed to enable parallel processing. In this article, we discuss one such approach: Partitioning a Step. The discussion is driven by XML-based example code. Tools used in the example are Eclipse, Maven, Spring 4.0.5.RELEASE (core and JDBC), Spring Batch 3.0.4.RELEASE, HSQLDB 1.8.0.7 and Commons DBCP 1.4.

The article is organized into the sections below; feel free to skip to any section of your choice. The example code is available for download at the end of the article.

1. Introduction

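Spring Batch is an open-source framework for batch processing. Most batch processing can be handled by single-threaded, single-process jobs, but sometimes running the processing in parallel becomes necessary, for instance when it takes a very long time. At a high level, Spring Batch supports two modes of parallel processing: single-process multi-threaded, and multi-process. These can be further categorized into the following types: Multi-Threaded Step (single process), Parallel Steps (single process), Remote Chunking of a Step (multi-process) and Partitioning a Step (single or multi-process).

2.1 Job

A Job encapsulates an entire batch process and is made up of one or more Steps. The following snippet configures a Job whose two Steps, step1 and step2, run in sequence.
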
<job id="myJob">
    <step id="step1" next="step2"/>
    <step id="step2" />   
</job>

2.2 Step

As specified in the documentation, a Step is a domain object that represents an independent, sequential phase of a Job. A Job is made up of one or more Steps, and a Step is where the actual batch processing happens. A Step can be configured to do fairly complex processing, such as reading from a source, applying business logic to the data read, and then writing the result to some output, say a file or a database table. Alternatively, a Step can simply invoke a remote procedure call or a stored procedure in a database. In this article, we focus on Partitioning a Step so that the batch processing happens in parallel.

The following is an example snippet that configures Steps. Notice that two Steps are configured in this Job. The first does chunk-oriented processing using a reader and a writer with a commit-interval of 10, which means it reads and writes data in chunks of 10 records. The second Step invokes a Tasklet that could perform a simpler action, say, invoking a stored procedure.

XML Step Configuration

<job id="myJob" xmlns="http://www.springframework.org/schema/batch">
    <step id="processingStep" next="taskletStep">
        <tasklet>
            <chunk reader="cvsFileItemReader" writer="myItemWriter" commit-interval="10">
            </chunk>
        </tasklet>
    </step>
    <step id="taskletStep">
        <tasklet ref="myTasklet" />
    </step>
</job>
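To make the chunk-oriented model concrete, here is a minimal plain-Java sketch of what a commit-interval of 10 means. It is not the Spring Batch API. ChunkLoopSketch and processInChunks are hypothetical names, the Iterator stands in for the reader, the Consumer stands in for the writer, and transactions are not modeled.

ChunkLoopSketch.java

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch of chunk-oriented processing; NOT the Spring Batch API.
// ChunkLoopSketch and processInChunks are hypothetical names.
class ChunkLoopSketch {

    // Reads items one at a time and hands them to the writer in chunks of
    // commitInterval items; a final, shorter chunk is written if items remain.
    static <T> int processInChunks(Iterator<T> reader, int commitInterval,
                                   Consumer<List<T>> writer) {
        int chunksWritten = 0;
        List<T> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(reader.next());
            if (chunk.size() == commitInterval) {
                writer.accept(chunk);          // in Spring Batch, one transaction per chunk
                chunk = new ArrayList<>();
                chunksWritten++;
            }
        }
        if (!chunk.isEmpty()) {
            writer.accept(chunk);
            chunksWritten++;
        }
        return chunksWritten;
    }

    public static void main(String[] args) {
        List<Integer> records = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            records.add(i);
        }
        int chunks = processInChunks(records.iterator(), 10,
                c -> System.out.println("Writing chunk of " + c.size() + " records"));
        System.out.println("Chunks written: " + chunks);
    }
}
```

With 25 records and a commit-interval of 10, the writer is called three times, with chunks of 10, 10 and 5 records.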

2.3 Parallel Processing

Most often, single-threaded, single-process batch processing is good enough for the task at hand, as reading and writing files of a few hundred megabytes generally takes just a couple of minutes even on standard hardware. But if more throughput is needed, the following approaches to parallel processing in batch operations can be explored.

2.3.1 Multi-Threaded Step

It runs within a single process, but execution happens in multiple threads. One of the easiest ways to achieve this is by supplying a multi-threaded task-executor, as shown in the code snippet below.

Multi-Threaded Step

<step id="loadingData"> 
    <tasklet task-executor="taskExecutor" throttle-limit="20">...
    </tasklet>
</step>
<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

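SimpleAsyncTaskExecutor is the simplest multi-threaded implementation of the TaskExecutor interface. Using it means that the processing defined under the <tasklet/> tags of the Step (reading, processing and writing) happens in a different thread for each chunk of data. Hence, the processed records may not come out in the same sequential order in which they were fed in. There are three things to take note of. First, the throttle-limit attribute (20 in the snippet above) caps the number of chunks processed concurrently. Second, every component of the Step (reader, processor and writer) must be thread-safe, which many of the stock stateful readers are not. Third, any pooled resource shared by the threads, such as a DataSource, must be large enough for the number of concurrent threads.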
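The threading behaviour described above can be approximated in plain Java. This is an analogy, not Spring code. A fixed pool of throttleLimit threads stands in for the task-executor together with throttle-limit; SimpleAsyncTaskExecutor itself starts a new thread per task. MultiThreadedStepSketch is a hypothetical name.

MultiThreadedStepSketch.java

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-Java analogy of a multi-threaded Step; NOT Spring Batch code.
// A fixed pool of throttleLimit threads stands in for task-executor + throttle-limit.
class MultiThreadedStepSketch {

    static List<String> run(List<List<Integer>> chunks, int throttleLimit) {
        // Shared output must be thread-safe, just like the writer of a real multi-threaded Step.
        Queue<String> written = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(throttleLimit);
        for (List<Integer> chunk : chunks) {
            // Each chunk is "read, processed and written" on whichever pool thread picks it up.
            pool.submit(() -> {
                for (Integer item : chunk) {
                    written.add(Thread.currentThread().getName() + " -> " + (item * 10));
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(written);
    }

    public static void main(String[] args) {
        List<List<Integer>> chunks = new ArrayList<>();
        for (int start = 1; start <= 20; start += 5) {
            List<Integer> chunk = new ArrayList<>();
            for (int i = start; i < start + 5; i++) {
                chunk.add(i);
            }
            chunks.add(chunk);
        }
        // The order of the printed lines typically varies from run to run.
        for (String line : run(chunks, 4)) {
            System.out.println(line);
        }
    }
}
```

Running it a few times typically prints the 20 output lines in a different order each time. This is why a multi-threaded Step gives no ordering guarantee. The ConcurrentLinkedQueue plays the part of a thread-safe writer.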

2.3.2 Parallel Steps

This is easy to configure and use. It can be employed only when the processing logic can be split into distinct responsibilities and assigned to individual flows that run in parallel within a single process. It is done using the ‘split’ element, which contains one or more ‘flow’ elements. The following snippet shows an example configuration.

Parallel Step

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" next="step2"/>
            <step id="step2" />
        </flow>
        <flow>
            <step id="step3"/>
        </flow>
    </split>
    <step id="step4"/>
</job>
<bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

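Points to note are as follows. The task-executor attribute of the split determines which TaskExecutor runs the individual flows, so the two flows here (step1 followed by step2, and step3 on its own) run concurrently. The Job moves on to step4 only after all the flows in the split have completed.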
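The semantics of the split can be sketched with CompletableFuture. This is a plain-Java analogy rather than Spring Batch code. ParallelFlowsSketch is a hypothetical name, the step names simply mirror the XML above, and each "step" only records its name.

ParallelFlowsSketch.java

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java analogy of the split/flow configuration; NOT Spring Batch code.
class ParallelFlowsSketch {

    static List<String> runJob() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService taskExecutor = Executors.newFixedThreadPool(2);
        try {
            // Flow 1: step1 then step2, in sequence.
            CompletableFuture<Void> flow1 = CompletableFuture
                    .runAsync(() -> log.add("step1"), taskExecutor)
                    .thenRun(() -> log.add("step2"));
            // Flow 2: step3, concurrently with flow 1.
            CompletableFuture<Void> flow2 = CompletableFuture
                    .runAsync(() -> log.add("step3"), taskExecutor);
            // step4 runs only after every flow in the split has finished.
            CompletableFuture.allOf(flow1, flow2).join();
            log.add("step4");
        } finally {
            taskExecutor.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runJob());
    }
}
```

In the recorded order, step1 always precedes step2 and step4 is always last. The position of step3 may vary between runs.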

2.3.3 Remote-Chunking a Step

This follows a master-slave model in which Step processing is split across multiple processes that communicate with each other through some middleware as illustrated in the diagram below.

Fig.1 Remote Chunking (courtesy Spring Batch Documentation)

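Points to note on this approach are as follows. The master is a single process that reads the input and sends chunks of items through the middleware to the slaves, which process and write them. The middleware therefore has to be durable, with guaranteed delivery of each message to a single consumer. The approach pays off mainly when processing and writing the items, rather than reading them, is the bottleneck, since the master does all the reading.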
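An in-process analogy may help picture the master/slave split. This hypothetical sketch is not Spring Batch Integration code. A BlockingQueue stands in for the middleware, and a single worker thread plays the slave that processes and writes each chunk. The replies a real slave would send back to the master are omitted.

RemoteChunkingSketch.java

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-process analogy of remote chunking; NOT Spring Batch Integration code.
// The BlockingQueue plays the role of the middleware between master and worker.
class RemoteChunkingSketch {

    // Sentinel chunk telling the worker that no more input is coming.
    private static final List<Integer> POISON_PILL = new ArrayList<>();

    static List<Integer> run(List<Integer> items, int chunkSize) {
        BlockingQueue<List<Integer>> requests = new LinkedBlockingQueue<>();
        List<Integer> written = new ArrayList<>();

        // Worker: receives chunks, processes each item and "writes" it.
        Thread worker = new Thread(() -> {
            try {
                List<Integer> chunk;
                while ((chunk = requests.take()) != POISON_PILL) {
                    for (Integer item : chunk) {
                        written.add(item * item);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        // Master: reads items locally and sends them out in chunks.
        for (int i = 0; i < items.size(); i += chunkSize) {
            requests.add(new ArrayList<>(items.subList(i, Math.min(i + chunkSize, items.size()))));
        }
        requests.add(POISON_PILL);
        try {
            worker.join();   // join() also makes the worker's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3, 4, 5), 2));
    }
}
```

With the input 1 to 5 and a chunk size of 2, the master sends three chunks and the worker writes [1, 4, 9, 16, 25].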

2.3.4 Partitioning a Step

The beauty of Step Partitioning is that it allows a Step to be split into a number of child steps. These can then take part in the processing either as remote instances or as local execution threads. The child steps are all instances of the same Step and could replace the Master Step without any difference in output. The following diagram conveys the concept.

Fig.2 Step Partitioning (courtesy Spring Batch Documentation)

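Following are a few points to note on Step partitioning. The Master Step uses a Partitioner to generate one ExecutionContext per partition. A PartitionHandler then hands each context to an execution of the Slave Step. When the partition is given a task-executor, as in the example in section 3, the handler runs the Slave Steps as local threads. The grid-size attribute is passed to the Partitioner and tells it how many partitions to create. Because every Slave Step gets its own ExecutionContext, each one can work on a different slice of the data.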
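The master/slave interplay of a partitioned Step can be sketched in plain Java. Again, this is an analogy, not the Spring Batch API. Each partition's ExecutionContext is modeled as a Map, the slaveStep function plays the role of the Slave Step, and a thread pool plays the role of the task-executor. PartitionedStepSketch is a hypothetical name.

PartitionedStepSketch.java

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Plain-Java analogy of a partitioned Step; NOT the Spring Batch API.
// Each partition's "ExecutionContext" is a Map, and the same slaveStep
// function runs once per partition on a thread pool (the "task-executor").
class PartitionedStepSketch {

    static Map<String, Integer> runMaster(Map<String, Map<String, Integer>> partitions,
                                          Function<Map<String, Integer>, Integer> slaveStep) {
        ExecutorService taskExecutor = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        try {
            // Launch one "Slave Step" per partition, each with its own context.
            Map<String, CompletableFuture<Integer>> running = new LinkedHashMap<>();
            partitions.forEach((name, ctx) -> running.put(name,
                    CompletableFuture.supplyAsync(() -> slaveStep.apply(ctx), taskExecutor)));
            // The master waits for every slave and collects the results.
            Map<String, Integer> results = new LinkedHashMap<>();
            running.forEach((name, future) -> results.put(name, future.join()));
            return results;
        } finally {
            taskExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, Integer>> partitions = new LinkedHashMap<>();
        for (int i = 0; i < 4; i++) {
            Map<String, Integer> ctx = new LinkedHashMap<>();
            ctx.put("startingIndex", i * 5 + 1);
            ctx.put("endingIndex", i * 5 + 5);
            partitions.put("partition" + i, ctx);
        }
        // Hypothetical slave logic: report how many ids fall in its range.
        Map<String, Integer> counts = runMaster(partitions,
                ctx -> ctx.get("endingIndex") - ctx.get("startingIndex") + 1);
        System.out.println(counts);
    }
}
```

The example reports 5 ids for each of the four partitions. The master blocks until every partition has finished, much as a Master Step waits for all of its Slave Steps.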

3. Step Partitioning Example

Now that we are comfortable with the basics, we can explore Spring Batch Step Partitioning through the following XML-based example code. Since the goal is to grasp the concept, we focus on clarifying it rather than on accomplishing something challenging through Step Partitioning. In this example, we read data from an HSQL database table and write it to CSV files through Partitioned Steps. The roadmap is as follows: set up a Maven project, create and populate the HSQL table, write the POJOs and a Partitioner, configure the context and the Job, and finally launch the Job from a main method.

So let’s begin. We have deliberately kept the example simple and added comments wherever they seemed necessary.

3.1 Project Set-Up

3.2 Maven Dependencies

In the pom.xml file add the following Maven dependencies.

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.javacodegeeks.example</groupId>
  <artifactId>SpringBatchStepPartitioning</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <properties>
    <spring.version>4.0.5.RELEASE</spring.version>
    <spring.batch.version>3.0.4.RELEASE</spring.batch.version>
    <spring.jdbc.version>4.0.5.RELEASE</spring.jdbc.version>
    <hsql.version>1.8.0.7</hsql.version>
    <commons.version>1.4</commons.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context-support</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.batch</groupId>
      <artifactId>spring-batch-core</artifactId>
      <version>${spring.batch.version}</version>
    </dependency>
    <dependency>
      <groupId>hsqldb</groupId>
      <artifactId>hsqldb</artifactId>
      <version>${hsql.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jdbc</artifactId>
      <version>${spring.jdbc.version}</version>
    </dependency>
    <dependency>
      <groupId>commons-dbcp</groupId>
      <artifactId>commons-dbcp</artifactId>
      <version>${commons.version}</version>
    </dependency>
  </dependencies>
</project>

3.3 HSQL Table Creation

Add the following code to initial-query.sql. This script is invoked on the dataSource (the HSQL database in our case) when the context is configured. It simply drops and re-creates a table PERSON and inserts a batch of records into it. These records are then read in the Partitioned Steps. We insert 20 records, which we intend to be handled by, say, 4 or 5 Slave Steps. Note the auto-incremented column “id”, which is used for indexing: it decides which records are handled by which Slave Step. Of course, any other approach to splitting up the data could be employed, say, using separate CSV files.

initial-query.sql

DROP TABLE IF EXISTS PERSON;
CREATE TABLE PERSON(
id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, 
firstName varchar(20),
lastName varchar(20),
city varchar(30)
);
INSERT INTO PERSON(firstName, lastName, city) VALUES('Allan','Mohammed','Bangalore');
INSERT INTO PERSON(firstName, lastName, city) VALUES('Ashok','Lee','Bangalore');
INSERT INTO PERSON(firstName, lastName, city) VALUES('Raju','Diesel','Delhi');
INSERT INTO PERSON(firstName, lastName, city) VALUES('Ramesh','Chan','NYC');
INSERT INTO PERSON(firstName, lastName, city) VALUES('Howard','Das','California');
INSERT INTO PERSON(firstName, lastName, city) VALUES('Daniel','Gupta','Delhi');
INSERT INTO PERSON(firstName, lastName, city) VALUES('Alexander','Speilberg','New Jersey');
INSERT INTO PERSON(firstName, lastName, city) VALUES('Danny','Rowling','California');
INSERT INTO PERSON(firstName, lastName, city) VALUES('Elizabeth','Lee','Kolkatta');
INSERT INTO PERSON(firstName, lastName, city) VALUES('Thomas','Ali','Delhi');
INSERT INTO PERSON(firstName, lastName, city) VALUES('Billy','Owens','Los Angeles');
INSERT INTO PERSON(firstName, lastName, city) VALUES('Joe','Bush','Atlanta');
INSERT INTO PERSON(firstName, lastName, city) VALUES('John','Chand','San Francisco');
INSERT INTO PERSON(firstName, lastName, city) VALUES('Sultan','Mohammed','NYC');
INSERT INTO PERSON(firstName, lastName, city) VALUES('Harper','Khan','Hyderabad');
INSERT INTO PERSON(firstName, lastName, city) VALUES('Charlie','Chan','Bangalore');
INSERT INTO PERSON(firstName, lastName, city) VALUES('Jake','Beans','Delhi');
INSERT INTO PERSON(firstName, lastName, city) VALUES('Artemis','Harper','Nabraska');
INSERT INTO PERSON(firstName, lastName, city) VALUES('Judith','Reddy','New Jersey');
INSERT INTO PERSON(firstName, lastName, city) VALUES('Elvin','Saring','NYC');

3.4 POJOs

Add the following classes (refer to the project structure above). Person is a simple POJO that mirrors the PERSON table. PersonMapper is a RowMapper that maps each row of the table to a Person object.

Person.java

package com.javacodegeeks.example.util;

public class Person {
    private String firstName, lastName, city;
    private int id;

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }
}

PersonMapper.java

package com.javacodegeeks.example.util;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

// Maps one row of the PERSON table to a Person object
public class PersonMapper implements RowMapper<Person> {
    public Person mapRow(ResultSet rs, int rowNum) throws SQLException {
        Person person = new Person();
        person.setFirstName(rs.getString("firstName"));
        person.setLastName(rs.getString("lastName"));
        person.setCity(rs.getString("city"));
        person.setId(rs.getInt("id"));
        return person;
    }
}

3.5 Partitioner Implementation

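Now we will write a simple implementation of the Partitioner interface that generates the ExecutionContexts for the Slave Steps. There are a few things of note here. partition(int gridSize) creates gridSize ExecutionContexts, one per Slave Step. Each context stores a startingIndex and an endingIndex, which bound the range of PERSON ids that its Slave Step reads. The ranges advance 5 ids at a time (0-5, 6-10, 11-15, ...), so 4 partitions cover the 20 rows inserted earlier.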

MyPartitioner.java

package com.javacodegeeks.example.util;

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

public class MyPartitioner implements Partitioner {
    private static final Log log = LogFactory.getLog(MyPartitioner.class);

    public Map<String, ExecutionContext> partition(int gridSize) {
        log.debug("START: Partition");
        Map<String, ExecutionContext> partitionMap = new HashMap<String, ExecutionContext>();
        // First range is 0-5; every following range advances by 5 ids
        int startingIndex = 0;
        int endingIndex = 5;

        for (int i = 0; i < gridSize; i++) {
            // The ExecutionContext carries this partition's id range to its Slave Step
            ExecutionContext ctxMap = new ExecutionContext();
            ctxMap.putInt("startingIndex", startingIndex);
            ctxMap.putInt("endingIndex", endingIndex);

            // Move the window on to the next 5 ids
            startingIndex = endingIndex + 1;
            endingIndex += 5;

            // The key becomes part of the Slave Step execution's name
            partitionMap.put("Thread:-" + i, ctxMap);
        }
        log.debug("END: Created Partitions of size: " + partitionMap.size());
        return partitionMap;
    }
}
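The index arithmetic in MyPartitioner is easy to check in isolation. The following plain-Java sketch uses no Spring types; PartitionRangesSketch is a hypothetical name. ranges() reproduces the arithmetic as written. The hypothetical alternative evenRanges() derives the range size from the id bounds instead of hard-coding 5, and it assumes gridSize is positive.

PartitionRangesSketch.java

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java restatement of MyPartitioner's index arithmetic (no Spring types).
class PartitionRangesSketch {

    // Same arithmetic as MyPartitioner: {startingIndex, endingIndex} per partition.
    static List<int[]> ranges(int gridSize) {
        List<int[]> result = new ArrayList<>();
        int startingIndex = 0;
        int endingIndex = 5;
        for (int i = 0; i < gridSize; i++) {
            result.add(new int[] {startingIndex, endingIndex});
            startingIndex = endingIndex + 1;
            endingIndex += 5;
        }
        return result;
    }

    // Hypothetical alternative: split [minId, maxId] into at most gridSize
    // contiguous ranges, so the range size follows the data. Assumes gridSize > 0.
    static List<int[]> evenRanges(int minId, int maxId, int gridSize) {
        List<int[]> result = new ArrayList<>();
        int total = maxId - minId + 1;
        int size = (total + gridSize - 1) / gridSize;   // ceiling division
        for (int start = minId; start <= maxId; start += size) {
            result.add(new int[] {start, Math.min(start + size - 1, maxId)});
        }
        return result;
    }

    public static void main(String[] args) {
        for (int[] r : ranges(4)) {
            System.out.println(r[0] + "-" + r[1]);
        }
        for (int[] r : evenRanges(1, 20, 3)) {
            System.out.println(r[0] + "-" + r[1]);
        }
    }
}
```

ranges(4) yields 0-5, 6-10, 11-15 and 16-20. Since the ids start at 1, the first partition effectively reads ids 1 to 5, and the four ranges together cover all 20 rows. With a larger gridSize, MyPartitioner would create extra partitions whose ranges match no rows. evenRanges(1, 20, 3), by contrast, yields 1-7, 8-14 and 15-20, and it may return fewer ranges than gridSize when the ids do not divide evenly.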

3.6 Context Configuration

We are closing in now. Add the following code to the context-config.xml file. Here we set up the jobRepository, jobLauncher, transactionManager and dataSource. We also invoke the scripts that create the batch meta-data tables, as well as our initial-query.sql written above. Note that MapJobRepositoryFactoryBean keeps the batch meta-data in memory, so the meta-data tables are not actually used by this repository.

context-config.xml

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:jdbc="http://www.springframework.org/schema/jdbc"	
	xsi:schemaLocation="
		http://www.springframework.org/schema/beans 
		http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
		http://www.springframework.org/schema/jdbc 
	    http://www.springframework.org/schema/jdbc/spring-jdbc.xsd">
	<!-- using in-memory store for batch meta-data --> 
	<bean id="jobRepository"
		class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
		<property name="transactionManager" ref="transactionManager" />
	</bean>
 	
	<bean id="transactionManager"
		class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
	 
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
	
	<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource"
		lazy-init="true" destroy-method="close">
		<property name="driverClassName" value="org.hsqldb.jdbcDriver" />
		<property name="url"
			value="jdbc:hsqldb:file:src/main/resources/hsqldb/batchcore.db;shutdown=true;" />		
		<property name="username" value="sa" />
		<property name="password" value="" />
	</bean>
	
	<!-- Create meta-tables. Here we are also invoking the initial-query.sql script -->
	<jdbc:initialize-database data-source="dataSource">
		<jdbc:script location="classpath:hsqldb/initial-query.sql" />
		<jdbc:script location="org/springframework/batch/core/schema-drop-hsqldb.sql" />
		<jdbc:script location="org/springframework/batch/core/schema-hsqldb.sql" />
	</jdbc:initialize-database>
</beans>

3.7 Job Configuration

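Next, we configure our Job. As outlined above, it is a Job with one Master Step and 4 Slave Steps, as denoted by the ‘grid-size’ attribute. We use the MyPartitioner.java written above and the framework-provided SimpleAsyncTaskExecutor. JdbcPagingItemReader reads the data from the table, and FlatFileItemWriter writes the output to CSV files. The configuration is fairly self-explanatory, but a few points are worth highlighting. The reader and the writer are declared with scope="step", which lets them pick up startingIndex and endingIndex from each Slave Step's stepExecutionContext through late binding. The reader's whereClause uses these two values to restrict each Slave Step to its own range of ids. The writer's resource uses them in the output file name, so each Slave Step writes to a separate CSV file.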

job-config.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:batch="http://www.springframework.org/schema/batch" xmlns:util="http://www.springframework.org/schema/util"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/batch 
		http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
		http://www.springframework.org/schema/beans 
		http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
		http://www.springframework.org/schema/util 
		http://www.springframework.org/schema/util/spring-util-3.2.xsd
		">
    <!-- Configuring a Job with one master step and 4 (denoted by grid-size) slave threads -->
	<job id="myJob" xmlns="http://www.springframework.org/schema/batch">
		<step id="master.Step">
			<partition step="slave.Step" partitioner="myPartitioner">
				<handler grid-size="4" task-executor="myTaskExecutor"/>
			</partition>
		</step>
	</job>
	
	<step id="slave.Step" xmlns="http://www.springframework.org/schema/batch">
		<tasklet>
			<chunk reader="pagingItemReader" writer="flatFileItemWriter" commit-interval="1" />
		</tasklet>
	</step> 
	
	<bean id="myTaskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
	
	<bean id="myPartitioner" class="com.javacodegeeks.example.util.MyPartitioner" />
	
	<bean id="pagingItemReader" class="org.springframework.batch.item.database.JdbcPagingItemReader" scope="step">
		<property name="dataSource" ref="dataSource" />
		<property name="queryProvider">
			<bean class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
				<property name="dataSource" ref="dataSource"/>
				<property name="selectClause" value="select firstName, lastName, city, id "/>
				<property name="fromClause" value="from PERSON" />
				<property name="whereClause" value="where id >= :startingIndex and id &lt;= :endingIndex" />
				<property name="sortKey" value="id"/>
			</bean>
		</property>
		<property name="parameterValues">
			<map>
				<entry key="startingIndex" value="#{stepExecutionContext[startingIndex]}"/>
				<entry key="endingIndex" value="#{stepExecutionContext[endingIndex]}" />
			</map>
		</property>
		<property name="pageSize" value="5"/>
		<property name="rowMapper">
			<bean class="com.javacodegeeks.example.util.PersonMapper"/>
		</property>
	</bean>
	
	<bean id="flatFileItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
		<property name="resource" value="file:csv/output/person.data#{stepExecutionContext[startingIndex]}-#{stepExecutionContext[endingIndex]}.csv"/>
		<property name="appendAllowed" value="false"/>
		<property name="lineAggregator">
			<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
				<property name="delimiter" value=","/>
				<property name="fieldExtractor">
					<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
						<property name="names" value="firstName, lastName, city" />
					</bean>
				</property>
			</bean>
		</property>
	</bean>
		
</beans>
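To see how the pieces of job-config.xml fit together for each partition, here is a plain-Java simulation of the Slave Steps. It is not Spring code. Row is a hypothetical stand-in for a PERSON row. The loop mirrors the whereClause, and the file name mirrors the writer's resource pattern. Each line is joined with "," like the DelimitedLineAggregator over firstName, lastName and city. Nothing is written to disk.

SlaveStepOutputSketch.java

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the Slave Steps in job-config.xml; NOT Spring code.
// Nothing is written to disk: the result maps each would-be file to its lines.
class SlaveStepOutputSketch {

    // Hypothetical stand-in for a row of the PERSON table.
    static final class Row {
        final int id;
        final String firstName;
        final String lastName;
        final String city;

        Row(int id, String firstName, String lastName, String city) {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
            this.city = city;
        }
    }

    // ranges holds {startingIndex, endingIndex} pairs, one per partition;
    // table is assumed to be ordered by id, like the reader's sortKey.
    static Map<String, List<String>> run(List<Row> table, List<int[]> ranges) {
        Map<String, List<String>> files = new LinkedHashMap<>();
        for (int[] range : ranges) {
            // Mirrors file:csv/output/person.data#{startingIndex}-#{endingIndex}.csv
            String fileName = "csv/output/person.data" + range[0] + "-" + range[1] + ".csv";
            List<String> lines = new ArrayList<>();
            for (Row row : table) {
                // Mirrors: where id >= :startingIndex and id <= :endingIndex
                if (row.id >= range[0] && row.id <= range[1]) {
                    // Mirrors DelimitedLineAggregator with names firstName, lastName, city
                    lines.add(String.join(",", row.firstName, row.lastName, row.city));
                }
            }
            files.put(fileName, lines);
        }
        return files;
    }

    public static void main(String[] args) {
        List<Row> table = Arrays.asList(
                new Row(1, "Allan", "Mohammed", "Bangalore"),
                new Row(2, "Ashok", "Lee", "Bangalore"),
                new Row(3, "Raju", "Diesel", "Delhi"),
                new Row(4, "Ramesh", "Chan", "NYC"),
                new Row(5, "Howard", "Das", "California"),
                new Row(6, "Daniel", "Gupta", "Delhi"));
        List<int[]> ranges = Arrays.asList(new int[] {0, 5}, new int[] {6, 10});
        run(table, ranges).forEach((file, lines) -> System.out.println(file + " " + lines));
    }
}
```

In main, ids 1 to 5 end up in csv/output/person.data0-5.csv and id 6 (Daniel) in csv/output/person.data6-10.csv.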

3.8 Application Set-Up

Now that we are done with the set-up, let’s write our Main method that simply picks up the Job and launches it.

Main.java

package com.javacodegeeks.example.main;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Main {
	public static void main(String[] args) {
		String[] str = {"META-INF/spring/context-config.xml","META-INF/spring/job-config.xml"};
		ApplicationContext ctx = new ClassPathXmlApplicationContext(str);
		Job job = (Job) ctx.getBean("myJob");
		JobLauncher jobLauncher = (JobLauncher) ctx.getBean("jobLauncher");
		try{
			JobExecution execution = jobLauncher.run(job, new JobParameters());
			System.out.println("Execution Status: "+ execution.getStatus());
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}

3.9 Output

If everything has been followed correctly so far, running Main as a Java Application should display the following output. Refreshing the project in the Eclipse workspace should then show the CSV files written under csv/output.

Console Output

Aug 5, 2015 12:23:19 PM org.springframework.jdbc.datasource.init.ScriptUtils executeSqlScript
INFO: Executing SQL script from class path resource [hsqldb/initial-query.sql]
Aug 5, 2015 12:23:19 PM org.springframework.jdbc.datasource.init.ScriptUtils executeSqlScript
INFO: Executed SQL script from class path resource [hsqldb/initial-query.sql] in 23 ms.
Aug 5, 2015 12:23:19 PM org.springframework.jdbc.datasource.init.ScriptUtils executeSqlScript
INFO: Executing SQL script from class path resource [org/springframework/batch/core/schema-drop-hsqldb.sql]
Aug 5, 2015 12:23:19 PM org.springframework.jdbc.datasource.init.ScriptUtils executeSqlScript
INFO: Executed SQL script from class path resource [org/springframework/batch/core/schema-drop-hsqldb.sql] in 3 ms.
Aug 5, 2015 12:23:19 PM org.springframework.jdbc.datasource.init.ScriptUtils executeSqlScript
INFO: Executing SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql]
Aug 5, 2015 12:23:19 PM org.springframework.jdbc.datasource.init.ScriptUtils executeSqlScript
INFO: Executed SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql] in 6 ms.
Aug 5, 2015 12:23:19 PM org.springframework.batch.core.launch.support.SimpleJobLauncher run
INFO: Job: [FlowJob: [name=myJob]] launched with the following parameters: [{}]
Aug 5, 2015 12:23:19 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [master.Step]
Execution Status: COMPLETED
Aug 5, 2015 12:23:20 PM org.springframework.batch.core.launch.support.SimpleJobLauncher run
INFO: Job: [FlowJob: [name=myJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]

CSV Files as Output

Fig.8 Project after Running the Application

4. Conclusion

This brings us to the end of the example. We hope it was an interesting read. As promised, the full example code is available for download below.

Download
You can download the full source code of this example here: SpringBatchStepPartitioning