Spring Batch JmsItemReader Example
This article is a tutorial on Spring Batch with JmsItemReader. We will use Spring Boot to speed up our development process.
1. Introduction
Spring Batch is a lightweight, scalable and comprehensive batch framework for handling data at massive scale. It builds upon the Spring Framework to provide intuitive and easy configuration for executing batch applications. Spring Batch provides reusable functions essential for processing large volumes of records, including cross-cutting concerns such as logging/tracing, transaction management, job processing statistics, job restart, skip and resource management.
Spring Batch has a layered architecture consisting of three components:
- Application – Contains custom code written by developers.
- Batch Core – Classes to launch and control batch jobs.
- Batch Infrastructure – Reusable code for common functionality needed by Core and Application.
JMS is a Java API which allows applications to create, send, receive, and read messages using reliable, asynchronous, loosely coupled communication. Spring provides its own JMS support, which works with various message brokers. In JMS, Message Oriented Middleware (MOM) is the glue connecting systems or peers. A peer sends a message to the MOM, and the MOM has to store the message reliably until it is read by the interested party. There is also the concept of topics, where the MOM has to deliver the message to all subscribed parties. Of the many MOM implementations available, Apache ActiveMQ is robust and comparatively simple to configure. It is a popular open source messaging and Integration Patterns server.
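The difference between queue and topic delivery described above can be illustrated without a broker. The sketch below is a toy in-memory model and not the JMS API; the names `MomSketch`, `ToyQueue` and `ToyTopic` are hypothetical and exist only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy in-memory model of the two delivery styles; this is NOT the JMS API.
class MomSketch {

    // Point-to-point: a message is stored until exactly one consumer reads it.
    static class ToyQueue {
        private final Queue<String> stored = new ArrayDeque<>();

        void send(String message) {
            stored.add(message);
        }

        // Returns the oldest stored message, or null when the queue is empty.
        String receive() {
            return stored.poll();
        }
    }

    // Publish-subscribe: every subscriber receives its own copy of each message.
    static class ToyTopic {
        private final List<List<String>> inboxes = new ArrayList<>();

        // Registers a subscriber and returns the inbox that will collect its copies.
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        void publish(String message) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.send("Jack Ryan");
        System.out.println(queue.receive()); // the first reader gets the message
        System.out.println(queue.receive()); // null: nothing is left for a second reader

        ToyTopic topic = new ToyTopic();
        List<String> first = topic.subscribe();
        List<String> second = topic.subscribe();
        topic.publish("Jack Ryan");
        System.out.println(first + " " + second); // both inboxes hold a copy
    }
}
```

The example in this article uses the queue style: each person message is consumed once by the reader.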
Let us dive into Spring Batch with a simple example that reads persons from an ActiveMQ queue and writes them out to a file. We will use an embedded database to save the Spring Batch job status.
2. Technologies Used
- Java 1.8.101 (1.8.x will do fine)
- Gradle 4.4.1 (4.x will do fine)
- IntelliJ Idea (Any Java IDE would work)
- The rest will be part of the Gradle configuration.
3. Spring Batch Project (Embedded MQ)
Spring Boot provides more than 30 starters to ease the dependency management for your project. The easiest way to generate a Spring Boot project is via the Spring starter tool with the steps below:
- Navigate to https://start.spring.io/.
- Select Gradle Project with Java and Spring Boot version 2.0.0.
- Add Batch, JMS (ActiveMQ) and an embedded database in the “search for dependencies” box (the build file below uses HSQLDB).
- Enter the group name as com.jcg and the artifact as SpringBatchJms.
- Click the Generate Project button.
A Gradle project will be generated. If you prefer Maven, select Maven Project instead of Gradle Project before generating the project. Import the project into your Java IDE.
3.1 Gradle File
Below we can see the generated build file for our project.
build.gradle
buildscript {
    ext {
        springBootVersion = '2.0.0.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.jcg'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}


dependencies {
    compile('org.springframework.boot:spring-boot-starter-activemq')
    compile('org.springframework.boot:spring-boot-starter-batch')
    compile('org.apache.activemq:activemq-kahadb-store:5.8.0')
    compile "org.projectlombok:lombok:1.16.8"
    runtime("org.hsqldb:hsqldb")
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('org.springframework.batch:spring-batch-test')
}
- Spring Boot version 2.0.0.RELEASE is specified in line 3.
- The idea plugin is applied in line 15 to support the IntelliJ IDEA IDE.
- Lines 28-36 declare the dependencies needed for the project. Where no version is given, the Spring Boot dependency-management plugin supplies it.
- Line 29 indicates that we are using Spring's JMS support for ActiveMQ.
- Line 31 declares the dependency on kahadb, which ActiveMQ uses to persist the queue data reliably to a file. This is essential while running an embedded ActiveMQ instance. If it is not used, the queue gets cleared even before the reader is able to consume the messages.
- Line 32 declares the dependency on Lombok, used for reducing boilerplate code.
3.2 POJO (Person)
We use a simple POJO class for reading data from MQ and writing it to a file. We use Lombok annotations to auto-generate the getters, setters, constructors and toString method. The class is made serializable so that it can be transferred across the network, i.e. from the queue to the file.
Person(POJO) class
package com.jcg.SpringBatchJms.model;

import lombok.*;

import java.io.Serializable;

@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@ToString
public class Person implements Serializable {

    private String firstName;
    private String lastName;
}
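To make the role of the Lombok annotations concrete, here is a hand-written sketch of roughly what they generate for this class. It is an illustration of the equivalent plain Java, not code from the example project, and the `main` method is added only to show the output.

```java
import java.io.Serializable;

// Hand-written approximation of what the Lombok annotations generate (sketch).
class Person implements Serializable {

    private String firstName;
    private String lastName;

    // @NoArgsConstructor
    public Person() {
    }

    // @AllArgsConstructor
    public Person(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    // @Getter
    public String getFirstName() {
        return firstName;
    }

    public String getLastName() {
        return lastName;
    }

    // @Setter
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    // @ToString uses the form ClassName(field=value, ...)
    @Override
    public String toString() {
        return "Person(firstName=" + firstName + ", lastName=" + lastName + ")";
    }

    public static void main(String[] args) {
        System.out.println(new Person("Jack", "Ryan")); // Person(firstName=Jack, lastName=Ryan)
    }
}
```

This toString format is what ends up in the output file, because the writer in this example writes each item's toString result as one line.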
3.3 Spring Batch Configuration
In this section we cover the Java configuration for Spring Boot, Batch and JMS, discussing each part in turn. We start with the main Spring Boot runner class.
Application Class
package com.jcg.SpringBatchJms;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBatchJmsApplication {


    public static void main(String[] args) {
        SpringApplication.run(SpringBatchJmsApplication.class, args);
    }
}
- We mark our application as a Spring Boot application in line 6. This takes care of all the auto-configuration magic. Spring Boot works on the philosophy of convention over configuration: it provides sensible defaults and allows overriding them with the appropriate configuration.
- Line 11 starts our application with the configuration specified in the section below.
Next, we cover the batch configuration, modelled as a Java class.
Batch Configuration
package com.jcg.SpringBatchJms.config;

import com.jcg.SpringBatchJms.model.Person;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.jms.JmsItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

@EnableJms
@Configuration
@EnableBatchProcessing
public class SpringBatchJmsConfig {

    public static final Logger logger = LoggerFactory.getLogger(SpringBatchJmsConfig.class.getName());

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public JmsListenerContainerFactory<?> queueListenerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setMessageConverter(messageConverter());
        return factory;
    }

    // Converts Person objects to JSON text messages and back
    @Bean
    public MessageConverter messageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

    // Reads Person items from the JmsTemplate's default destination
    @Bean
    public JmsItemReader<Person> personJmsItemReader(MessageConverter messageConverter) {
        JmsItemReader<Person> personJmsItemReader = new JmsItemReader<>();
        personJmsItemReader.setJmsTemplate(jmsTemplate);
        personJmsItemReader.setItemType(Person.class);
        return personJmsItemReader;
    }

    // Writes one line per Person to person.txt
    @Bean
    public FlatFileItemWriter<Person> personFlatFileItemWriter() {
        FlatFileItemWriter<Person> personFlatFileItemWriter = new FlatFileItemWriter<>();
        personFlatFileItemWriter.setLineAggregator(person -> person.toString());
        personFlatFileItemWriter.setLineSeparator(System.lineSeparator());
        personFlatFileItemWriter.setResource(new FileSystemResource("person.txt"));
        return personFlatFileItemWriter;
    }

    @Bean
    public Job importUserJob() {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(jobExecutionListener())
                .flow(step1())
                .end()
                .build();
    }

    private Step step1() {
        return stepBuilderFactory.get("step1")
                .<Person, Person>chunk(10)
                .reader(personJmsItemReader(messageConverter()))
                .writer(personFlatFileItemWriter())
                .build();
    }

    @Bean
    public JobExecutionListener jobExecutionListener() {
        return new JobExecutionListener() {
            @Override
            public void beforeJob(JobExecution jobExecution) {
                // Seed the queue with sample data before the job starts
                Person[] people = {new Person("Jack", "Ryan"), new Person("Raymond", "Red"), new Person("Olivia", "Dunham"),
                        new Person("Walter", "Bishop"), new Person("Harry", "Bosch")};
                for (Person person : people) {
                    logger.info(person.toString());
                    jmsTemplate.convertAndSend(person);
                }
            }

            @Override
            public void afterJob(JobExecution jobExecution) {
            }
        };
    }
}
Line 28 indicates that our application uses JMS. Line 29 marks the class as a configuration class that Spring Boot should pick up to wire the beans and dependencies. Line 30 enables batch support for our application. Spring Batch defines a Job, which contains one or more Steps to be executed. In our example, importUserJob uses only a single step. We use a JobExecutionListener to send data to the embedded ActiveMQ, which we will cover below. A Step can be a TaskletStep (a single function to execute) or a chunk-oriented Step made up of a Reader, an optional Processor and a Writer. The example above uses a chunk-oriented Step.
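The reader and writer of such a step cooperate roughly as follows. This is a simplified model written for this article, not Spring Batch's actual implementation (which also wraps each chunk in a transaction); `ChunkLoopSketch` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Simplified model of a chunk-oriented step; NOT Spring Batch's implementation.
class ChunkLoopSketch {

    // Reads items one by one until the reader returns null and hands them
    // to the writer in chunks. Returns the number of chunks written.
    static <T> int run(Supplier<T> reader, Consumer<List<T>> writer, int chunkSize) {
        int chunks = 0;
        List<T> chunk = new ArrayList<>();
        T item;
        while ((item = reader.get()) != null) {
            chunk.add(item);
            if (chunk.size() == chunkSize) {
                writer.accept(new ArrayList<>(chunk));
                chunk.clear();
                chunks++;
            }
        }
        if (!chunk.isEmpty()) {
            // Final, partially filled chunk
            writer.accept(new ArrayList<>(chunk));
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        Iterator<String> source =
                Arrays.asList("Jack", "Raymond", "Olivia", "Walter", "Harry").iterator();
        Supplier<String> reader = () -> source.hasNext() ? source.next() : null;
        int chunks = run(reader, chunk -> System.out.println("writing " + chunk), 2);
        System.out.println(chunks + " chunks"); // 3 chunks
    }
}
```

With the chunk size of 10 used in step1, the five sample persons fit into a single, partially filled chunk.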
JMS supports transferring plain strings natively without any further configuration. In our case, however, we want to transfer Person objects. Hence, we have created a MessageConverter bean which provides the conversion logic from an object to JSON text and back, and which we also inject into the JmsListenerContainerFactory. Spring Boot applies this MessageConverter bean to the JmsTemplate it auto-configures, and the JmsTemplate is what we use for sending and receiving messages.
Reader: Here we use JmsItemReader to consume messages from the queue, whose configuration we will discuss in the section below. The reader is simply given the JmsTemplate instantiated by Spring Boot along with the item type (Person). JmsItemReader keeps reading messages from the queue until there are no further messages to read.
JmsTemplate needs to be given a receive timeout; otherwise it will wait indefinitely for messages from the queue. This is controlled via the receive-timeout property. If this property is not provided, Spring Batch complains. We provide the receive-timeout configuration in the application.yml file.
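Why the timeout matters can be shown with a plain `BlockingQueue` standing in for the broker. An item reader signals the end of its input by returning null, and a timed-out receive is what produces that null here. The sketch is an analogy under that assumption and does not use JmsTemplate; `ReceiveTimeoutSketch` and `drain` are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Analogy for a receive timeout, using a JDK queue instead of a JMS broker.
class ReceiveTimeoutSketch {

    // Consumes messages until a poll times out (returns null), which is
    // treated as "no more input". Returns the number of messages consumed.
    static int drain(BlockingQueue<String> queue, long timeoutMillis) {
        int count = 0;
        try {
            while (queue.poll(timeoutMillis, TimeUnit.MILLISECONDS) != null) {
                count++;
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop reading
            Thread.currentThread().interrupt();
        }
        return count;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("Jack Ryan");
        queue.add("Raymond Red");
        // With a timeout the loop ends shortly after the queue runs dry.
        System.out.println(drain(queue, 200) + " messages read"); // 2 messages read
        // queue.take() instead of poll(...) would block forever on an empty queue,
        // which mirrors a receive without a timeout.
    }
}
```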
Writer: This is a simple FlatFileItemWriter which writes the result of each person object's toString method to a file named person.txt. It writes items a chunk at a time, whereas the reader supplies them record by record.
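The writer's two settings, a line aggregator and a line separator, boil down to the following idea. This is a JDK-only sketch of the concept and not how FlatFileItemWriter is implemented; `FlatFileWriteSketch`, `writeChunk` and `readLines` are hypothetical names, and the demo writes to a temporary file rather than person.txt.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Concept sketch: turn each item of a chunk into one line and append it to a file.
class FlatFileWriteSketch {

    // Appends one line per item; the aggregator turns an item into its text form.
    static <T> void writeChunk(Path file, List<T> chunk, Function<T, String> lineAggregator) {
        StringBuilder lines = new StringBuilder();
        for (T item : chunk) {
            lines.append(lineAggregator.apply(item)).append(System.lineSeparator());
        }
        try {
            Files.write(file, lines.toString().getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the file back as a list of lines.
    static List<String> readLines(Path file) {
        try {
            return Files.readAllLines(file, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("person", ".txt");
        writeChunk(file, Arrays.asList("Jack Ryan", "Raymond Red"), Object::toString);
        writeChunk(file, Arrays.asList("Olivia Dunham"), Object::toString);
        System.out.println(readLines(file)); // [Jack Ryan, Raymond Red, Olivia Dunham]
        Files.delete(file);
    }
}
```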
Listener: In the listener's beforeJob method, we create 5 person records and send them to ActiveMQ using JmsTemplate. The afterJob method is not used in this example; it is where we could do some clean-up if required.
Next, we will cover the application-level configuration in yml files, which is injected into our application and the Spring container during startup.
application.yml
logging:
  pattern:
    console: "%msg%n"
  level:
    org.springframework: info
    com.jcg: info

spring:
  activemq:
    broker-url: vm://localhost
  jms:
    template:
      default-destination: person-queue
      receive-timeout: 2s
- Logback configuration is specified in lines 1-6, with info level for our package.
- In line 10, we specify the broker URL as vm://localhost (embedded broker). If unspecified, Spring Boot will create an embedded broker for you. The above syntax is helpful when you want to pass extra configuration options like JMSRedelivery etc.
- Our JmsTemplate is given a default ActiveMQ destination of person-queue and configured with a receive timeout of 2 seconds. If we don’t specify a receive timeout, our reader will never stop.
4. Local ActiveMQ
In this section, we will look at installing ActiveMQ on our local machine and connecting to it from our application. Navigate to the latest release and download the build for your operating system. The latest release at the time of writing is 5.15.3 and can be downloaded from the Apache ActiveMQ website. Once it is installed, start the ActiveMQ process. The ActiveMQ broker exposes the URL tcp://localhost:61616 for publishing and consuming messages, while the console is accessed via http://127.0.0.1:8161. In the application, the following changes are applied to connect to the installed ActiveMQ.
application.yml
spring:
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
The broker URL is changed to the local broker's URL, and admin credentials are provided to connect to the ActiveMQ broker. The rest of the configuration remains intact, and our application now connects to person-queue in ActiveMQ.
The following changes to our Java configuration are necessary to observe the messages in ActiveMQ, as our 5 messages would otherwise be consumed within seconds.
// Requires: import java.util.stream.IntStream;
@Bean
public JobExecutionListener jobExecutionListener() {
    return new JobExecutionListener() {
        @Override
        public void beforeJob(JobExecution jobExecution) {
            // Send the same 5 persons 300 times
            IntStream.rangeClosed(1, 300).forEach(token -> {
                Person[] people = {new Person("Jack", "Ryan"), new Person("Raymond", "Red"), new Person("Olivia", "Dunham"),
                        new Person("Walter", "Bishop"), new Person("Harry", "Bosch")};
                for (Person person : people) {
                    logger.info(person.toString());
                    jmsTemplate.convertAndSend(person);
                }
            });
        }

        @Override
        public void afterJob(JobExecution jobExecution) {
        }
    };
}
In this example, we re-send the same 5 messages 300 times, so that messages remain in the queue for some time. Follow the steps below to observe the messages in the broker console.
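As a quick check of how much data this puts on the queue, the same loop can be run against an in-memory queue. This is a JDK-only sketch with hypothetical names (`SeedCountSketch`, `seed`); it counts messages and does not talk to ActiveMQ.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.stream.IntStream;

// Counts how many messages the seeding loop produces; no broker involved.
class SeedCountSketch {

    // Adds every person to the queue once per round and returns the queue.
    static Queue<String> seed(int rounds, String... people) {
        Queue<String> queue = new ArrayDeque<>();
        IntStream.rangeClosed(1, rounds).forEach(round -> {
            for (String person : people) {
                queue.add(person);
            }
        });
        return queue;
    }

    public static void main(String[] args) {
        Queue<String> queue = seed(300, "Jack Ryan", "Raymond Red", "Olivia Dunham",
                "Walter Bishop", "Harry Bosch");
        System.out.println(queue.size());      // 1500
        System.out.println(queue.size() / 10); // 150 chunks at a chunk size of 10
    }
}
```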
Navigate to http://localhost:8161/admin/ and click on Queues.
You can see the queues configured in this instance. ActiveMQ auto-creates the queue on first use, which is why we never created the queue in the broker console. The page also shows statistics of messages fed into and consumed from the queue, along with the count of unconsumed messages. You can purge the queue, which clears all its messages, or delete the queue altogether.
Click on person-queue to see the available messages.
Only the unconsumed messages are kept in the queue. The queue is configured as durable, so messages remain in it until they are consumed or purged. We can delete a single message by clicking on the delete link.
Click on a particular message to see the details.
It displays the message along with various properties of the message. A message can be deleted, copied or even moved to another queue.
5. Summary
Run the Application class from a Java IDE. Output similar to the screenshot below will be displayed.
We can also check the output in the generated file, which is similar to the screenshot below.
In this example, we saw a simple way to configure a Spring Batch application with a JmsItemReader consuming data from an embedded ActiveMQ and writing it out to a flat file. We also installed ActiveMQ locally, consumed messages from its queue and wrote them out to a flat file.
6. Download the Source Code
You can download the full source code of this example here: SpringBatchJMSItemReaderExample