Batch

Spring Batch JmsItemReader Example

This article is a tutorial about Spring Batch with JMSItemReader. We will use Spring Boot to speed our development process.

1. Introduction

Spring Batch is a lightweight, scale-able and comprehensive batch framework to handle data at massive scale. Spring Batch builds upon the spring framework to provide intuitive and easy configuration for executing batch applications. Spring Batch provides reusable functions essential for processing large volumes of records, including cross-cutting concerns such as logging/tracing, transaction management, job processing statistics, job restart, skip and resource management.

Spring Batch has a layered architecture consisting of three components:

  • Application – Contains custom code written by developers.
  • Batch Core – Classes to launch and control batch job.
  • Batch Infrastructure – Reusable code for common functionalities needed by core and Application.

 
JMS is a Java API which allows applications to create, send, receive, and read messages using reliable, asynchronous, loosely coupled communication. Spring provides its own implementation of JMS supporting various queue technologies. In JMS, Message Oriented Middleware(MOM) is the glue connecting systems or peers. A peer sends the message to MOM and MOM has to reliably store the message until the message is read by the interested party. Also, there is the concept of topics, where MOM has to send the message to all subscribed parties. There are many MOM out there, out of which Apache ActiveMQ is robust and much simpler to configure. Apache ActiveMQ is the most popular and powerful open source messaging and Integration Patterns server.

Let us dive into spring batch with a simple example of reading persons from an ActiveMQ queue and writing them out as a file. We will use an embedded database to save Spring Batch job status.

2. Technologies Used

  • Java 1.8.101 (1.8.x will do fine)
  • Gradle 4.4.1 (4.x will do fine)
  • IntelliJ Idea (Any Java IDE would work)
  • Rest will be part of the Gradle configuration.

3. Spring Batch Project (Embedded MQ)

Spring Boot Starters provides more than 30 starters to ease the dependency management for your project. The easiest way to generate a Spring Boot project is via Spring starter tool with the steps below:

  • Navigate to https://start.spring.io/.
  • Select Gradle Project with Java and Spring Boot version 2.0.0.
  • Add Batch, JMS(Active MQ) and H2 in the “search for dependencies”.
  • Enter the group name as com.JCG and artifact as SpringBatchJms.
  • Click the Generate Project button.

A Gradle Project will be generated. If you prefer Maven, use Maven instead of Gradle before generating the project. Import the project into your Java IDE.

3.1 Gradle File

Below we can see the generated build file for our project.

build.gradle

buildscript {
    ext {
        springBootVersion = '2.0.0.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.jcg'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}


dependencies {
    compile('org.springframework.boot:spring-boot-starter-activemq')
    compile('org.springframework.boot:spring-boot-starter-batch')
    compile('org.apache.activemq:activemq-kahadb-store:5.8.0')
    compile "org.projectlombok:lombok:1.16.8"
    runtime("org.hsqldb:hsqldb")
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('org.springframework.batch:spring-batch-test')
}
  • Spring Boot Version 2.0 is specified in line 3.
  • Idea plugin has been applied to support Idea IDE in line 15.
  • Lines 28-36 declare the dependencies needed for the project with each downloading the latest version from spring.io.
  • Line 29 indicates that we are using the spring implementation of JMS for ActiveMQ.
  • Line 31 declares the dependency kahadb which ActiveMQ uses to persist the queue data reliably to a file. This is essential while running an embedded ActiveMQ instance. If it is not used, the queue gets cleared even before the reader is able to consume the messages.
  • Line 32 declares the dependencyLombok used for reducing boilerplate code.

3.2 POJO (Person)

We use a simple POJO class for reading data from MQ and writing to file. We are using Lombok annotations to auto generate the getter, setter and constructor. The class is made serializable so that it can be transferred across the network i.e. from the queue to file.

Person(POJO) class

package com.jcg.SpringBatchJms.model;

import lombok.*;

import java.io.Serializable;


@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@ToString
public class Person implements Serializable {

    private String firstName;
    private String lastName;
}

3.3 Spring Batch Configuration

Below we will cover the Java configuration for Spring Boot, Batch and JMS. We will discuss each part of the configuration below. We are first covering the main Spring Boot runner class below.

Application Class

package com.jcg.SpringBatchJms;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBatchJmsApplication {


    public static void main(String[] args) {
        SpringApplication.run(SpringBatchJmsApplication.class, args);
    }


}
  • We specify our application as the spring boot application in Line 6. It takes care of all the Auto Configuration magic. Spring boot works on the philosophy of convention over configuration. It provides sensible defaults and allows overriding with the appropriate configuration.
  • Line 11 starts our application with the configuration specified in below section.

Below, we will cover the Batch Configuration modelled in Java class.

Batch Configuration

package com.jcg.SpringBatchJms.config;

import com.jcg.SpringBatchJms.model.Person;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.jms.JmsItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

@EnableJms
@Configuration
@EnableBatchProcessing
public class SpringBatchJmsConfig {

    public static final Logger logger = LoggerFactory.getLogger(SpringBatchJmsConfig.class.getName());


    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public JmsListenerContainerFactory<?> queueListenerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setMessageConverter(messageConverter());
        return factory;
    }

    @Bean
    public MessageConverter messageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }


    @Bean
    public JmsItemReader personJmsItemReader(MessageConverter messageConverter) {
        JmsItemReader personJmsItemReader = new JmsItemReader<>();
        personJmsItemReader.setJmsTemplate(jmsTemplate);
        personJmsItemReader.setItemType(Person.class);
        return personJmsItemReader;
    }



    @Bean
    public FlatFileItemWriter personFlatFileItemWriter() {
        FlatFileItemWriter personFlatFileItemWriter = new FlatFileItemWriter<>();
        personFlatFileItemWriter.setLineAggregator(person -> person.toString());
        personFlatFileItemWriter.setLineSeparator(System.lineSeparator());
        personFlatFileItemWriter.setResource(new FileSystemResource("person.txt"));
        return personFlatFileItemWriter;
    }

    @Bean
    public Job importUserJob() {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(jobExecutionListener())
                .flow(step1())
                .end()
                .build();
    }

    private Step step1() {
        return stepBuilderFactory.get("step1")
                .<Person, Person>chunk(10)
                .reader(personJmsItemReader(messageConverter()))
                .writer(personFlatFileItemWriter())
                .build();
    }

    @Bean
    public JobExecutionListener jobExecutionListener() {
        return new JobExecutionListener() {
            @Override
            public void beforeJob(JobExecution jobExecution) {
                Person[] people = {new Person("Jack", "Ryan"), new Person("Raymond", "Red"), new Person("Olivia", "Dunham"),
                        new Person("Walter", "Bishop"), new Person("Harry", "Bosch")};
                for (Person person : people) {
                    logger.info(person.toString());
                    jmsTemplate.convertAndSend(person);
                }
            }

            @Override
            public void afterJob(JobExecution jobExecution) {

            }
        };
    }

}

Line 28 indicates our application is used with JMS. Line 29 indicates that it is a configuration class and should be picked up by spring boot to wire up the beans and dependencies. Line 30 is used to enable batch support for our application. Spring defines a Job which contains multiple Step to be executed. In our example, we use only a single step for our importUserJob. We use a JobExecutionListener to send data to embedded ActiveMQ which we will cover below. A Step could be a TaskletStep (contains a single function for execution) or Step which includes a Reader, Processor and Writer. In the above example, We have used Step.

JMS supports transferring plain strings natively without any further configuration. But in our case, we want to transfer the person object. Hence, In line 62, we have created a MessageConverter which provides the conversion logic for a serializable object to text which we inject to JmsListenerContainerFactory. JmsListenerContainerFactory is the one instantiating the JMSTemplate which is used for sending and receiving messages.

Reader: Here we are using JMSItemReader to consume messages from the queue whose configuration we will discuss in below section. The reader is just provided the Spring boot instantiated JMSTemplate along with the object type(Person). JMSItemReader keeps on reading messages from the queue until there are no further messages to read from the queue.

JMSTemplate needs to be provided with a timeout, else it will keep on waiting for messages from the queue. This is controlled via the receive-timeout property. If this property is not provided, Spring Batch would start complaining. We provide the receive-timeout configuration in the application.yml file.

Writer: This is a simple FlatFileItemWriter which writes the person object’s toString method to a file named person.txt. It writes items in a chunk while reader sends record by record basis.

Listener: In Listener, beforeJob we are creating 5 person records and sending them to ActiveMQ using JMSTemplate. The listener afterJob is not utilized in this example where we can do some clean-up if required.

Next, We will cover the application level configuration in yml files which will be injected into our application and spring container during startup.

application.yml

logging:
  pattern:
    console: "%msg%n"
  level:
    org.springframework: info
    com.jcg: info

spring:
  activemq:
    broker-url: vm://localhost
  jms:
    template:
      default-destination: person-queue
      receive-timeout: 2s
  • Logback configuration is specified in lines 1-6 with info mode for our package
  • In Line 10, we specify the URL as localhost(embedded broker). If unspecified, Spring will create an embedded broker for you. The above syntax is helpful when you want to pass extra configuration options like JMSRedelivery etc.
  • Our JMSTemplate is provided a default ActiveMQ Destination as person-queue and configured to have a receive timeout of 2 seconds. If we don’t specify a receive timeout, our reader will never stop.

4. Local ActiveMQ

In this chapter, we will look at installing ActiveMQ onto our local machine and connecting to it via our application. Navigate to the latest release and download it based on your operating System. The latest release as of now is 5.15.3 and can be downloaded from this URL. Once installed start the ActiveMQ process. ActiveMQ broker exposes the URL tcp://localhost:61616 for publishing and consuming messages while the console is accessed via http://127.0.0.1:8161. In Application, the following changes are applied to connect to the installed ActiveMQ.

application.yml

spring:
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin

The broker URL is changed to localhost URL and admin credentials are provided to connect to the ActiveMQ broker. Rest of the configuration remains intact and our application now connects to the person-queue in ActiveMQ.

The following changes are necessary for our java configuration to observe the messages in ActiveMQ as our 5 messages will be consumed in seconds.

 @Bean
    public JobExecutionListener jobExecutionListener() {
        return new JobExecutionListener() {
            @Override
            public void beforeJob(JobExecution jobExecution) {
                IntStream.rangeClosed(1,300).forEach(token->{
                    Person[] people = {new Person("Jack", "Ryan"), new Person("Raymond", "Red"), new Person("Olivia", "Dunham"),
                            new Person("Walter", "Bishop"), new Person("Harry", "Bosch")};
                    for (Person person : people) {
                        logger.info(person.toString());
                        jmsTemplate.convertAndSend(person);
                    }
                });
            }

            @Override
            public void afterJob(JobExecution jobExecution) {

            }
        };
    }

In this example, we are re-sending the same 5 messages 300 times, so that messages will remain in the queue for some time. The below steps should be followed to observe the messages in the broker console.

Navigate to http://localhost:8161/admin/ and click on Queues.

ActiveMQ Queues homepage

You can see the queues configured in this instance. ActiveMQ auto creates the queue during its first invocation which is why we never created the queue in broker console. It also shows statistics of messages feed into the queue and consumed from the queue. It also shows the count of unconsumed messages in the queue. You can choose to purge the queue which will clear all the messages in the queue. There is also an option to delete the queue.

Click on person-queue to see the available messages.

Messages in person queue

It maintains only the unconsumed messages in the queue. The queue is configured as durable and will remain in the queue unless consumed or purged from the queue. We can delete a single message by clicking on the delete link.

Click on a particular message to see the details.

Person-queue message

It displays the message along with various properties of the message. A message can be deleted, copied or even moved to another queue.

5. Summary

Run the Application class from a Java IDE. Output similar to the below screenshot will be displayed.

Spring Batch JMS logs
Spring Batch JMS logs

We can also check the output in the generated file which is similar to below screenshot

JMS Person file output
JMS Person file output

In this example, we saw a simple way to configure a Spring Batch Application with a JMSItemReader consuming data from embedded ActiveMQ and writing it out to a flat file. We also went ahead and installed ActiveMQ locally and consumed messages from the queue and writing out to a flat file.

6. Download the Source Code

Download
You can download the full source code of this example here: SpringBatchJMSItemReaderExample

Rajagopal ParthaSarathi

Rajagopal works in software industry solving enterprise-scale problems for customers across geographies specializing in distributed platforms. He holds a masters in computer science with focus on cloud computing from Illinois Institute of Technology. His current interests include data science and distributed computing.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

2 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Raj
Raj
5 years ago

reader-transactional-queue=”true”
How to use this flag?
Do we have to explicitly manage transactions?

manoj gupta
manoj gupta
4 years ago

How We can use this code for IBM MQ? Please provide link or suggestion.

Back to top button