Apache Camel Aggregator Example
1. Introduction
In this article, we will look at an example of the Aggregator pattern using Apache Camel. We will create an order with two different types of items, process each item individually to get its price, and then calculate the total price of all items in the order by using an aggregation strategy.
2. What is an Aggregator?
The Aggregator from the Enterprise Integration Patterns (EIP) allows you to combine a number of messages into a single message.
A correlation Expression is used to determine the messages which should be aggregated together. If you want to aggregate all messages into a single message, just use a constant expression. An AggregationStrategy is used to combine all the message exchanges for a single correlation key into a single message exchange.
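The roles of the correlation expression and the aggregation strategy can be sketched in plain Java. This is only an analogy and does not use Camel's API: the class name CorrelationSketch, the aggregate helper, and the sample messages are all hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java analogy of the Aggregator idea (not Camel's API).
class CorrelationSketch {

    // Groups messages by correlation key and combines each group into one message.
    static <M, K> Map<K, M> aggregate(List<M> messages,
                                      Function<M, K> correlationKey,
                                      BinaryOperator<M> strategy) {
        Map<K, M> aggregated = new LinkedHashMap<>();
        for (M message : messages) {
            // The first message for a key is stored as-is; later messages are
            // combined with the value aggregated so far.
            aggregated.merge(correlationKey.apply(message), message, strategy);
        }
        return aggregated;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("A:1", "B:2", "A:3");

        // Correlate on the first character of each message.
        Map<String, String> byKey = aggregate(
                messages, m -> m.substring(0, 1), (oldMsg, newMsg) -> oldMsg + "," + newMsg);
        System.out.println(byKey); // {A=A:1,A:3, B=B:2}

        // A constant key puts every message into the same group.
        Map<String, String> all = aggregate(
                messages, m -> "all", (oldMsg, newMsg) -> oldMsg + "," + newMsg);
        System.out.println(all); // {all=A:1,B:2,A:3}
    }
}
```

The second call shows the constant-expression case: with a single key, every message ends up in one aggregated result.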
3. Technology Stack
In this example, we will be using the following technology stack:
- Maven (POM model version 4.0.0) – Build and dependency management tool.
- Apache Camel 2.16.0 – Open-source integration framework based on known Enterprise Integration Patterns.
- Spring 4.1.6.RELEASE – Comprehensive programming and configuration model for modern Java-based enterprise applications
- Spring Tool Suite (STS) – An Eclipse-based development environment that is customized for developing Spring applications.
4. Apache Camel Aggregator Example
4.1 Dependencies
To build our example, we need to add the required jar files to the classpath. This can be done either by adding the jar files directly or by using Maven.
Since we are using Maven for our example, we declare the following dependencies in pom.xml:
- camel-core
- camel-cxf
- camel-spring
- spring-core
Copy the code below into the pom.xml file, under the properties and dependencies tags.
pom.xml
<properties>
    <camelspring.version>2.16.0</camelspring.version>
    <spring.version>4.1.6.RELEASE</spring.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-core</artifactId>
        <version>${camelspring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-cxf</artifactId>
        <version>${camelspring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-spring</artifactId>
        <version>${camelspring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${spring.version}</version>
    </dependency>
</dependencies>
4.2 Model Classes
For our example, we need to create two Java POJO classes, Item and Order. Each class has its own member variables, accessor methods, and a toString method. Item also has a parameterized constructor.
Item.java
package com.model;

public class Item {

    public Item(String id, String name, String type) {
        this.id = id;
        this.name = name;
        this.type = type;
    }

    private String id;
    private String name;
    private String type;
    private double price;

    public String getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    public String getType() {
        return type;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("Item [id=");
        builder.append(id);
        builder.append(", name=");
        builder.append(name);
        builder.append(", type=");
        builder.append(type);
        builder.append(", price=");
        builder.append(price);
        builder.append("]");
        return builder.toString();
    }
}
Order.java
package com.model;

import java.util.List;

public class Order {

    private String id;
    // Typed list instead of a raw List, so callers get Item elements back
    private List<Item> items;
    private double totalPrice;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public List<Item> getItems() {
        return items;
    }

    public void setItems(List<Item> items) {
        this.items = items;
    }

    public double getTotalPrice() {
        return totalPrice;
    }

    public void setTotalPrice(double totalPrice) {
        this.totalPrice = totalPrice;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("Order [id=");
        builder.append(id);
        builder.append(", items=");
        builder.append(items);
        builder.append(", totalPrice=");
        builder.append(totalPrice);
        builder.append("]");
        return builder.toString();
    }
}
4.3 Item Process Java Class
The Java class below provides two methods that are invoked during routing to set the price of each item type: one for books and one for phones.
ItemSvc.java
package com;

import com.model.Item;

public class ItemSvc {

    public Item processBook(Item item) throws InterruptedException {
        System.out.println("handle book Item:" + item);
        item.setPrice(30);
        System.out.println("book Item processed");
        return item;
    }

    public Item processPhone(Item item) throws InterruptedException {
        System.out.println("handle phone Item:" + item);
        item.setPrice(500);
        System.out.println("phone Item processed");
        return item;
    }
}
4.4 Route Java Class
The Java class below extends RouteBuilder, the base class you derive from to define routing rules with the Java DSL. Instances of RouteBuilder are then added to the CamelContext.
In our example, we use two routes. The first route accepts an Order instance as input, splits out its items, and sends each item to the second route for processing. The processed items are then combined back into a single Order by the aggregation strategy passed to the splitter.
The second route sets the price of each item based on its type.
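The flow of the two routes (split the order into items, then choose a handler by item type) can be sketched in plain Java. This is an illustration only and does not use Camel: the class RoutingSketch, its nested Item stand-in, and the method names are hypothetical, and the prices mirror those set in ItemSvc.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java analogy of the split + choice routing (not Camel's API).
class RoutingSketch {

    // Hypothetical, simplified stand-in for the article's Item model.
    static class Item {
        final String type;
        double price;

        Item(String type) {
            this.type = type;
        }
    }

    // Mirrors the choice() block: "Book" goes to the book handler,
    // every other type falls through to the phone handler (otherwise()).
    static Item processItem(Item item) {
        if ("Book".equals(item.type)) {
            item.price = 30;
        } else {
            item.price = 500;
        }
        return item;
    }

    // Mirrors split(): each item is handed to the second step on its own.
    static List<Item> splitAndProcess(List<Item> items) {
        List<Item> processed = new ArrayList<>();
        for (Item item : items) {
            processed.add(processItem(item));
        }
        return processed;
    }

    public static void main(String[] args) {
        List<Item> result = splitAndProcess(
                Arrays.asList(new Item("Book"), new Item("Phone")));
        for (Item item : result) {
            System.out.println(item.type + " -> " + item.price);
        }
    }
}
```

Note that, as in the route, any type other than "Book" is priced as a phone, because the otherwise() branch catches everything that does not match the first condition.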
OrderRouter.java
package com;

import org.apache.camel.builder.RouteBuilder;

public class OrderRouter extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        from("direct:processOrder")
            // split the order into its items and re-combine the results
            // with OrderItemStrategy
            .split(body().method("getItems"), new OrderItemStrategy())
            // each split message is sent to this route to be processed
            .to("direct:processItem")
            .end();

        from("direct:processItem")
            .choice()
                .when(body().method("getType").isEqualTo("Book"))
                    .to("bean:itemService?method=processBook")
                .otherwise()
                    .to("bean:itemService?method=processPhone");
    }
}
4.5 AggregationStrategy Java Class
To use an aggregation strategy in our example, we implement the AggregationStrategy interface.
On the first invocation of the aggregate method, the oldExchange parameter is null because nothing has been aggregated yet, so only the newExchange has a value. Usually, you just return the newExchange in this situation. However, you are still free to decide what to do: for example, you can alter the exchange or remove some headers.
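The "null on the first call" contract can be illustrated with a small plain-Java fold. This sketch does not use Camel's Exchange or AggregationStrategy types: FoldSketch and its methods are hypothetical names, and a Double running total stands in for the exchange body.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java analogy of how an aggregation strategy is invoked (not Camel's API).
class FoldSketch {

    // Combines the total aggregated so far with the next price.
    static Double aggregate(Double oldTotal, Double newPrice) {
        if (oldTotal == null) {
            // First invocation: nothing aggregated yet, so start from the new value.
            return newPrice;
        }
        return oldTotal + newPrice;
    }

    // Feeds the prices through aggregate() one by one, starting with null.
    static Double run(List<Double> prices) {
        Double aggregated = null;
        for (Double price : prices) {
            aggregated = aggregate(aggregated, price);
        }
        return aggregated;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(30.0, 500.0))); // 530.0
    }
}
```

The values 30.0 and 500.0 are the book and phone prices from this example, so the result matches the order total.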
OrderItemStrategy.java
package com;

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

import com.model.Item;
import com.model.Order;

public class OrderItemStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

        if (oldExchange == null) {
            // first invocation: wrap the first item in a new Order
            Item newItem = newExchange.getIn().getBody(Item.class);
            System.out.println("Aggregate first item: " + newItem);

            Order currentOrder = new Order();
            currentOrder.setId("ORD" + System.currentTimeMillis());
            List<Item> currentItems = new ArrayList<Item>();

            currentItems.add(newItem);
            currentOrder.setItems(currentItems);
            currentOrder.setTotalPrice(newItem.getPrice());

            newExchange.getIn().setBody(currentOrder);

            // the first time we aggregate we only have the new exchange,
            // so we just return it
            return newExchange;
        }

        Order order = oldExchange.getIn().getBody(Order.class);
        Item newItem = newExchange.getIn().getBody(Item.class);

        System.out.println("Aggregate old items: " + order);
        System.out.println("Aggregate new item: " + newItem);

        // add the new item to the order and update the running total
        order.getItems().add(newItem);

        double totalPrice = order.getTotalPrice() + newItem.getPrice();
        order.setTotalPrice(totalPrice);

        // return old as this is the one that has all the items gathered until now
        return oldExchange;
    }
}
4.6 Main Java Class
In this Java class, we first create an ApplicationContext from the camel-context.xml file and start the Camel context so that the routes defined in the route class are available. We then call createProducerTemplate on the Camel context to obtain a ProducerTemplate, which lets us send data to a route for processing. In our case, we call requestBody with three arguments: the endpoint URI of the route, the Order instance, and the expected response type.
Finally, we stop the Camel context.
OrderApp.java
package com;

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.model.Item;
import com.model.Order;

public class OrderApp {

    public static void main(String[] args) {
        try {
            // load the Spring configuration and look up the Camel context
            ApplicationContext applicationContext = new ClassPathXmlApplicationContext(
                    "camel-context.xml");

            CamelContext camelContext = applicationContext.getBean("orderCtx",
                    CamelContext.class);

            camelContext.start();

            ProducerTemplate producerTemplate = camelContext.createProducerTemplate();

            List<Item> items = new ArrayList<Item>();
            items.add(new Item("1", "Camel in Action book", "Book"));
            items.add(new Item("2", "Apple IPhone8", "Phone"));

            Order myOrder = new Order();
            myOrder.setItems(items);

            // send the order to the first route and wait for the aggregated reply
            Order respOrder = producerTemplate.requestBody(
                    "direct:processOrder", myOrder, Order.class);

            System.out.println("resp order:" + respOrder);

            camelContext.stop();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
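OrderApp loads a camel-context.xml file and looks up a CamelContext bean named orderCtx, and the route refers to a bean named itemService, but the file itself is not listed in this article. A minimal sketch of what it might contain, assuming Camel's Spring XML namespace (the bean id orderRouter is a hypothetical name chosen for illustration):

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://camel.apache.org/schema/spring
         http://camel.apache.org/schema/spring/camel-spring.xsd">

    <!-- Bean referenced by the "bean:itemService?method=..." endpoints -->
    <bean id="itemService" class="com.ItemSvc"/>

    <!-- The RouteBuilder holding both routes; the id is an assumption -->
    <bean id="orderRouter" class="com.OrderRouter"/>

    <!-- The id must match the name looked up in OrderApp -->
    <camelContext id="orderCtx" xmlns="http://camel.apache.org/schema/spring">
        <routeBuilder ref="orderRouter"/>
    </camelContext>

</beans>
```

The file would need to sit on the classpath root (for example under src/main/resources in a Maven project) for ClassPathXmlApplicationContext to find it.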
Console Output
handle book Item:Item [id=1, name=Camel in Action book, type=Book, price=0.0]
book Item processed
Aggregate first item: Item [id=1, name=Camel in Action book, type=Book, price=30.0]
handle phone Item:Item [id=2, name=Apple IPhone8, type=Phone, price=0.0]
phone Item processed
Aggregate old items: Order [id=ORD1493803849785, items=[Item [id=1, name=Camel in Action book, type=Book, price=30.0]], totalPrice=30.0]
Aggregate new item: Item [id=2, name=Apple IPhone8, type=Phone, price=500.0]
resp order:Order [id=ORD1493803849785, items=[Item [id=1, name=Camel in Action book, type=Book, price=30.0], Item [id=2, name=Apple IPhone8, type=Phone, price=500.0]], totalPrice=530.0]
5. Conclusion
In this Apache Camel Aggregator example, we learned about the AggregationStrategy, which combines all the message exchanges for a single correlation key into a single message exchange.
You are now ready to implement aggregation in Apache Camel applications with the help of an AggregationStrategy.
6. Download the Spring Tool Suite Project
This was an example of Aggregator using Apache Camel.
You can download the full source code of this example here: Aggregator.zip