Apache Camel Exchange Example
A Camel Exchange is an abstraction for an exchange of messages: a request message and its corresponding reply or exception message. It consists of the following components:
- Exchange ID – A unique ID that identifies the exchange.
- MEP – The message exchange pattern, which denotes whether you're using InOnly or InOut messaging.
- InOnly message – Contains only a one-way message (also known as an Event message). For example, JMS messaging.
- InOut message – It represents a request-response message. In this scenario, you not only have a request message, you also have an out message containing the reply of the request sent. For example, HTTP-based transports.
- Exception – If an error occurs at any time during routing, an Exception will be set in the exception field.
- Properties – Similar to message headers, but they last for the duration of the entire exchange.
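The components listed above can be pictured as a small data structure. The following is only a toy model using plain Java: the names SketchExchange, SketchMessage and SketchPattern are hypothetical and are not Camel's own Exchange or Message types. It is meant to show how headers belong to a single message while properties stay with the exchange.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Toy model only: these classes mirror the concepts in the list above.
// They are NOT org.apache.camel.Exchange or org.apache.camel.Message.
enum SketchPattern { IN_ONLY, IN_OUT }

class SketchMessage {
    final Map<String, Object> headers = new HashMap<String, Object>();
    Object body;

    SketchMessage(Object body) {
        this.body = body;
    }
}

class SketchExchange {
    final String id = UUID.randomUUID().toString();  // exchange ID
    final SketchPattern pattern;                     // MEP
    final Map<String, Object> properties = new HashMap<String, Object>();
    SketchMessage in;                                // request message
    SketchMessage out;                               // reply message (InOut only)
    Exception exception;                             // set when routing fails

    SketchExchange(SketchPattern pattern, Object body) {
        this.pattern = pattern;
        this.in = new SketchMessage(body);
    }

    // Moves to the next routing step: a reply, if present, becomes the new input.
    // Headers travel with a message, so they disappear when the message is replaced;
    // properties belong to the exchange and survive.
    void nextStep() {
        if (out != null) {
            in = out;
            out = null;
        }
    }
}

class ExchangeSketchDemo {
    static String run() {
        SketchExchange exchange = new SketchExchange(SketchPattern.IN_OUT, "request");
        exchange.in.headers.put("traceId", "h-1");
        exchange.properties.put("startedBy", "demo");

        exchange.out = new SketchMessage("reply");   // a step produces a fresh reply
        exchange.nextStep();

        return exchange.in.body
                + " header=" + exchange.in.headers.get("traceId")
                + " property=" + exchange.properties.get("startedBy");
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the header set on the request is no longer visible after the reply replaces it, while the property is still there.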
We will see an example of each component. Let's now move on to the setup.
This example uses the following frameworks:
- Maven 3.2.3
- Apache Camel 2.15.1
- Spring 4.1.5.RELEASE
- Eclipse as the IDE, version Luna 4.4.1.
Before we start with the examples, let's first add our dependencies to pom.xml.
1. Dependencies
Add the following dependencies to pom.xml:
- camel-core – Basic module of Apache Camel.
- camel-stream – We will use this to send output to the console.
- camel-jms and activemq-camel – ActiveMQ JMS components. We will use these to show a one-way request example.
- spring-context and camel-spring – Needed since we will be configuring our Camel context in Spring.
- camel-http – Camel HTTP component. We will use this to show a request-response example.
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.javacodegeeks.camel</groupId>
    <artifactId>camelHelloWorld</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>2.15.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-stream</artifactId>
            <version>2.15.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-jms</artifactId>
            <version>2.15.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-camel</artifactId>
            <version>5.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.1.5.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-spring</artifactId>
            <version>2.15.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-http</artifactId>
            <version>2.15.1</version>
        </dependency>
    </dependencies>
</project>
2. InOnly Message Example
In this section, we will see an example of an InOnly message.
We want to listen for messages on a queue, process them using a POJO, and print the returned message to the console. The message is an InOnly message; we print the Exchange details in MyBean1.doSomething.
activemqInOnlyApplicationContext.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
    ">
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="vm://localhost?broker.persistent=false" />
    </bean>
    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="activemq:queue:inboundQueue" />
            <to uri="bean:myBean1"/>
            <to uri="stream:out" />
        </route>
    </camelContext>
    <bean id="myBean1" class="com.javacodegeeks.camel.MyBean1"/>
</beans>
Here, we print exchange details such as the received message and the exchange pattern.
MyBean1:
package com.javacodegeeks.camel;

import org.apache.camel.Exchange;

public class MyBean1 {
    public String doSomething(Exchange exchange) {
        System.out.println("Bean1 Received Exchange: "
                + exchange.getIn().getBody(String.class)
                + ", MIP: " + exchange.getPattern());
        return exchange.getIn().getBody(String.class);
    }
}
CamelActivemqInExampleUsingSpring is the main class that initiates the routing.
CamelActivemqInExampleUsingSpring:
package com.javacodegeeks.camel;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.spring.SpringCamelContext;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class CamelActivemqInExampleUsingSpring {
    public static final void main(String[] args) throws Exception {
        ApplicationContext appContext = new ClassPathXmlApplicationContext(
                "activemqInOnlyApplicationContext.xml");
        CamelContext camelContext = SpringCamelContext.springCamelContext(
                appContext, false);
        try {
            ProducerTemplate template = camelContext.createProducerTemplate();
            camelContext.start();
            // One-way send: nothing is returned to the caller
            template.sendBody("activemq:queue:inboundQueue", "InOnly example");
            // Give the route time to consume and print the message
            Thread.sleep(1000);
        } finally {
            camelContext.stop();
        }
    }
}
Output:
Bean1 Received Exchange: InOnly example, MIP: InOnly
InOnly example
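The main class above sleeps for a second after sending because a one-way send hands the message to the queue and returns without waiting for the consumer. A minimal stdlib-only sketch of that fire-and-forget behaviour, assuming an in-memory queue in place of ActiveMQ (the class OneWaySketch and its method are hypothetical, not part of Camel or JMS):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: an in-memory queue stands in for the JMS queue.
class OneWaySketch {

    // Sends one message, consumes it on another thread, and returns what the consumer saw.
    static String sendAndConsume(String body) {
        final BlockingQueue<String> inboundQueue = new LinkedBlockingQueue<String>();
        final StringBuilder seen = new StringBuilder();

        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    seen.append(inboundQueue.take()); // blocks until a message arrives
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();

        try {
            inboundQueue.put(body); // returns once the message is queued; no reply comes back
            consumer.join();        // only so this demo can inspect the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(sendAndConsume("InOnly example"));
    }
}
```

The sender never receives a value from the consumer; the join call plays the role that Thread.sleep(1000) plays in the Camel example.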
3. InOut Message Example
Let's see an example of an InOut message involving an HTTP request and response. We want to query ‘camel’ on the site http://www.javacodegeeks.com. The returned response will be HTML text, of which we will print the first 100 characters. We will set the header CamelHttpQuery to s=camel.
httpInOutApplicationContext.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
    <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring" trace="true">
        <route>
            <from uri="direct:start" />
            <setHeader headerName="CamelHttpQuery">
                <constant>s=camel</constant>
            </setHeader>
            <to uri="http://www.javacodegeeks.com" />
            <process ref="httpProcessor"/>
        </route>
    </camelContext>
    <bean id="httpProcessor" class="com.javacodegeeks.camel.HttpProcessor"/>
</beans>
We will print the exchange pattern in HttpProcessor. It will be of InOut type.
HttpProcessor:
package com.javacodegeeks.camel;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

public class HttpProcessor implements Processor {
    public void process(Exchange exchange) throws Exception {
        System.out.println("MIP of Http Endpoint is " + exchange.getPattern());
    }
}
CamelHttpInOutExampleUsingSpring:
package com.javacodegeeks.camel;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.apache.camel.CamelContext;
import org.apache.camel.ExchangePattern;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.spring.SpringCamelContext;
import org.apache.camel.util.CollectionStringBuffer;
import org.apache.camel.util.IOHelper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class CamelHttpInOutExampleUsingSpring {
    public static final void main(String[] args) throws Exception {
        ApplicationContext appContext = new ClassPathXmlApplicationContext(
                "httpInOutApplicationContext.xml");
        CamelContext camelContext = SpringCamelContext.springCamelContext(
                appContext, false);
        try {
            ProducerTemplate template = camelContext.createProducerTemplate();
            camelContext.start();
            // Request-response send: the reply body is returned to the caller
            InputStream is = (InputStream) template.sendBody("direct:start",
                    ExchangePattern.InOut, "Camel examples");
            String firstLine = toString(is, 1);
            // Guard against a first line shorter than 100 characters
            System.out.println("Body: "
                    + firstLine.substring(0, Math.min(100, firstLine.length())));
        } finally {
            camelContext.stop();
        }
    }

    // Reads at most noOfLines lines from the stream and returns them as one string
    private static String toString(InputStream input, int noOfLines)
            throws IOException {
        BufferedReader reader = IOHelper.buffered(new InputStreamReader(input));
        CollectionStringBuffer builder = new CollectionStringBuffer();
        int i = 1;
        while (true) {
            String line = reader.readLine();
            if (line == null) {
                return builder.toString();
            }
            builder.append(line);
            if (i == noOfLines) {
                break;
            }
            i++;
        }
        return builder.toString();
    }
}
Output:
15:03| INFO | DefaultCamelContext.java 3164 | Route: route1 started and consuming from: Endpoint[direct://start]
15:03| INFO | DefaultCamelContext.java 2453 | Total 1 routes, of which 1 is started.
15:03| INFO | DefaultCamelContext.java 2454 | Apache Camel 2.15.1 (CamelContext: camel) started in 0.247 seconds
15:03| INFO | DefaultCamelContext.java 2418 | Apache Camel 2.15.1 (CamelContext: camel) is starting
15:03| INFO | DefaultCamelContext.java 2453 | Total 1 routes, of which 1 is started.
15:03| INFO | DefaultCamelContext.java 2454 | Apache Camel 2.15.1 (CamelContext: camel) started in 0.000 seconds
15:03| INFO | MarkerIgnoringBase.java 95 | ID-INMAA1-L1005-65150-1430991219547-0-2 >>> (route1) from(direct://start) --> setHeader[CamelHttpQuery, s=camel] <<>> (route1) setHeader[CamelHttpQuery, s=camel] --> http://www.javacodegeeks.com <<>> (route1) http://www.javacodegeeks.com --> ref:httpProcessor <<< Pattern:InOut, Headers:{breadcrumbId=ID-INMAA1-L1005-65150-1430991219547-0-1, Cache-Control=max-age=3600, CamelHttpQuery=s=camel, CamelHttpResponseCode=200, Content-Type=text/html; charset=UTF-8, Date=Thu, 07 May 2015 09:33:40 GMT, Expires=Thu, 07 May 2015 10:33:40 GMT, Server=Apache, Set-Cookie=[_icl_current_language=en; expires=Fri, 08-May-2015 09:33:40 GMT; path=/; domain=www.javacodegeeks.com, wpjb_transient_id=1430991220-3489; expires=Fri, 08-May-2015 09:33:40 GMT; path=/; domain=www.javacodegeeks.com], Transfer-Encoding=chunked, Vary=User-Agent,Accept-Encoding, X-Pingback=http://www.javacodegeeks.com/xmlrpc.php, X-Powered-By=W3 Total Cache/0.9.4.1}, BodyType:org.apache.camel.converter.stream.CachedOutputStream.WrappedInputStream, Body:[Body is instance of java.io.InputStream]
MIP of Http Endpoint is InOut
Body: <link rel="stylesheet" type="
15:03| INFO | DefaultCamelContext.java 2660 | Apache Camel 2.15.1 (CamelContext: camel) is shutting down
15:03| INFO | DefaultShutdownStrategy.java 184 | Starting to graceful shutdown 1 routes (timeout 300 seconds)
15:03| INFO | DefaultShutdownStrategy.java 647 | Route: route1 shutdown complete, was consuming from: Endpoint[direct://start]
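The trace shows CamelHttpQuery=s=camel among the headers: the HTTP component uses the value of that header as the query string of the request. As a rough illustration of the resulting URI, here is a stdlib-only sketch. The class HttpQuerySketch and its method are hypothetical and do not reproduce Camel's internal code.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Illustrative only: shows how an endpoint URI and a query header value combine.
class HttpQuerySketch {

    // Returns the endpoint URI with its query replaced by the header value.
    static String withQuery(String endpointUri, String queryHeader) {
        try {
            URI base = new URI(endpointUri);
            URI full = new URI(base.getScheme(), base.getAuthority(),
                    base.getPath(), queryHeader, null);
            return full.toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(withQuery("http://www.javacodegeeks.com", "s=camel"));
    }
}
```

With the values from the route above, this yields http://www.javacodegeeks.com?s=camel.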
4. Change Route to InOnly Message Example
When processing a message in a request-response (InOut) route, you sometimes need to send the message to an endpoint without receiving a response. When the message reaches the stage in the route that sends to the InOnly endpoint direct:oneWay, the MEP associated with the exchange is temporarily changed to InOnly. Once the InOnly route completes, the MEP is restored to InOut for the rest of the parent route.
CamelChangeRouteInOnlyExample:
package com.javacodegeeks.camel;

import org.apache.camel.CamelContext;
import org.apache.camel.ExchangePattern;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.util.jndi.JndiContext;

public class CamelChangeRouteInOnlyExample {
    public static void main(String[] args) throws Exception {
        JndiContext jndiContext = new JndiContext();
        jndiContext.bind("myBean1", new MyBean1());
        jndiContext.bind("myBean2", new MyBean2());
        CamelContext camelContext = new DefaultCamelContext(jndiContext);
        try {
            camelContext.addRoutes(new RouteBuilder() {
                @Override
                public void configure() throws Exception {
                    from("direct:start")
                        .to("bean:myBean1")
                        .inOnly("direct:oneWay")
                        .to("bean:myBean1")
                        .transform().constant("Done");

                    from("direct:oneWay")
                        .to("bean:myBean2");
                }
            });
            camelContext.start();
            ProducerTemplate template = camelContext.createProducerTemplate();
            Object response = template.sendBody("direct:start",
                    ExchangePattern.InOut, "X");
            System.out.println("Response: " + response);
        } finally {
            camelContext.stop();
        }
    }
}
MyBean1:
package com.javacodegeeks.camel;

import org.apache.camel.Exchange;

public class MyBean1 {
    public String doSomething(Exchange exchange) {
        System.out.println("Bean1 Received Exchange: "
                + exchange.getIn().getBody(String.class)
                + ", MIP: " + exchange.getPattern());
        return exchange.getIn().getBody(String.class);
    }
}
MyBean2:
package com.javacodegeeks.camel;

import org.apache.camel.Exchange;

public class MyBean2 {
    public void doSomething(Exchange exchange) {
        System.out.println("Bean Received Exchange: "
                + exchange.getIn().getBody(String.class)
                + ", MIP: " + exchange.getPattern());
    }
}
Output:
Bean1 Received Exchange: X, MIP: InOut
Bean Received Exchange: X, MIP: InOnly
Bean1 Received Exchange: X, MIP: InOut
Response: Done
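The output shows the pattern going from InOut to InOnly and back again. One way to picture this is as a save, override and restore around a single step. The following is a toy model in plain Java under that assumption. The names MepSwitchSketch, ToyExchange and sendWithPattern are hypothetical, and this is not Camel's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a temporary pattern switch; not Camel code.
class MepSwitchSketch {
    enum Mep { InOnly, InOut }

    // Hypothetical stand-in for an exchange: it only tracks the current pattern.
    static class ToyExchange {
        Mep pattern;

        ToyExchange(Mep pattern) {
            this.pattern = pattern;
        }
    }

    interface Step {
        void apply(ToyExchange exchange);
    }

    // Runs one step under a different pattern, then restores the original one.
    static void sendWithPattern(ToyExchange exchange, Mep temporary, Step step) {
        Mep original = exchange.pattern;
        exchange.pattern = temporary;
        try {
            step.apply(exchange);
        } finally {
            exchange.pattern = original;
        }
    }

    static List<String> run() {
        final List<String> trace = new ArrayList<String>();
        Step record = new Step() {
            public void apply(ToyExchange exchange) {
                trace.add(exchange.pattern.name());
            }
        };
        ToyExchange exchange = new ToyExchange(Mep.InOut);
        record.apply(exchange);                        // first bean call
        sendWithPattern(exchange, Mep.InOnly, record); // plays the role of inOnly("direct:oneWay")
        record.apply(exchange);                        // second bean call
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The recorded trace is InOut, InOnly, InOut, which matches the order of patterns printed by the beans in the output above.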
5. Change Route to InOut Message Example
This example builds on the ActiveMQ example, which uses the InOnly message exchange pattern (MEP). The consumer that fed a message into the route expects no response. We temporarily alter the MEP to InOut in order to request a response from an endpoint used in a one-way route. We use the element below to switch the route to the InOut MEP.
<inOut uri="direct:inOutWay" />
changeRouteToInOutApplicationContext.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
    ">
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="vm://localhost?broker.persistent=false" />
    </bean>
    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="activemq:queue:inboundQueue" />
            <to uri="bean:myBean1" />
            <inOut uri="direct:inOutWay" />
            <to uri="bean:myBean1" />
            <to uri="stream:out" />
        </route>
        <route>
            <from uri="direct:inOutWay" />
            <to uri="bean:myBean1" />
        </route>
    </camelContext>
    <bean id="myBean1" class="com.javacodegeeks.camel.MyBean1" />
</beans>
CamelChangeRouteInOutExampleUsingSpring:
package com.javacodegeeks.camel;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.spring.SpringCamelContext;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class CamelChangeRouteInOutExampleUsingSpring {
    public static final void main(String[] args) throws Exception {
        ApplicationContext appContext = new ClassPathXmlApplicationContext(
                "changeRouteToInOutApplicationContext.xml");
        CamelContext camelContext = SpringCamelContext.springCamelContext(
                appContext, false);
        try {
            ProducerTemplate template = camelContext.createProducerTemplate();
            camelContext.start();
            template.sendBody("activemq:queue:inboundQueue", "InOut example");
            Thread.sleep(1000);
        } finally {
            camelContext.stop();
        }
    }
}
Output:
Bean1 Received Exchange: InOut example, MIP: InOnly
Bean1 Received Exchange: InOut example, MIP: InOut
Bean1 Received Exchange: InOut example, MIP: InOnly
InOut example
6. Custom Processor
In this example, we create a custom processor that retrieves the inbound message from the exchange, does some processing based on the message, and then sets an output on the same exchange.
The example is a course scheduler that assigns a trainer to a course and then schedules it. If the trainer is not found, it throws TrainerNotAvailableException.
The thrown exception is handled using the onException API.
CamelProcessorExample:
package com.javacodegeeks.camel;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.util.jndi.JndiContext;

public class CamelProcessorExample {
    public static void main(String[] args) throws Exception {
        JndiContext jndiContext = new JndiContext();
        jndiContext.bind("scheduleCourse", new CourseScheduler());
        CamelContext camelContext = new DefaultCamelContext(jndiContext);
        try {
            camelContext.addRoutes(new RouteBuilder() {
                @Override
                public void configure() throws Exception {
                    from("direct:course_name")
                        .onException(TrainerNotAvailableException.class)
                            .handled(true)
                            .transform(constant("No trainer available exception"))
                            .to("stream:out")
                        .end()
                        .process(new TrainerAvailabilityChecker())
                        .to("bean:scheduleCourse?method=schedule")
                        .to("stream:out");
                }
            });
            camelContext.start();
            ProducerTemplate template = camelContext.createProducerTemplate();
            template.sendBody("direct:course_name", "Scala");
            try {
                template.sendBody("direct:course_name", "Spring Integration");
            } catch (Throwable e) {
                System.out.println("Exception " + e.getMessage());
            }
        } finally {
            camelContext.stop();
        }
    }
}
The custom processor below checks the trainer availability.
TrainerAvailabilityChecker:
package com.javacodegeeks.camel;

import java.util.HashMap;
import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

public class TrainerAvailabilityChecker implements Processor {
    public void process(Exchange exchange) throws Exception {
        String course = exchange.getIn().getBody(String.class);
        System.out.println("Check availability of trainer for " + course);
        String trainer = TRAINERS.get(course);
        if (trainer == null) {
            throw new TrainerNotAvailableException(exchange, "No trainer for " + course);
        }
        System.out.print("Found Trainer: ");
        exchange.getOut().setBody(new CourseSchedule(trainer, course), CourseSchedule.class);
    }

    // Typed map (course name to trainer name) so that get() returns a String
    private static final Map<String, String> TRAINERS = new HashMap<String, String>();

    static {
        TRAINERS.put("Scala", "Joe");
        TRAINERS.put("Java Core", "Sam");
        TRAINERS.put("Mockito", "Krish");
    }
}
TrainerNotAvailableException:
package com.javacodegeeks.camel;

import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;

public class TrainerNotAvailableException extends CamelExchangeException {
    private static final long serialVersionUID = -4536714110976166452L;

    public TrainerNotAvailableException(Exchange exchange, String message) {
        super(message, exchange);
    }

    public TrainerNotAvailableException(String message, Exchange exchange, Throwable cause) {
        super(message, exchange, cause);
    }
}
Once we know the trainer is available, we schedule the training.
CourseScheduler:
package com.javacodegeeks.camel;

import java.util.Date;

public class CourseScheduler {
    public CourseSchedule schedule(CourseSchedule courseSchedule) {
        String course = courseSchedule.getCourse();
        String trainer = courseSchedule.getTrainer();
        System.out.println("Schedule " + course + ", trainer is " + trainer);
        courseSchedule.setTrainingDate(new Date());
        return courseSchedule;
    }
}
CourseSchedule:
package com.javacodegeeks.camel;

import java.util.Date;

public class CourseSchedule {
    private String trainer;
    private String course;
    private Date trainingDate;

    public CourseSchedule(String trainer, String course) {
        super();
        this.trainer = trainer;
        this.course = course;
    }

    public Date getTrainingDate() {
        return trainingDate;
    }

    public void setTrainingDate(Date trainingDate) {
        this.trainingDate = trainingDate;
    }

    public String getTrainer() {
        return trainer;
    }

    public String getCourse() {
        return course;
    }

    public String toString() {
        return "Course " + course + " will start on " + trainingDate
                + ", trainer is " + trainer;
    }
}
Output:
Check availability of trainer for Scala
Found Trainer: Schedule Scala, trainer is Joe
Course Scala will start on Fri May 08 14:12:18 IST 2015, trainer is Joe
Check availability of trainer for Spring Integration
No trainer available exception
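Note that the output never shows a line starting with "Exception": because the failure is marked as handled, the caller receives the fallback text instead of the exception. The idea can be sketched in plain Java as follows. This is a simplified model with hypothetical names (HandledExceptionSketch, TrainerMissing, routeWithHandler), not Camel's onException machinery.

```java
// Simplified model of a "handled" exception; not Camel code.
class HandledExceptionSketch {

    // Hypothetical stand-in for TrainerNotAvailableException.
    static class TrainerMissing extends Exception {
        TrainerMissing(String message) {
            super(message);
        }
    }

    interface Step {
        String apply(String body) throws Exception;
    }

    // Runs the step. If it fails with the given exception type, the failure is
    // treated as handled and the fallback body is returned instead of rethrowing.
    static String routeWithHandler(String body, Step step,
            Class<? extends Exception> handledType, String fallback) {
        try {
            return step.apply(body);
        } catch (Exception e) {
            if (handledType.isInstance(e)) {
                return fallback;
            }
            throw new IllegalStateException(e); // unhandled: propagate to the caller
        }
    }

    static String schedule(String course) {
        Step checkTrainer = new Step() {
            public String apply(String body) throws Exception {
                if (!"Scala".equals(body)) {
                    throw new TrainerMissing("No trainer for " + body);
                }
                return "Schedule " + body + ", trainer is Joe";
            }
        };
        return routeWithHandler(course, checkTrainer, TrainerMissing.class,
                "No trainer available exception");
    }

    public static void main(String[] args) {
        System.out.println(schedule("Scala"));
        System.out.println(schedule("Spring Integration"));
    }
}
```

Calling schedule with "Spring Integration" returns the fallback text rather than throwing, which mirrors what handled(true) combined with transform does in the route.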
7. Download the Eclipse Project
This was an example about Camel Exchange.
You can download the full source code of this example here: camelExchangeExample.zip
Hi,
I downloaded the source code and implemented the InOut message example using Apache Camel, but it is not working; it gives a connection reset error. Please guide me on how to fix this issue.
Regards,
Kanna