RSS

Tag Archives: activemq

Configuring a Network of Brokers in Fuse Fabric

In ActiveMQ it is possible to define logical groups of message brokers in order to obtain more resiliency or throughput.
The setup configuration described here can be outlined as follows:
Network_of_brokers_layout
Creating broker groups and a network of brokers can be done in various manners in JBoss Fuse Fabric. Here we are going to use the Fabric CLI.
The following steps are necessary to create the configuration above:

  1. Creating a Fabric (if we don’t already have one)

  2. Create child containers

  3. Create the MQ broker profiles and assign them to the child containers

  4. Connect a couple of clients for testing

1. Creating a Fabric (optional)

Assuming we start with a clean Fuse installation the first step is creating a Fabric. This step can be skipped if a fabric is already available.

In the Fuse CLI execute the following command:

JBossFuse:karaf@root> fabric:create -p fabric --wait-for-provisioning

2. Create child containers

Next we are going to create two sets of child containers which are going to host our brokers. Note, the child containers we are going to create in this step are empty child containers and do not yet contain AMQ brokers. We are going to provision these containers with AMQ brokers in step 3.

First create the child containers for siteA:

JBossFuse:karaf@root> fabric:container-create-child root site-a 2

Next create the child containers for siteB:

JBossFuse:karaf@root> fabric:container-create-child root site-b 2

3. Create the MQ broker profiles and assign them to the child containers

In this step we are going to create the broker profiles in fabric and assign them to the containers we created in the previous step.


JBossFuse:karaf@root> fabric:mq-create --group site-a --networks site-b --networks-username admin --networks-password admin --assign-container site-a1,site-a2 site-a-profile

JBossFuse:karaf@root> fabric:mq-create --group site-b --networks site-a --networks-username admin --networks-password admin --assign-container site-b1,site-b2 site-b-profile

The fabric:mq-create command creates a broker profile in Fuse Fabric. The –group flag assigns a group to the brokers in the profile. The networks flag creates the required network connection needed for a network of brokers. In the assign-container flag we assign this newly created broker profile to one or more containers.

4.Connect a couple of clients for testing

A sample project containing two clients, one producer and one consumer is available on github.

Clone the repository:

$ git clone https://github.com/pimg/mq-fabric-client.git

build the project:

$ mvn clean install

Start the message consumer (in the java-consumer directory):

$ mvn exec:java

Start the message producer (in the java-producer directory):

$ mvn exec:java

Observe the console logging of the producer:

14:48:58 INFO  Using local ZKClient
14:48:58 INFO  Starting
14:48:58 INFO  Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
14:48:58 INFO  Client environment:host.name=pim-XPS-15-9530
14:48:58 INFO  Client environment:java.version=1.8.0_77
14:48:58 INFO  Client environment:java.vendor=Oracle Corporation
14:48:58 INFO  Client environment:java.home=/home/pim/apps/jdk1.8.0_77/jre
14:48:58 INFO  Client environment:java.class.path=/home/pim/apps/apache-maven-3.3.9/boot/plexus-classworlds-2.5.2.jar
14:48:58 INFO  Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
14:48:58 INFO  Client environment:java.io.tmpdir=/tmp
14:48:58 INFO  Client environment:java.compiler=<NA>
14:48:58 INFO  Client environment:os.name=Linux
14:48:58 INFO  Client environment:os.arch=amd64
14:48:58 INFO  Client environment:os.version=4.2.0-34-generic
14:48:58 INFO  Client environment:user.name=pim
14:48:58 INFO  Client environment:user.home=/home/pim
14:48:58 INFO  Client environment:user.dir=/home/pim/workspace/mq-fabric/java-producer
14:48:58 INFO  Initiating client connection, connectString=localhost:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@47e011e3
14:48:58 INFO  Opening socket connection to server localhost/127.0.0.1:2181
14:48:58 INFO  Socket connection established to localhost/127.0.0.1:2181, initiating session
14:48:58 INFO  Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x154620540e80009, negotiated timeout = 40000
14:48:58 INFO  State change: CONNECTED
14:48:59 INFO  Adding new broker connection URL: tcp://10.0.3.1:38417
14:49:00 INFO  Successfully connected to tcp://10.0.3.1:38417
14:49:00 INFO  Sending to destination: queue://fabric.simple this text: 1. message sent
14:49:00 INFO  Sending to destination: queue://fabric.simple this text: 2. message sent
14:49:01 INFO  Sending to destination: queue://fabric.simple this text: 3. message sent
14:49:01 INFO  Sending to destination: queue://fabric.simple this text: 4. message sent
14:49:02 INFO  Sending to destination: queue://fabric.simple this text: 5. message sent
14:49:02 INFO  Sending to destination: queue://fabric.simple this text: 6. message sent
14:49:03 INFO  Sending to destination: queue://fabric.simple this text: 7. message sent
14:49:03 INFO  Sending to destination: queue://fabric.simple this text: 8. message sent
14:49:04 INFO  Sending to destination: queue://fabric.simple this text: 9. message sent

Observe the console logging of the consumer:

14:48:20 INFO  Using local ZKClient
14:48:20 INFO  Starting
14:48:20 INFO  Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
14:48:20 INFO  Client environment:host.name=pim-XPS-15-9530
14:48:20 INFO  Client environment:java.version=1.8.0_77
14:48:20 INFO  Client environment:java.vendor=Oracle Corporation
14:48:20 INFO  Client environment:java.home=/home/pim/apps/jdk1.8.0_77/jre
14:48:20 INFO  Client environment:java.class.path=/home/pim/apps/apache-maven-3.3.9/boot/plexus-classworlds-2.5.2.jar
14:48:20 INFO  Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
14:48:20 INFO  Client environment:java.io.tmpdir=/tmp
14:48:20 INFO  Client environment:java.compiler=<NA>
14:48:20 INFO  Client environment:os.name=Linux
14:48:20 INFO  Client environment:os.arch=amd64
14:48:20 INFO  Client environment:os.version=4.2.0-34-generic
14:48:20 INFO  Client environment:user.name=pim
14:48:20 INFO  Client environment:user.home=/home/pim
14:48:20 INFO  Client environment:user.dir=/home/pim/workspace/mq-fabric/java-consumer
14:48:20 INFO  Initiating client connection, connectString=localhost:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@3d732a14
14:48:20 INFO  Opening socket connection to server localhost/127.0.0.1:2181
14:48:20 INFO  Socket connection established to localhost/127.0.0.1:2181, initiating session
14:48:20 INFO  Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x154620540e80008, negotiated timeout = 40000
14:48:20 INFO  State change: CONNECTED
14:48:21 INFO  Adding new broker connection URL: tcp://10.0.3.1:38417
14:48:21 INFO  Successfully connected to tcp://10.0.3.1:38417
14:48:21 INFO  Start consuming messages from queue://fabric.simple with 120000ms timeout
14:49:00 INFO  Got 1. message: 1. message sent
14:49:00 INFO  Got 2. message: 2. message sent
14:49:01 INFO  Got 3. message: 3. message sent
14:49:01 INFO  Got 4. message: 4. message sent
14:49:02 INFO  Got 5. message: 5. message sent
14:49:02 INFO  Got 6. message: 6. message sent
14:49:03 INFO  Got 7. message: 7. message sent
14:49:03 INFO  Got 8. message: 8. message sent
14:49:04 INFO  Got 9. message: 9. message sent
Advertisements
 
Leave a comment

Posted by on 2016-12-02 in JBoss Fuse

 

Tags: , , , , ,

ActiveMQ DLQ use OriginalDestination in Camel

Ah the DLQ, the place where messages go to die. When messages end up in the DLQ (Dead letter queue) of ActiveMQ they receive an additional message header with the queue name where the message resided originally. This message header called Original Destination, can be viewed from Hawtio.

DLQSubscriber-1

When using Apache Camel as a subscriber I noticed something strange. Normally all JMS Headers and properties are mapped to the Camel Exchange headers and properties. However, the OriginalDestination header was not.

For example when creating this dummy route for testing purposes I got the following output:


package nl.rubix.dlqsubscriber.dlqsubscriber;

import org.apache.camel.builder.RouteBuilder;

public class DLQSubscriberRouteBuilder extends RouteBuilder{

    @Override
    public void configure() throws Exception {
        from("amq:queue:ActiveMQ.DLQ").log("${headers}");
    }

}


 


[#0 - JmsConsumer[ActiveMQ.DLQ]] route1                         INFO {breadcrumbId=ID:localhost.localdomain-39943-1438348606321-13:1:1:1:1, CamelJmsDeliveryMode=2, dlqDeliveryFailureCause=java.lang.Throwable: Message Expired. Expiration:1438348926673, JMSCorrelationID=null, JMSDeliveryMode=2, JMSDestination=queue://ActiveMQ.DLQ, JMSExpiration=0, JMSMessageID=ID:localhost.localdomain-39943-1438348606321-15:1:1:1:1, JMSPriority=4, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1438348926663, JMSType=null, JMSXGroupID=null, JMSXUserID=null, originalExpiration=1438348926673}

I spent quite some time trying to figure out what happened when I realized the JMS message on the DLQ is not the exact same message from the original queue with some extra headers and properties. The original message seems to be wrapped inside the message placed on the DLQ. To access this message we need to work with the “raw” JMSMessage from ActiveMQ rather the processed message from Camel. This can be done by creating a processor which uses a typeconverter in Camel to get the JMS message from the Camel exchange.

In the code below I created a processor which fetches the inner message and from that message retrieves the originalDestination property (which is an instance variable on the ActiveMQMessage class (ActiveMQTextMessage is a subclass of ActiveMQMessage) and places this property as an header on the Camel exchange.


package nl.rubix.dlqsubscriber.dlqsubscriber;

import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.jms.JmsMessage;

public class DQLMessageProcessor implements Processor{

    @Override
    public void process(Exchange exchange) throws Exception {
        JmsMessage jmsMsg = exchange.getIn().getBody(JmsMessage.class);
        ActiveMQTextMessage innerMsg = (ActiveMQTextMessage) jmsMsg.getJmsMessage();
        exchange.getIn().setHeader("OriginalDestination", innerMsg.getOriginalDestination());
    }

}


Now when observing the output log of .log(“${headers}”) we see the OriginalDestination header in the log output:


[#0 - JmsConsumer[ActiveMQ.DLQ]] route1                         INFO {breadcrumbId=ID:localhost.localdomain-39943-1438348606321-21:1:1:1:1, CamelJmsDeliveryMode=2, dlqDeliveryFailureCause=java.lang.Throwable: Message Expired. Expiration:1438350527570, JMSCorrelationID=null, JMSDeliveryMode=2, JMSDestination=queue://ActiveMQ.DLQ, JMSExpiration=0, JMSMessageID=ID:localhost.localdomain-39943-1438348606321-15:4:1:1:1, JMSPriority=4, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1438350527560, JMSType=null, JMSXGroupID=null, JMSXUserID=null, OriginalDestination=queue://test2, originalExpiration=1438350527570}

In the end it costs me quite some time to grab the concept of fetching the ActiveMQTextMessage from the JmsMessage, since I was under the impression the Camel ActiveMQ component does this out of the box.

Anyway I hope it helps someone.

 

 
1 Comment

Posted by on 2015-07-31 in JBoss Fuse

 

Tags: , , , , ,

Fuse Fabric MQ provision exception

When using the discovery endpoint to connect to a master slave ActiveMQ cluster I ran into an error.
It is worth to note that this exception is not caused by the discovery mechanism, I happened to find this issue when using the discovery url, however when hard wiring the connector (e.g. using tcp://localhost:61616) will also cause this issue. This issue has to do with the container setup and profiling which I will explain below.

“Provision Exception:

org.osgi.service.resolver.ResolutionException: Unable to resolve dummy/0.0.0: missing requirement [dummy/0.0.0] osgi.identity; osgi.identity=fabricdemo; type=osgi.bundle; version=”[1.0.0.SNAPSHOT,1.0.0.SNAPSHOT]” [caused by: Unable to resolve fabricdemo/1.0.0.SNAPSHOT: missing requirement [fabricdemo/1.0.0.SNAPSHOT] osgi.wiring.package; filter:=”(&(osgi.wiring.package=org.apache.activemq.camel.component)(version>=5.9.0)(!(version>=6.0.0)))”]”

Fabric-MQ-provision-error-1

This is caused by the Fuse container setup, where the brokers run on two seperate containers in a master slave construction. The container running the deployed Camel route is also running the JBoss Fuse minimal profile.

Fabric-MQ-provision-error-2

The connection to the broker cluster is setup in the Blueprint context in the following manner:

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
	<property name="brokerURL" value="discovery:(fabric:default)" />
	<property name="userName" value="admin" />
	<property name="password" value="admin" />
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
	<property name="maxConnections" value="1" />
	<property name="maximumActiveSessionPerConnection" value="500" />
	<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
	<property name="connectionFactory" ref="pooledConnectionFactory" />
	<property name="concurrentConsumers" value="10" />
</bean>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
	<property name="configuration" ref="jmsConfig"/>
</bean>

When starting the container a provision exception is thrown, this exception: missing requirement [fabricdemo/1.0.0.SNAPSHOT] osgi.wiring.package; filter:=”(&(osgi.wiring.package=org.apache.activemq.camel.component)(version>=5.9.0)(!(version>=6.0.0)))”]”

Again, it is not just caused by the discovery brokerURL: .

The exception is thrown because the JBoss Fuse minimal profile does not provides the ActiveMQ component. When adding the dependencies for this component to your project the exception still persists, the reason for this is that the component is not exported by default in the OSGi bundle. This means other bundles cannot use it. Exporting the class implementing the component will solve this issue.
To export the ActiveMQ component class we need to extend the Apache Felix maven-bundle-plugin. We need to tell the plugin to export the ActiveMQ component, this can be done by adding the following line to the configuration:

<Export-Package>org.apache.activemq.camel.component</Export-Package>

The entire plugin now looks like this:

<plugin>
	<groupId>org.apache.felix</groupId>
	<artifactId>maven-bundle-plugin</artifactId>
	<version>2.3.7</version>
	<extensions>true</extensions>
	<configuration>
		<instructions>
			<Bundle-SymbolicName>fabricdemo</Bundle-SymbolicName>
			<Private-Package>nl.rubix.camel-activemq.*</Private-Package>
			<Import-Package>*</Import-Package>
			<Export-Package>org.apache.activemq.camel.component</Export-Package>
		</instructions>
	</configuration>
</plugin>

As a side note: to use the discovey broker url the mq-fabric feature must be added to the profile. For this demo I used the Fabric maven plugin which I configured as follows:

<plugin>
	<groupId>io.fabric8</groupId>
	<artifactId>fabric8-maven-plugin</artifactId>
	<configuration>
		<profile>activemqtest</profile>
		<features>mq-fabric</features>
	</configuration>
</plugin>

After the modification the the pom.xml the container will start properly:

Fabric-MQ-provision-error-3

Anyway I hope this helps someone.

 
1 Comment

Posted by on 2014-12-24 in JBoss Fuse

 

Tags: , , , , , ,