About Mahboob Hussain

Mahboob Hussain graduated in Engineering from NIT Nagpur, India and has an MBA from Webster University, USA. He has executed roles in various aspects of software development and technical governance. He started with FORTRAN and has programmed in a variety of languages in his career, the mainstay of which has been Java. He is an associate editor in our team and has his personal homepage at http://bit.ly/mahboob

Spring Integration Database Polling Example

1. Introduction

Spring Integration provides JDBC channel adapters that connect a channel to a database. With the inbound adapter, the database is the source: the adapter runs an SQL query and delivers the complete result set as a single message whose payload is a Java List. You can map the rows to a custom POJO, which lets you work with them in terms of your business logic. The more interesting feature, though, is the poller, which you can configure within the adapter to run the query periodically at set intervals.

The database poller has been used in scenarios where large amounts of data had to be moved from one database to another, where data had to be passed to a JMS queue, or where the processing status of XML files had to be stored. See the relevant articles given in the Useful Links section.

2. Application

We will demonstrate the database poller with a Spring Boot application that polls an embedded H2 database. Every four seconds it runs a SELECT query to fetch all records whose INVENTORY_STATUS is 0 and then updates the INVENTORY_STATUS of the records to 1.
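To make the select-then-update cycle concrete before looking at the Spring configuration, here is a minimal plain-Java sketch that uses an in-memory map as a stand-in for the Items table. The class PollingCycleSketch and its methods are hypothetical names made up for this illustration; the real application delegates all of this to the JDBC inbound channel adapter.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the Items table, for illustration only.
class PollingCycleSketch {

	// Maps ITEM_ID to INVENTORY_STATUS.
	private final Map<String, Integer> statusById = new LinkedHashMap<>();

	// New rows start with INVENTORY_STATUS = 0, as in data.sql.
	void insert(String itemId) {
		statusById.put(itemId, 0);
	}

	// Mimics a manual reset of all INVENTORY_STATUS values to 0.
	void resetAll() {
		statusById.replaceAll((id, status) -> 0);
	}

	// One poll: select the rows with status 0, then mark them as processed.
	List<String> poll() {
		List<String> selected = new ArrayList<>();
		for (Map.Entry<String, Integer> row : statusById.entrySet()) {
			if (row.getValue() == 0) {
				selected.add(row.getKey());
			}
		}
		for (String id : selected) {
			statusById.put(id, 1);
		}
		return selected;
	}
}
```

In this sketch a second poll returns nothing until the status values are reset, which is the same behaviour we later verify by hand in the H2 console.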

3. Environment

I have used the following technologies for this application:

  • Java 1.8
  • Spring Boot 1.5.10
  • Maven 3.3.9
  • Ubuntu 16.04 LTS

4. Source Code

This is a Maven-based project, so all the required libraries are declared in pom.xml.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>org.javacodegeeks.springintegration.polling</groupId>
	<artifactId>dbpoller_h2</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>dbpoller_h2</name>
	<description>Demo project for Spring Boot</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.10.RELEASE</version>
		<relativePath /> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-jdbc</artifactId>
		</dependency>

		<dependency>
			<groupId>com.h2database</groupId>
			<artifactId>h2</artifactId>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

We include the spring-boot-starter-web dependency to provide web access to the application, and we set spring.h2.console.enabled=true in the application.properties file to enable the H2 console.

Below is the schema.sql file that Spring Boot runs at application startup.

schema.sql

CREATE TABLE Items (
    ITEM_ID VARCHAR(10) PRIMARY KEY,
    DESCRIPTION VARCHAR(50) NOT NULL,
    INVENTORY_STATUS INTEGER NOT NULL);

The SQL code here creates the table Items with three columns ITEM_ID, DESCRIPTION and INVENTORY_STATUS. The first two columns are of type VARCHAR whereas the last one is of type INTEGER.

Given below is data.sql which is used to insert test data at application startup.

data.sql

INSERT INTO Items (ITEM_ID, DESCRIPTION, INVENTORY_STATUS) VALUES
   ('Item_id0', 'Item_description0', 0),
   ('Item_id1', 'Item_description1', 0),
   ('Item_id2', 'Item_description2', 0),
   ('Item_id3', 'Item_description3', 0),
   ('Item_id4', 'Item_description4', 0),
   ('Item_id5', 'Item_description5', 0),
   ('Item_id6', 'Item_description6', 0),
   ('Item_id7', 'Item_description7', 0),
   ('Item_id8', 'Item_description8', 0),
   ('Item_id9', 'Item_description9', 0),
   ('Item_id10', 'Item_description10', 0),
   ('Item_id11', 'Item_description11', 0),
   ('Item_id12', 'Item_description12', 0),
   ('Item_id13', 'Item_description13', 0),
   ('Item_id14', 'Item_description14', 0),
   ('Item_id15', 'Item_description15', 0),
   ('Item_id16', 'Item_description16', 0),
   ('Item_id17', 'Item_description17', 0),
   ('Item_id18', 'Item_description18', 0),
   ('Item_id19', 'Item_description19', 0),
   ('Item_id20', 'Item_description20', 0),
   ('Item_id21', 'Item_description21', 0),
   ('Item_id22', 'Item_description22', 0),
   ('Item_id23', 'Item_description23', 0),
   ('Item_id24', 'Item_description24', 0),
   ('Item_id25', 'Item_description25', 0),
   ('Item_id26', 'Item_description26', 0),
   ('Item_id27', 'Item_description27', 0),
   ('Item_id28', 'Item_description28', 0),
   ('Item_id29', 'Item_description29', 0),
   ('Item_id30', 'Item_description30', 0),
   ('Item_id31', 'Item_description31', 0),
   ('Item_id32', 'Item_description32', 0),
   ('Item_id33', 'Item_description33', 0),
   ('Item_id34', 'Item_description34', 0),
   ('Item_id35', 'Item_description35', 0),
   ('Item_id36', 'Item_description36', 0),
   ('Item_id37', 'Item_description37', 0),
   ('Item_id38', 'Item_description38', 0),
   ('Item_id39', 'Item_description39', 0),
   ('Item_id40', 'Item_description40', 0),
   ('Item_id41', 'Item_description41', 0),
   ('Item_id42', 'Item_description42', 0),
   ('Item_id43', 'Item_description43', 0),
   ('Item_id44', 'Item_description44', 0),
   ('Item_id45', 'Item_description45', 0),
   ('Item_id46', 'Item_description46', 0),
   ('Item_id47', 'Item_description47', 0),
   ('Item_id48', 'Item_description48', 0),
   ('Item_id49', 'Item_description49', 0),
   ('Item_id50', 'Item_description50', 0),
   ('Item_id51', 'Item_description51', 0),
   ('Item_id52', 'Item_description52', 0),
   ('Item_id53', 'Item_description53', 0),
   ('Item_id54', 'Item_description54', 0),
   ('Item_id55', 'Item_description55', 0),
   ('Item_id56', 'Item_description56', 0),
   ('Item_id57', 'Item_description57', 0),
   ('Item_id58', 'Item_description58', 0),
   ('Item_id59', 'Item_description59', 0),
   ('Item_id60', 'Item_description60', 0),
   ('Item_id61', 'Item_description61', 0),
   ('Item_id62', 'Item_description62', 0),
   ('Item_id63', 'Item_description63', 0),
   ('Item_id64', 'Item_description64', 0),
   ('Item_id65', 'Item_description65', 0),
   ('Item_id66', 'Item_description66', 0),
   ('Item_id67', 'Item_description67', 0),
   ('Item_id68', 'Item_description68', 0),
   ('Item_id69', 'Item_description69', 0),
   ('Item_id70', 'Item_description70', 0),
   ('Item_id71', 'Item_description71', 0),
   ('Item_id72', 'Item_description72', 0),
   ('Item_id73', 'Item_description73', 0),
   ('Item_id74', 'Item_description74', 0),
   ('Item_id75', 'Item_description75', 0),
   ('Item_id76', 'Item_description76', 0),
   ('Item_id77', 'Item_description77', 0),
   ('Item_id78', 'Item_description78', 0),
   ('Item_id79', 'Item_description79', 0),
   ('Item_id80', 'Item_description80', 0),
   ('Item_id81', 'Item_description81', 0),
   ('Item_id82', 'Item_description82', 0),
   ('Item_id83', 'Item_description83', 0),
   ('Item_id84', 'Item_description84', 0),
   ('Item_id85', 'Item_description85', 0),
   ('Item_id86', 'Item_description86', 0),
   ('Item_id87', 'Item_description87', 0),
   ('Item_id88', 'Item_description88', 0),
   ('Item_id89', 'Item_description89', 0),
   ('Item_id90', 'Item_description90', 0),
   ('Item_id91', 'Item_description91', 0),
   ('Item_id92', 'Item_description92', 0),
   ('Item_id93', 'Item_description93', 0),
   ('Item_id94', 'Item_description94', 0),
   ('Item_id95', 'Item_description95', 0),
   ('Item_id96', 'Item_description96', 0),
   ('Item_id97', 'Item_description97', 0),
   ('Item_id98', 'Item_description98', 0),
   ('XXX', 'last item', 0);

This statement inserts 100 rows into the table Items. In the first 99 rows, ITEM_ID is Item_id followed by an integer that starts at zero and increases by one per row, and DESCRIPTION follows the same pattern with the prefix Item_description. The last row has 'XXX' in the ITEM_ID column and 'last item' in the DESCRIPTION column. All hundred rows have the value zero in the INVENTORY_STATUS column.
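Because the test data follows such a regular pattern, the INSERT statement could also be generated rather than typed by hand. The following is only a sketch: DataSqlGenerator is a hypothetical helper and is not part of the downloadable project.

```java
import java.util.StringJoiner;

// Hypothetical helper that reproduces the layout of data.sql.
class DataSqlGenerator {

	// Builds one multi-row INSERT: `generated` numbered rows plus the final 'XXX' row.
	static String build(int generated) {
		StringJoiner rows = new StringJoiner(",\n",
				"INSERT INTO Items (ITEM_ID, DESCRIPTION, INVENTORY_STATUS) VALUES\n", ";");
		for (int i = 0; i < generated; i++) {
			rows.add("   ('Item_id" + i + "', 'Item_description" + i + "', 0)");
		}
		rows.add("   ('XXX', 'last item', 0)");
		return rows.toString();
	}
}
```

Calling DataSqlGenerator.build(99) yields the 99 numbered rows plus the last row, that is, 100 rows in total.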

Following is the xml file that has the configuration for the application.

application-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
	xmlns:int="http://www.springframework.org/schema/integration"
	xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
	xmlns:jdbc="http://www.springframework.org/schema/jdbc"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
	   http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
	   http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
	   http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
	   http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd">

	<int:channel id="fromdb"/>
	<int:service-activator input-channel="fromdb"
		ref="jdbcMessageHandler" />
	<int-jdbc:inbound-channel-adapter
		channel="fromdb" data-source="dataSource"
		query="SELECT * FROM Items WHERE INVENTORY_STATUS = 0"
		update="UPDATE Items SET INVENTORY_STATUS = 1">
		<int:poller fixed-delay="4000" />
	</int-jdbc:inbound-channel-adapter>
</beans>

In this file, we first declare a channel with the id fromdb. Then we configure the jdbcMessageHandler bean (an instance of the JdbcMessageHandler class) as the service activator on this channel, so its handler method is invoked for each message that arrives on the channel. Finally, we define an int-jdbc:inbound-channel-adapter that connects the default dataSource to the channel we have declared. The SELECT query fetches all records that have the value 0 in the INVENTORY_STATUS column and the UPDATE query then sets this value to 1. Note that the UPDATE statement has no WHERE clause, so it applies to every row in the table. The poller runs the queries with a fixed delay of four seconds between polls.
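For readers who prefer Java configuration, the same wiring could be sketched roughly as follows. This is a configuration fragment and an alternative to the XML file, not something to use alongside it. It assumes the Spring Integration version that ships with Spring Boot 1.5.x; the class name DbPollerJavaConfig and the bean name itemsSource are made up for this illustration. With this variant the handler method would also need a @ServiceActivator(inputChannel = "fromdb") annotation in place of the XML service-activator element.

```java
package org.javacodegeeks.springintegration.polling.dbpoller;

import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.messaging.MessageChannel;

// Hypothetical Java-config counterpart of application-context.xml.
@Configuration
public class DbPollerJavaConfig {

	// Same channel id as in the XML configuration.
	@Bean
	public MessageChannel fromdb() {
		return new DirectChannel();
	}

	// Polls with a fixed delay of 4000 ms and sends each result set to fromdb.
	@Bean
	@InboundChannelAdapter(value = "fromdb", poller = @Poller(fixedDelay = "4000"))
	public MessageSource<Object> itemsSource(DataSource dataSource) {
		JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(dataSource,
				"SELECT * FROM Items WHERE INVENTORY_STATUS = 0");
		// Same statement as the XML update attribute; it touches every row.
		adapter.setUpdateSql("UPDATE Items SET INVENTORY_STATUS = 1");
		return adapter;
	}
}
```

The Spring Integration reference documentation also shows update statements that are restricted to the polled rows through a named parameter, along the lines of WHERE ITEM_ID IN (:ITEM_ID). That variant has not been tried with this example, so treat it as a pointer rather than a tested change.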

Next, we take a look at the service activator class.

JdbcMessageHandler.java

package org.javacodegeeks.springintegration.polling.dbpoller;

import java.util.List;
import java.util.Map;

import org.springframework.stereotype.Component;

@Component
public class JdbcMessageHandler {

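	// Invoked by the service activator; the payload is the polled result set,
	// one Map per row keyed by column name.
	public void handleJdbcMessage(List<Map<String, Object>> message) {
		for (Map<String, Object> resultMap : message) {
			System.out.println("Row");
			for (Map.Entry<String, Object> column : resultMap.entrySet()) {
				System.out.println("column: " + column.getKey() + " value: " + column.getValue());
			}
		}
	}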
}

The handleJdbcMessage method takes a List of Maps that represents the query result set. For each row, it first prints the text "Row" and then, for every column, the string "column: " followed by the column name and the value in that column.
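If you would rather work with typed objects than with raw maps, each row can be converted into a small POJO inside the handler. The Item class below is a hypothetical sketch and is not part of the downloadable project. It assumes the map keys are the upper-case column names from schema.sql and that INVENTORY_STATUS arrives as a numeric value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical value class for one row of the Items table.
class Item {

	final String itemId;
	final String description;
	final int inventoryStatus;

	Item(String itemId, String description, int inventoryStatus) {
		this.itemId = itemId;
		this.description = description;
		this.inventoryStatus = inventoryStatus;
	}

	// Converts one row map; the upper-case keys are an assumption.
	static Item fromRow(Map<String, Object> row) {
		return new Item(
				(String) row.get("ITEM_ID"),
				(String) row.get("DESCRIPTION"),
				((Number) row.get("INVENTORY_STATUS")).intValue());
	}

	// Converts a whole result set, preserving row order.
	static List<Item> fromRows(List<Map<String, Object>> rows) {
		List<Item> items = new ArrayList<>();
		for (Map<String, Object> row : rows) {
			items.add(fromRow(row));
		}
		return items;
	}
}
```

The handler could then call Item.fromRows(message) and pass the resulting list on to business logic instead of printing the columns.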

Given below is the DbpollerApplication class which is the main class of the application.

DbpollerApplication.java

package org.javacodegeeks.springintegration.polling.dbpoller;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ImportResource;

@SpringBootApplication
@ImportResource("application-context.xml")
public class DbpollerApplication {

	public static void main(String[] args) {
		SpringApplication.run(DbpollerApplication.class, args);
	}
}

Using the annotation @ImportResource, we load the configuration in the file application-context.xml, and in the main method we just call SpringApplication.run to start the application.

5. How To Run

At the command prompt, just run:

mvn spring-boot:run

In the terminal window, you will see all the rows printed by the service activator class in the format explained previously. In the H2 web console, accessible at http://localhost:8080/h2-console/, you will see the result of the update query: all the values in the INVENTORY_STATUS column are changed to 1. You can run a query to reset these values to zero; the next time the poller runs, it fetches all the rows again and updates them to 1. The screenshots for these test steps are given below.

The first screenshot shows the console output from the service activator showing the select query result set.

Console output when application is first run

The second screenshot shows the H2 console output with values in INVENTORY_STATUS column changed to 1 by the JDBC inbound channel adapter.

H2 Console output showing table rows after application is first run.

The third screenshot shows the update query we run in the H2 console to reset the values in INVENTORY_STATUS column to zero.

Test step to update the INVENTORY_STATUS column values to 0

The fourth screenshot shows that the values in INVENTORY_STATUS column are zero.

H2 Console output showing value in INVENTORY_STATUS column changing to 0

We re-check the data in the table after four seconds. The last screenshot shows that the values in the INVENTORY_STATUS column are updated to 1 by the JDBC inbound channel adapter.

H2 Console output showing values in INVENTORY_STATUS column changed to 1 by inbound channel adapter.

6. Summary

In this article, we have discussed the int-jdbc:inbound-channel-adapter and its polling feature. We have seen the implementation details of a simple Spring Boot application that periodically queries and updates an H2 database. The download file also has a MySQL version that you can run and test.

7. Useful Links

8. Download the Source Code

You can download the full source code of this example here: dbpoller