
Apache Camel Exchange Example

A Camel Exchange is an abstraction for an exchange of messages: a request message and its corresponding reply or exception message. It consists of the following components:

  1. Exchange ID – A unique ID that identifies the exchange.
  2. MEP – The message exchange pattern, which denotes whether you are using InOnly or InOut messaging.
  3. InOnly message – A one-way message (also known as an event message). For example, JMS messaging is often one-way.
  4. InOut message – A request-response message. In this scenario there is a request message and also an out message containing the reply to that request. For example, HTTP-based transports.
  5. Exception – If an error occurs at any time during routing, the exception is set in the exchange's exception field.
  6. Properties – Similar to message headers, but they last for the duration of the entire exchange.
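
To make the list concrete, here is a minimal plain-Java sketch of how these parts fit together. It is a toy model and not Camel's own org.apache.camel.Exchange; the names ToyExchange, ToyMessage, Mep and promoteOutToIn are made up for illustration. It highlights one distinction from the list: headers belong to a single message, while properties stay with the exchange when the message is replaced.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical toy types for illustration only; Camel's real classes are richer.
enum Mep { IN_ONLY, IN_OUT }

class ToyMessage {
    final Map<String, Object> headers = new HashMap<>();
    Object body;

    ToyMessage(Object body) {
        this.body = body;
    }
}

class ToyExchange {
    final String id = UUID.randomUUID().toString();          // 1. exchange ID
    final Mep pattern;                                        // 2. MEP
    ToyMessage in;                                            // 3. request (or one-way) message
    ToyMessage out;                                           // 4. reply, only meaningful for IN_OUT
    Exception exception;                                      // 5. set when routing fails
    final Map<String, Object> properties = new HashMap<>();   // 6. live as long as the exchange

    ToyExchange(Mep pattern, Object body) {
        this.pattern = pattern;
        this.in = new ToyMessage(body);
    }

    // Models moving to the next step: the reply becomes the new input message.
    void promoteOutToIn() {
        if (out != null) {
            in = out;
            out = null;
        }
    }
}

class ToyExchangeDemo {
    public static void main(String[] args) {
        ToyExchange exchange = new ToyExchange(Mep.IN_OUT, "request");
        exchange.in.headers.put("traceId", "h-1");
        exchange.properties.put("startedBy", "demo");

        exchange.out = new ToyMessage("reply"); // a step produces a fresh reply message
        exchange.promoteOutToIn();

        System.out.println(exchange.in.body);                            // reply
        System.out.println(exchange.in.headers.containsKey("traceId"));  // false
        System.out.println(exchange.properties.get("startedBy"));        // demo
    }
}
```

In this sketch the header set on the original message is gone once the reply takes its place, whereas the property is still readable from the exchange.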

We will see an example of each of these. First, let's look at the setup.
This example uses the following tools and frameworks:

  1. Maven 3.2.3
  2. Apache Camel 2.15.1
  3. Spring 4.1.5.RELEASE
  4. Eclipse as the IDE, version Luna 4.4.1.

Before we start with the examples, let's first add our dependencies to pom.xml.

1. Dependencies

Add the following dependencies to pom.xml:

  1. camel-core – The basic module of Apache Camel.
  2. camel-stream – We will use this to send output to the console.
  3. camel-jms and activemq-camel – ActiveMQ JMS components. We will use these to show a one-way request example.
  4. spring-context and camel-spring – Needed because we will configure our Camel context in Spring.
  5. camel-http – The Camel HTTP component. We will use this to show a request-response example.

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.javacodegeeks.camel</groupId>
	<artifactId>camelHelloWorld</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<dependencies>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-core</artifactId>
			<version>2.15.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-stream</artifactId>
			<version>2.15.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-jms</artifactId>
			<version>2.15.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-camel</artifactId>
			<version>5.6.0</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>4.1.5.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-spring</artifactId>
			<version>2.15.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.camel</groupId>
			<artifactId>camel-http</artifactId>
			<version>2.15.1</version>
		</dependency>		
	</dependencies>
</project>

2. InOnly Message Example

In this section, we will see an example of an InOnly message.

We want to listen for messages on a queue, process them using a POJO, and print the returned message to the console. Because the message is an InOnly message, the sender expects no reply; we print the exchange details in MyBean1.doSomething.

activemqInOnlyApplicationContext.xml:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
       ">
	<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="vm://localhost?broker.persistent=false" />
	</bean>
	<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
		<property name="connectionFactory" ref="connectionFactory" />
	</bean>
	<camelContext xmlns="http://camel.apache.org/schema/spring">
		<route>
			<from uri="activemq:queue:inboundQueue" />
			<to uri="bean:myBean1"/>
			<to uri="stream:out" />
		</route>		
	</camelContext>
	<bean id="myBean1" class="com.javacodegeeks.camel.MyBean1"/>

</beans>

Here we print exchange details such as the received message and the exchange pattern (labelled "MIP" in the output).

MyBean1:

package com.javacodegeeks.camel;

import org.apache.camel.Exchange;

public class MyBean1 {
	public String doSomething(Exchange exchange) {
		System.out.println("Bean1 Received Exchange: " + exchange.getIn().getBody(String.class) + ", MIP: " + exchange.getPattern());
		return exchange.getIn().getBody(String.class);
	}
}

CamelActivemqInExampleUsingSpring is the main class to initiate the routing.

CamelActivemqInExampleUsingSpring:

package com.javacodegeeks.camel;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.spring.SpringCamelContext;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class CamelActivemqInExampleUsingSpring {
	public static final void main(String[] args) throws Exception {
		ApplicationContext appContext = new ClassPathXmlApplicationContext(
				"activemqInOnlyApplicationContext.xml");
		CamelContext camelContext = SpringCamelContext.springCamelContext(
				appContext, false);
		try {
			ProducerTemplate template = camelContext.createProducerTemplate();
			camelContext.start();
			template.sendBody("activemq:queue:inboundQueue", "InOnly example");
			Thread.sleep(1000);
		} finally {
			camelContext.stop();
		}
	}	
}

Output:

Bean1 Received Exchange: InOnly example, MIP: InOnly
InOnly example

3. InOut Message Example

Let's see an example of an InOut message involving an HTTP request and response. We want to query ‘camel’ on the site http://www.javacodegeeks.com. The response is HTML text, of which we will print the first 100 characters. To pass the query, we set the header CamelHttpQuery to s=camel.

httpInOutApplicationContext.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

	<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring"
		trace="true">
		<route>
			<from uri="direct:start" />
			<setHeader headerName="CamelHttpQuery">
				<constant>s=camel</constant>
			</setHeader>
			<to uri="http://www.javacodegeeks.com" />
			<process ref="httpProcessor"/>
		</route>
	</camelContext>

	<bean id="httpProcessor" class="com.javacodegeeks.camel.HttpProcessor"/>
</beans>

We print the exchange pattern in HttpProcessor. It will be InOut.

HttpProcessor:

package com.javacodegeeks.camel;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

public class HttpProcessor implements Processor {

	public void process(Exchange exchange) throws Exception {
		System.out.println("MIP of Http Endpoint is " + exchange.getPattern());
	}

}

CamelHttpInOutExampleUsingSpring:

package com.javacodegeeks.camel;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.apache.camel.CamelContext;
import org.apache.camel.ExchangePattern;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.spring.SpringCamelContext;
import org.apache.camel.util.CollectionStringBuffer;
import org.apache.camel.util.IOHelper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class CamelHttpInOutExampleUsingSpring {
	public static final void main(String[] args) throws Exception {
		ApplicationContext appContext = new ClassPathXmlApplicationContext(
				"httpInOutApplicationContext.xml");
		CamelContext camelContext = SpringCamelContext.springCamelContext(
				appContext, false);
		try {
			ProducerTemplate template = camelContext.createProducerTemplate();
			camelContext.start();
			InputStream is = (InputStream) template.sendBody("direct:start", ExchangePattern.InOut, "Camel examples");
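			String firstLine = toString(is, 1);
			// Guard against a first line shorter than 100 characters
			System.out.println("Body: " + firstLine.substring(0, Math.min(100, firstLine.length())));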
		} finally {
			camelContext.stop();
		}
	}
	
    private static String toString(InputStream input, int noOfLines) throws IOException {
        BufferedReader reader = IOHelper.buffered(new InputStreamReader(input));
        CollectionStringBuffer builder = new CollectionStringBuffer();
        int i = 1;
        while (true) {        	
            String line = reader.readLine();
            if (line == null) {
                return builder.toString();
            }
            builder.append(line);
            if (i == noOfLines) {
            	break;
            }
            i++;
        }
        return builder.toString();
    }
}

Output:

15:03| INFO | DefaultCamelContext.java 3164 | Route: route1 started and consuming from: Endpoint[direct://start]
15:03| INFO | DefaultCamelContext.java 2453 | Total 1 routes, of which 1 is started.
15:03| INFO | DefaultCamelContext.java 2454 | Apache Camel 2.15.1 (CamelContext: camel) started in 0.247 seconds
15:03| INFO | DefaultCamelContext.java 2418 | Apache Camel 2.15.1 (CamelContext: camel) is starting
15:03| INFO | DefaultCamelContext.java 2453 | Total 1 routes, of which 1 is started.
15:03| INFO | DefaultCamelContext.java 2454 | Apache Camel 2.15.1 (CamelContext: camel) started in 0.000 seconds
15:03| INFO | MarkerIgnoringBase.java 95 | ID-INMAA1-L1005-65150-1430991219547-0-2 >>> (route1) from(direct://start) --> setHeader[CamelHttpQuery, s=camel] <<>> (route1) setHeader[CamelHttpQuery, s=camel] --> http://www.javacodegeeks.com <<>> (route1) http://www.javacodegeeks.com --> ref:httpProcessor <<< Pattern:InOut, Headers:{breadcrumbId=ID-INMAA1-L1005-65150-1430991219547-0-1, Cache-Control=max-age=3600, CamelHttpQuery=s=camel, CamelHttpResponseCode=200, Content-Type=text/html; charset=UTF-8, Date=Thu, 07 May 2015 09:33:40 GMT, Expires=Thu, 07 May 2015 10:33:40 GMT, Server=Apache, Set-Cookie=[_icl_current_language=en; expires=Fri, 08-May-2015 09:33:40 GMT; path=/; domain=www.javacodegeeks.com, wpjb_transient_id=1430991220-3489; expires=Fri, 08-May-2015 09:33:40 GMT; path=/; domain=www.javacodegeeks.com], Transfer-Encoding=chunked, Vary=User-Agent,Accept-Encoding, X-Pingback=http://www.javacodegeeks.com/xmlrpc.php, X-Powered-By=W3 Total Cache/0.9.4.1}, BodyType:org.apache.camel.converter.stream.CachedOutputStream.WrappedInputStream, Body:[Body is instance of java.io.InputStream]
MIP of Http Endpoint is InOut
Body: <link rel="stylesheet" type="
15:03| INFO | DefaultCamelContext.java 2660 | Apache Camel 2.15.1 (CamelContext: camel) is shutting down
15:03| INFO | DefaultShutdownStrategy.java 184 | Starting to graceful shutdown 1 routes (timeout 300 seconds)
15:03| INFO | DefaultShutdownStrategy.java 647 | Route: route1 shutdown complete, was consuming from: Endpoint[direct://start]

4. Change Route to InOnly Message Example

When processing a message in a request-response (InOut) route, you sometimes need to send the message to an endpoint without waiting for a response. In the route below, when the message reaches the step that sends to the endpoint direct:oneWay using inOnly, the MEP associated with the exchange is temporarily changed to InOnly. Once that one-way call completes, the MEP is restored to InOut for the rest of the parent route.
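
The temporary switch can be pictured as save, set, call, restore. The following plain-Java sketch models that idea with made-up names (PatternSwitchSketch, sendWithPattern, Step); it is not Camel's implementation, only an illustration of why the steps before and after the one-way call still see InOut.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model; names are illustrative and not part of Camel.
class PatternSwitchSketch {
    enum Mep { IN_ONLY, IN_OUT }

    static class ToyExchange {
        Mep pattern;
        final String body;

        ToyExchange(Mep pattern, String body) {
            this.pattern = pattern;
            this.body = body;
        }
    }

    interface Step {
        void process(ToyExchange exchange);
    }

    // Runs one step under a temporary pattern and restores the original afterwards.
    static void sendWithPattern(ToyExchange exchange, Mep temporary, Step step) {
        Mep original = exchange.pattern;
        exchange.pattern = temporary;
        try {
            step.process(exchange);
        } finally {
            exchange.pattern = original; // restored even if the step throws
        }
    }

    // Mirrors the shape of the route: bean, one-way call, bean.
    static List<Mep> run() {
        final List<Mep> seen = new ArrayList<>();
        Step record = new Step() {
            @Override
            public void process(ToyExchange exchange) {
                seen.add(exchange.pattern);
            }
        };
        ToyExchange exchange = new ToyExchange(Mep.IN_OUT, "X");
        record.process(exchange);                        // first bean
        sendWithPattern(exchange, Mep.IN_ONLY, record);  // one-way sub-route
        record.process(exchange);                        // second bean
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [IN_OUT, IN_ONLY, IN_OUT]
    }
}
```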

CamelChangeRouteInOnlyExample:

package com.javacodegeeks.camel;

import org.apache.camel.CamelContext;
import org.apache.camel.ExchangePattern;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.util.jndi.JndiContext;

public class CamelChangeRouteInOnlyExample {
	public static void main(String[] args) throws Exception {
		JndiContext jndiContext = new JndiContext();
		jndiContext.bind("myBean1", new MyBean1());
		jndiContext.bind("myBean2", new MyBean2());
		CamelContext camelContext = new DefaultCamelContext(jndiContext);
		try {
			camelContext.addRoutes(new RouteBuilder() {
				@Override
				public void configure() throws Exception {
					from("direct:start")
					.to("bean:myBean1")
					.inOnly("direct:oneWay")
					.to("bean:myBean1")
					.transform().constant("Done");
					
					from("direct:oneWay")
					.to("bean:myBean2");
				}
			});
			camelContext.start();	
			ProducerTemplate template = camelContext.createProducerTemplate();
			Object response = template.sendBody("direct:start", ExchangePattern.InOut, "X");
			System.out.println("Response: " + response);
		} finally {
			camelContext.stop();
		}
	}
}

MyBean1:

package com.javacodegeeks.camel;

import org.apache.camel.Exchange;

public class MyBean1 {
	public String doSomething(Exchange exchange) {
		System.out.println("Bean1 Received Exchange: " + exchange.getIn().getBody(String.class) + ", MIP: " + exchange.getPattern());
		return exchange.getIn().getBody(String.class);
	}
}

MyBean2:

package com.javacodegeeks.camel;

import org.apache.camel.Exchange;

public class MyBean2 {
	public void doSomething(Exchange exchange) {
		System.out.println("Bean Received Exchange: " + exchange.getIn().getBody(String.class) + ", MIP: " + exchange.getPattern());
	}
}

Output:

Bean1 Received Exchange: X, MIP: InOut
Bean Received Exchange: X, MIP: InOnly
Bean1 Received Exchange: X, MIP: InOut
Response: Done

5. Change Route to InOut Message Example

This example builds on the ActiveMQ example, which uses the InOnly message exchange pattern (MEP). The consumer that fed a message into the route expects no response. We temporarily alter the MEP to InOut in order to request a response from an endpoint used in a one-way route. We use the element below to send to that endpoint with the InOut MEP.

<inOut uri="direct:inOutWay" />

changeRouteToInOutApplicationContext.xml:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
       ">
	<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="vm://localhost?broker.persistent=false" />
	</bean>
	<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
		<property name="connectionFactory" ref="connectionFactory" />
	</bean>
	<camelContext xmlns="http://camel.apache.org/schema/spring">
		<route>
			<from uri="activemq:queue:inboundQueue" />
			<to uri="bean:myBean1" />
			<inOut uri="direct:inOutWay" />
			<to uri="bean:myBean1" />
			<to uri="stream:out" />
		</route>
		<route>
			<from uri="direct:inOutWay" />
			<to uri="bean:myBean1" />
		</route>
	</camelContext>
	<bean id="myBean1" class="com.javacodegeeks.camel.MyBean1" />

</beans>

CamelChangeRouteInOutExampleUsingSpring:

package com.javacodegeeks.camel;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.spring.SpringCamelContext;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class CamelChangeRouteInOutExampleUsingSpring {
	public static final void main(String[] args) throws Exception {
		ApplicationContext appContext = new ClassPathXmlApplicationContext(
				"changeRouteToInOutApplicationContext.xml");
		CamelContext camelContext = SpringCamelContext.springCamelContext(
				appContext, false);
		try {
			ProducerTemplate template = camelContext.createProducerTemplate();
			camelContext.start();
			template.sendBody("activemq:queue:inboundQueue", "InOut example");
			Thread.sleep(1000);
		} finally {
			camelContext.stop();
		}
	}	
}

Output:

Bean1 Received Exchange: InOut example, MIP: InOnly
Bean1 Received Exchange: InOut example, MIP: InOut
Bean1 Received Exchange: InOut example, MIP: InOnly
InOut example

6. Custom Processor

In this example, we create a custom processor that retrieves the inbound message from the exchange, does some processing based on the message, and then sets an output on the same exchange.
The example is a course scheduler that assigns a trainer to a course and then schedules it. If no trainer is found, it throws TrainerNotAvailableException.

The exception is handled using the onException API.

CamelProcessorExample:

package com.javacodegeeks.camel;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.util.jndi.JndiContext;

public class CamelProcessorExample {
	public static void main(String[] args) throws Exception {
		JndiContext jndiContext = new JndiContext();
		jndiContext.bind("scheduleCourse", new CourseScheduler());
		CamelContext camelContext = new DefaultCamelContext(jndiContext);
		try {
			camelContext.addRoutes(new RouteBuilder() {
				@Override
				public void configure() throws Exception {										
					from("direct:course_name")
					   .onException(TrainerNotAvailableException.class)
					   .handled(true)
					   .transform(constant("No trainer available exception"))
					   .to("stream:out")
					   .end()
					.process(new TrainerAvailabilityChecker())					
					.to("bean:scheduleCourse?method=schedule")
					.to("stream:out");
				}
			});
			camelContext.start();	
			ProducerTemplate template = camelContext.createProducerTemplate();
			template.sendBody("direct:course_name", "Scala");
			try {
				template.sendBody("direct:course_name", "Spring Integration");
			} catch (Throwable e) {
				System.out.println("Exception " + e.getMessage());
			}
		} finally {
			camelContext.stop();
		}
	}
}

The custom processor below checks the trainer availability.

TrainerAvailabilityChecker:

package com.javacodegeeks.camel;

import java.util.HashMap;
import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

public class TrainerAvailabilityChecker implements Processor {
	public void process(Exchange exchange) throws Exception {		
		String course = exchange.getIn().getBody(String.class);
		System.out.println("Check availability of trainer for " + course);
		String trainer = TRAINERS.get(course);
		if (trainer == null) {
			throw new TrainerNotAvailableException(exchange, "No trainer for " + course);
		}
		System.out.print("Found Trainer: " );
		exchange.getOut().setBody(new CourseSchedule(trainer, course), CourseSchedule.class);
	}
	
	private static final Map<String, String> TRAINERS = new HashMap<String, String>();
	
	static {
		TRAINERS.put("Scala", "Joe");
		TRAINERS.put("Java Core", "Sam");
		TRAINERS.put("Mockito", "Krish");
	}
}

TrainerNotAvailableException:

package com.javacodegeeks.camel;

import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;

public class TrainerNotAvailableException extends CamelExchangeException {

	private static final long serialVersionUID = -4536714110976166452L;

	public TrainerNotAvailableException(Exchange exchange, String message) {
        super(message, exchange);
    }

    public TrainerNotAvailableException(String message, Exchange exchange, Throwable cause) {
        super(message, exchange, cause);
    }
}

Once we know the trainer is available, we schedule the training.

CourseScheduler:

package com.javacodegeeks.camel;

import java.util.Date;

public class CourseScheduler {
	public CourseSchedule schedule(CourseSchedule courseSchedule) {
		String course = courseSchedule.getCourse();
		String trainer = courseSchedule.getTrainer();
		System.out.println("Schedule " + course + ", trainer is " + trainer);
		courseSchedule.setTrainingDate(new Date());
		return courseSchedule; 
	}
}

CourseSchedule:

package com.javacodegeeks.camel;

import java.util.Date;

public class CourseSchedule {
	private String trainer;
	private String course;
	private Date trainingDate;
	public CourseSchedule(String trainer, String course) {
		super();
		this.trainer = trainer;
		this.course = course;
	}
	public Date getTrainingDate() {
		return trainingDate;
	}
	public void setTrainingDate(Date trainingDate) {
		this.trainingDate = trainingDate;
	}
	public String getTrainer() {
		return trainer;
	}
	public String getCourse() {
		return course;
	}
	public String toString() {
		return "Course " + course + " will start on " + trainingDate + ", trainer is " + trainer;
	}
}

Output:

Check availability of trainer for Scala
Found Trainer: Schedule Scala, trainer is Joe
Course Scala will start on Fri May 08 14:12:18 IST 2015, trainer is Joe
Check availability of trainer for Spring Integration
No trainer available exception
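
The second sendBody call does not reach the catch block in main, because the route marks the exception as handled and replaces the body with a constant. A rough plain-Java analogy of that behaviour, using hypothetical names (HandledSketch, findTrainer, route) and no Camel classes, might look like this:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical analogy of onException(...).handled(true); not Camel code.
class HandledSketch {
    static class TrainerNotAvailable extends Exception {
        TrainerNotAvailable(String message) {
            super(message);
        }
    }

    private static final Map<String, String> TRAINERS = new HashMap<>();

    static {
        TRAINERS.put("Scala", "Joe");
    }

    // Stands in for the processor: fails when no trainer is known.
    static String findTrainer(String course) throws TrainerNotAvailable {
        String trainer = TRAINERS.get(course);
        if (trainer == null) {
            throw new TrainerNotAvailable("No trainer for " + course);
        }
        return trainer;
    }

    // Stands in for the route: a handled failure becomes a normal result,
    // so the caller never sees the exception.
    static String route(String course) {
        try {
            return "Schedule " + course + ", trainer is " + findTrainer(course);
        } catch (TrainerNotAvailable e) {
            return "No trainer available exception";
        }
    }

    public static void main(String[] args) {
        System.out.println(route("Scala"));              // Schedule Scala, trainer is Joe
        System.out.println(route("Spring Integration")); // No trainer available exception
    }
}
```

In the sketch, as in the output above, the caller only ever receives a regular result for the unknown course.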

7. Download the Eclipse Project

This was an example about Camel Exchange.

Download
You can download the full source code of this example here: camelExchangeExample.zip

Ram Mokkapaty

Ram holds a master's degree in Machine Design from IT B.H.U. His expertise lies in test driven development and re-factoring. He is passionate about open source technologies and actively blogs on various java and open-source technologies like spring. He works as a principal Engineer in the logistics domain.