Spring Integration Database Polling Example
1. Introduction
Spring Integration provides JDBC channel adapters that connect a channel to a database. With the inbound adapter, the database is the source: the adapter runs an SQL query and makes the complete result set available as a message with a Java List payload. You can map the rows to a custom POJO, which lets you work with them in business-logic terms. The more interesting feature is the poller, which you configure within the adapter to run the query periodically at set intervals.
The database poller has been used to move large amounts of data from one database to another, to pass data to a JMS queue, and to store the processing status of XML files. See the relevant articles in the Useful Links section.
2. Application
We will demonstrate the database poller with a Spring Boot application that polls an embedded H2 database. Every four seconds it runs a select query that fetches all records whose INVENTORY_STATUS is 0 and then updates the INVENTORY_STATUS of the records to 1.
3. Environment
I have used the following technologies for this application:
- Java 1.8
- Spring Boot 1.5.10
- Maven 3.3.9
- Ubuntu 16.04 LTS
4. Source Code
This is a Maven-based project, so all the required libraries are declared in pom.xml.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.javacodegeeks.springintegration.polling</groupId>
    <artifactId>dbpoller_h2</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>dbpoller_h2</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.10.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
We include the spring-boot-starter-web dependency to provide web access to the application, and we set spring.h2.console.enabled=true in the application.properties file to enable the H2 console.
Below is the schema.sql file that Spring Boot runs at application startup.
schema.sql
CREATE TABLE Items (
    ITEM_ID VARCHAR(10) PRIMARY KEY,
    DESCRIPTION VARCHAR(50) NOT NULL,
    INVENTORY_STATUS INTEGER NOT NULL
);
The SQL code here creates the table Items with three columns: ITEM_ID, DESCRIPTION and INVENTORY_STATUS. The first two columns are of type VARCHAR, whereas the last one is of type INTEGER.
Given below is data.sql which is used to insert test data at application startup.
data.sql
INSERT INTO Items (ITEM_ID, DESCRIPTION, INVENTORY_STATUS) VALUES ('Item_id0', 'Item_description0', 0), ('Item_id1', 'Item_description1', 0), ('Item_id2', 'Item_description2', 0), ('Item_id3', 'Item_description3', 0), ('Item_id4', 'Item_description4', 0), ('Item_id5', 'Item_description5', 0), ('Item_id6', 'Item_description6', 0), ('Item_id7', 'Item_description7', 0), ('Item_id8', 'Item_description8', 0), ('Item_id9', 'Item_description9', 0), ('Item_id10', 'Item_description10', 0), ('Item_id11', 'Item_description11', 0), ('Item_id12', 'Item_description12', 0), ('Item_id13', 'Item_description13', 0), ('Item_id14', 'Item_description14', 0), ('Item_id15', 'Item_description15', 0), ('Item_id16', 'Item_description16', 0), ('Item_id17', 'Item_description17', 0), ('Item_id18', 'Item_description18', 0), ('Item_id19', 'Item_description19', 0), ('Item_id20', 'Item_description20', 0), ('Item_id21', 'Item_description21', 0), ('Item_id22', 'Item_description22', 0), ('Item_id23', 'Item_description23', 0), ('Item_id24', 'Item_description24', 0), ('Item_id25', 'Item_description25', 0), ('Item_id26', 'Item_description26', 0), ('Item_id27', 'Item_description27', 0), ('Item_id28', 'Item_description28', 0), ('Item_id29', 'Item_description29', 0), ('Item_id30', 'Item_description30', 0), ('Item_id31', 'Item_description31', 0), ('Item_id32', 'Item_description32', 0), ('Item_id33', 'Item_description33', 0), ('Item_id34', 'Item_description34', 0), ('Item_id35', 'Item_description35', 0), ('Item_id36', 'Item_description36', 0), ('Item_id37', 'Item_description37', 0), ('Item_id38', 'Item_description38', 0), ('Item_id39', 'Item_description39', 0), ('Item_id40', 'Item_description40', 0), ('Item_id41', 'Item_description41', 0), ('Item_id42', 'Item_description42', 0), ('Item_id43', 'Item_description43', 0), ('Item_id44', 'Item_description44', 0), ('Item_id45', 'Item_description45', 0), ('Item_id46', 'Item_description46', 0), ('Item_id47', 'Item_description47', 0), ('Item_id48', 
'Item_description48', 0), ('Item_id49', 'Item_description49', 0), ('Item_id50', 'Item_description50', 0), ('Item_id51', 'Item_description51', 0), ('Item_id52', 'Item_description52', 0), ('Item_id53', 'Item_description53', 0), ('Item_id54', 'Item_description54', 0), ('Item_id55', 'Item_description55', 0), ('Item_id56', 'Item_description56', 0), ('Item_id57', 'Item_description57', 0), ('Item_id58', 'Item_description58', 0), ('Item_id59', 'Item_description59', 0), ('Item_id60', 'Item_description60', 0), ('Item_id61', 'Item_description61', 0), ('Item_id62', 'Item_description62', 0), ('Item_id63', 'Item_description63', 0), ('Item_id64', 'Item_description64', 0), ('Item_id65', 'Item_description65', 0), ('Item_id66', 'Item_description66', 0), ('Item_id67', 'Item_description67', 0), ('Item_id68', 'Item_description68', 0), ('Item_id69', 'Item_description69', 0), ('Item_id70', 'Item_description70', 0), ('Item_id71', 'Item_description71', 0), ('Item_id72', 'Item_description72', 0), ('Item_id73', 'Item_description73', 0), ('Item_id74', 'Item_description74', 0), ('Item_id75', 'Item_description75', 0), ('Item_id76', 'Item_description76', 0), ('Item_id77', 'Item_description77', 0), ('Item_id78', 'Item_description78', 0), ('Item_id79', 'Item_description79', 0), ('Item_id80', 'Item_description80', 0), ('Item_id81', 'Item_description81', 0), ('Item_id82', 'Item_description82', 0), ('Item_id83', 'Item_description83', 0), ('Item_id84', 'Item_description84', 0), ('Item_id85', 'Item_description85', 0), ('Item_id86', 'Item_description86', 0), ('Item_id87', 'Item_description87', 0), ('Item_id88', 'Item_description88', 0), ('Item_id89', 'Item_description89', 0), ('Item_id90', 'Item_description90', 0), ('Item_id91', 'Item_description91', 0), ('Item_id92', 'Item_description92', 0), ('Item_id93', 'Item_description93', 0), ('Item_id94', 'Item_description94', 0), ('Item_id95', 'Item_description95', 0), ('Item_id96', 'Item_description96', 0), ('Item_id97', 'Item_description97', 0), ('Item_id98', 
'Item_description98', 0), ('XXX', 'last item', 0);
The SQL code here inserts 100 rows into the table Items. In the first 99 rows, the ITEM_ID value is Item_id followed by an integer that starts at zero and increases by one per row (Item_id0 through Item_id98), and the DESCRIPTION value follows the same pattern with Item_description. The last row has 'XXX' in the ITEM_ID column and 'last item' in the DESCRIPTION column. All 100 records have the value zero in the INVENTORY_STATUS column.
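The naming pattern described above is regular enough to generate. The following is a minimal sketch in plain Java, assuming a hypothetical helper class ItemsDataSqlGenerator that is not part of the download; it builds an INSERT statement of the same shape as data.sql, with a configurable number of numbered rows followed by the 'XXX' row.

```java
// Hypothetical helper, not part of the original project: rebuilds an INSERT
// statement with the same row pattern as data.sql.
final class ItemsDataSqlGenerator {

    private ItemsDataSqlGenerator() {
    }

    // numberedRows is the count of Item_idN rows; the 'XXX' row is always appended last.
    static String buildInsert(int numberedRows) {
        StringBuilder sql = new StringBuilder(
                "INSERT INTO Items (ITEM_ID, DESCRIPTION, INVENTORY_STATUS) VALUES\n");
        for (int i = 0; i < numberedRows; i++) {
            sql.append("  ('Item_id").append(i)
               .append("', 'Item_description").append(i)
               .append("', 0),\n");
        }
        sql.append("  ('XXX', 'last item', 0);");
        return sql.toString();
    }
}
```

Calling ItemsDataSqlGenerator.buildInsert(99) yields 99 numbered rows plus the final row, which matches the 100 rows of the file shown above.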
The following XML file contains the configuration for the application.
application-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/integration
        http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/jdbc
        http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
        http://www.springframework.org/schema/jdbc
        http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd">

    <int:channel id="fromdb"/>

    <int:service-activator input-channel="fromdb" ref="jdbcMessageHandler" />

    <int-jdbc:inbound-channel-adapter channel="fromdb" data-source="dataSource"
        query="SELECT * FROM Items WHERE INVENTORY_STATUS = 0"
        update="UPDATE Items SET INVENTORY_STATUS = 1">
        <int:poller fixed-delay="4000" />
    </int-jdbc:inbound-channel-adapter>
</beans>
In this file, we first declare a channel with id fromdb. Then we configure the jdbcMessageHandler bean (an instance of the class JdbcMessageHandler) as the service activator on this channel, so that its service method is executed for each message that arrives on the channel. Finally, we define an int-jdbc:inbound-channel-adapter that connects the default dataSource to the channel we have declared. The SELECT query fetches all records that have the value 0 in the INVENTORY_STATUS column, and the UPDATE query sets this value to 1. Because the UPDATE statement has no WHERE clause, it applies to every row in the table. The poller runs the queries with a fixed delay of four seconds.
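To make the select-then-update cycle concrete, here is a minimal sketch in plain Java. It is not Spring Integration code: the class InMemoryItemsPoller and its methods are hypothetical stand-ins, and a map in memory plays the role of the Items table. It only illustrates why a row is delivered once and then stays out of later polls until its status is reset.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the Items table and the adapter's poll cycle.
final class InMemoryItemsPoller {

    // ITEM_ID -> INVENTORY_STATUS
    private final Map<String, Integer> statusById = new LinkedHashMap<>();

    // New rows start with status 0, as in data.sql.
    void insert(String itemId) {
        statusById.put(itemId, 0);
    }

    // One poll: mirrors "SELECT ... WHERE INVENTORY_STATUS = 0" followed by
    // "UPDATE Items SET INVENTORY_STATUS = 1".
    List<String> poll() {
        List<String> selected = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : statusById.entrySet()) {
            if (entry.getValue() == 0) {
                selected.add(entry.getKey());
            }
        }
        if (!selected.isEmpty()) {
            // No WHERE clause in the UPDATE, so every row is set to 1.
            statusById.replaceAll((id, status) -> 1);
        }
        return selected;
    }

    // Mirrors a manual "reset to zero" query run against the table.
    void reset() {
        statusById.replaceAll((id, status) -> 0);
    }
}
```

With two inserted rows, the first call to poll() returns both IDs, the second returns an empty list, and after reset() the next poll() returns both again. In the real application the poller triggers this cycle for you, waiting four seconds after each poll completes before starting the next one.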
Next, we take a look at the service activator class.
JdbcMessageHandler.java
package org.javacodegeeks.springintegration.polling.dbpoller;

import java.util.List;
import java.util.Map;

import org.springframework.stereotype.Component;

@Component
public class JdbcMessageHandler {

    // Invoked by the service activator; each Map holds one row keyed by column name.
    public void handleJdbcMessage(List<Map<String, Object>> message) {
        for (Map<String, Object> resultMap : message) {
            System.out.println("Row");
            for (String column : resultMap.keySet()) {
                System.out.println("column: " + column + " value: " + resultMap.get(column));
            }
        }
    }
}
The handleJdbcMessage method takes a List of Maps that represents the query result set. For each row, it first prints the text "Row" and then, for every column, the string "column: " followed by the column name and the value in that column.
Given below is the DbpollerApplication class, which is the main class of the application.
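The introduction mentioned that rows can be mapped to a custom POJO. As a rough sketch of that idea in plain Java, the hypothetical classes Item and ItemRows below convert the list of maps into typed objects. Neither class is part of the download, and the upper-case column keys are an assumption based on the column names in schema.sql. The adapter also has a row-mapper attribute that accepts a Spring RowMapper, which moves this conversion into the adapter itself; that variant is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical POJO mirroring the three columns of the Items table.
final class Item {
    private final String itemId;
    private final String description;
    private final int inventoryStatus;

    Item(String itemId, String description, int inventoryStatus) {
        this.itemId = itemId;
        this.description = description;
        this.inventoryStatus = inventoryStatus;
    }

    String getItemId() {
        return itemId;
    }

    String getDescription() {
        return description;
    }

    int getInventoryStatus() {
        return inventoryStatus;
    }
}

// Hypothetical helper that turns the list-of-maps payload into Item objects.
final class ItemRows {

    private ItemRows() {
    }

    static List<Item> toItems(List<Map<String, Object>> rows) {
        List<Item> items = new ArrayList<>(rows.size());
        for (Map<String, Object> row : rows) {
            // Assumed keys: the column names exactly as written in schema.sql.
            items.add(new Item(
                    (String) row.get("ITEM_ID"),
                    (String) row.get("DESCRIPTION"),
                    ((Number) row.get("INVENTORY_STATUS")).intValue()));
        }
        return items;
    }
}
```

A handler method could call ItemRows.toItems(message) at its start and then work with getItemId() and getInventoryStatus() instead of string keys.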
DbpollerApplication.java
package org.javacodegeeks.springintegration.polling.dbpoller;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ImportResource;

@SpringBootApplication
@ImportResource("application-context.xml")
public class DbpollerApplication {

    public static void main(String[] args) {
        SpringApplication.run(DbpollerApplication.class, args);
    }
}
Using the annotation @ImportResource, we load the configuration in the file application-context.xml, and in the main method we just call SpringApplication.run to start the application.
5. How To Run
At the command prompt, just run:
mvn spring-boot:run
In the terminal window, you will see all the rows printed by the service activator class in the format explained previously. In the H2 web console, accessible at http://localhost:8080/h2-console/, you will see the result of the update query: all the values in the INVENTORY_STATUS column are changed to 1. You can run a query to reset these values to zero, and when the poller runs next, it fetches all the rows and updates them to 1 again. The screenshots for these test steps are given below.
The first screenshot shows the console output from the service activator showing the select query result set.
The second screenshot shows the H2 console output with values in INVENTORY_STATUS column changed to 1 by the JDBC inbound channel adapter.
The third screenshot shows the update query we run in the H2 console to reset the values in INVENTORY_STATUS column to zero.
The fourth screenshot shows that the values in INVENTORY_STATUS column are zero.
We re-check the data in the table after four seconds. The last screenshot shows that the values in the INVENTORY_STATUS column are updated to 1 by the JDBC inbound channel adapter.
6. Summary
In this article, we have discussed the int-jdbc:inbound-channel-adapter and its polling feature. We have seen the implementation details of a simple Spring Boot application that periodically queries and updates an H2 database. The download also includes a MySQL version that you can run and test.
7. Useful Links
- http://jussi.hallila.com/2016/09/05/programmatically-managing-spring-integration-poller.html
- http://byteposts.blogspot.in/2015/01/spring-integration-bulk-processing.html
- https://www.polarsparc.com/xhtml/SpringIntegration-03.html
8. Download the Source Code
You can download the full source code of this example here: dbpoller