RSS

Tag Archives: activemq

Authenticating a JMS consumer with 3Scale, Camel and ActiveMQ

3Scale is an API Management platform used for authenticating an throttleing API calls among many, many other things. Now when thinking of API’s most people think of RESTfull API’s these days. And altough 3Scale primarily targets RESTfull API’s it is also possible to use other types of API’s as this blog will demonstrate. In this post we will use a Camel JMS subscriber in combination with ActiveMQ and authenticate requests against the 3Scale API Management platform.

First let’s look at the 3Scale setup.

The first step is to create a new service, however normally one would select one of the APICast API Gateway options for handling the API traffic. This time however we are selecting the Java plugin option, since Camel is based on Java. Obviously the same principles could be applied in one of the other programming languages for which plugins are available.   

The next step is to go to the integration page. But, where normally we would configure the mapping rules of our RESTfull API, we now get instructions to implement the Java plugin.

 

It is good to note the rest of the 3Scale setup is completely default. De default Hits metric is used as shown below, although custom methods could easily be defined.

 

For this example one application plan with a rate limit has been configured.

Integrating the 3Scale Java plugin with Apache Camel

Apache Camel has numerous ways of integrating custom code and creating customizations. For this example a custom processor is created, although a bean, or component would work also.

The first step is to import the 3Scale java plugin dependency via Maven, by adding the following to the pom.xml file:

 

<dependency>
    <groupId>net.3scale</groupId>
    <artifactId>3scale-api</artifactId>
    <version>3.0.4</version>
</dependency>

Now we can integrate the 3Scale Java plugin in our Camel processor, which is going to retrieve the 3Scale appId and appKey, used for authentication from JMS headers. With the appId and appKey the 3Scale API is called for authentication. However this is not the only thing we need to pass in our request towards 3Scale. To authenticate against 3Scale selecting the correct 3Scale account and service we need to pass the ServiceId of the service we created above and pass the accompanying service token. Since these are fixed per environment we retrieve these values from a properties file. Finally we need to increment the hits metric. Once all these parameters are passed in the request we can invoke 3Scale and authenticate our request. If we are authenticated and authorized for this API we finish the processor, following the Camel Route execution. However, when we are not authenticated we are going to stop the route and any further processing.
The entire processor looks like this:

package nl.rubix.eos.api.camelthreescale.processor;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.deltaspike.core.api.config.ConfigProperty;
import threescale.v3.api.AuthorizeResponse;
import threescale.v3.api.ParameterMap;
import threescale.v3.api.ServerError;
import threescale.v3.api.ServiceApi;
import threescale.v3.api.impl.ServiceApiDriver;

import javax.inject.Inject;
import javax.inject.Named;

@Named("authRepProcessor")
public class AuthRepProcessor implements Processor {

  @Inject
  @ConfigProperty(name = "SERVICE_TOKEN")
  private String serviceToken;

  @Inject
  @ConfigProperty(name = "SERVICE_ID")
  private String serviceId;

  @Override
  public void process(Exchange exchange) throws Exception {
    String appId = exchange.getIn().getHeader("appId", String.class);
    String appKey = exchange.getIn().getHeader("appKey", String.class);

    AuthorizeResponse authzResponse = authrep(createParametersMap(appId, appKey));

    if(authzResponse.success() == false) {
      exchange.setProperty(Exchange.ROUTE_STOP, true);
      exchange.getIn().setHeader("authz:errorCode", authzResponse.getErrorCode());
      exchange.getIn().setHeader("authz:reason", authzResponse.getReason());
    }

  }

  private ParameterMap createParametersMap(String appId, String appKey) {
    ParameterMap params = new ParameterMap();
    params.add("app_id", appId);
    params.add("app_key", appKey);

    ParameterMap usage = new ParameterMap();
    usage.add("hits", "1");
    params.add("usage", usage);

    return params;
  }

  private AuthorizeResponse authrep(ParameterMap params) {

    ServiceApi serviceApi = ServiceApiDriver.createApi();

    AuthorizeResponse response = null;

    try {
      response = serviceApi.authrep(serviceToken, serviceId, params);
    } catch (ServerError serverError) {
      serverError.printStackTrace();
      throw new RuntimeCamelException(serverError.getMessage(), serverError.getCause());
    }
    return response;
  }
}

We simply use this processor in our Camel route to add the 3Scale functionality:

package nl.rubix.eos.api.camelthreescale;

import io.fabric8.annotations.Alias;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.cdi.ContextName;

import javax.inject.Inject;

@ContextName("activemq-camel-api")
public class ActiveMqCamelApi extends RouteBuilder{

  @Inject
  @Alias("jms")
  private ActiveMQComponent activeMQComponent;

  @Override
  public void configure() throws Exception {
      from("jms:queue:test")
        .log("received message")
        .process("authRepProcessor")
        .log("request authenticated and authorized");
  }
}

When looking at the logs we can see the request is authenticated when we send a request with the correct appId and appKey in the JMS headers. When looking at the logs we can see the request is passing the processor:

2018-03-10 20:28:40,294 [cdi.Main.main()] INFO  DefaultCamelContext            - Route: route1 started and consuming from: Endpoint[jms://queue:test]
2018-03-10 20:28:40,295 [cdi.Main.main()] INFO  DefaultCamelContext            - Total 1 routes, of which 1 are started.
2018-03-10 20:28:40,295 [cdi.Main.main()] INFO  DefaultCamelContext            - Apache Camel 2.17.0.redhat-630187 (CamelContext: activemq-camel-api) started in 0.512 seconds
2018-03-10 20:28:40,318 [cdi.Main.main()] INFO  Bootstrap                      - WELD-ENV-002003: Weld SE container STATIC_INSTANCE initialized
2018-03-10 20:29:37,157 [sConsumer[test]] INFO  route1                         - received message
2018-03-10 20:29:38,307 [sConsumer[test]] INFO  route1                         - request authenticated and authorized

And off course we can see the metrics in 3Scale:

Now this processor discards the message when the authentication by 3Scale fails, but it is off course possible to send the unauthorized messages towards a special error queue, or make the entire route transactional and simply do not send an ACK when authentication fails.

The entire code of this example is available on Github.

 
Leave a comment

Posted by on 2018-03-10 in API Management

 

Tags: , , ,

Configuring a Network of Brokers in Fuse Fabric

In ActiveMQ it is possible to define logical groups of message brokers in order to obtain more resiliency or throughput.
The setup configuration described here can be outlined as follows:
Network_of_brokers_layout
Creating broker groups and a network of brokers can be done in various manners in JBoss Fuse Fabric. Here we are going to use the Fabric CLI.
The following steps are necessary to create the configuration above:

  1. Creating a Fabric (if we don’t already have one)

  2. Create child containers

  3. Create the MQ broker profiles and assign them to the child containers

  4. Connect a couple of clients for testing

1. Creating a Fabric (optional)

Assuming we start with a clean Fuse installation the first step is creating a Fabric. This step can be skipped if a fabric is already available.

In the Fuse CLI execute the following command:

JBossFuse:karaf@root> fabric:create -p fabric --wait-for-provisioning

2. Create child containers

Next we are going to create two sets of child containers which are going to host our brokers. Note, the child containers we are going to create in this step are empty child containers and do not yet contain AMQ brokers. We are going to provision these containers with AMQ brokers in step 3.

First create the child containers for siteA:

JBossFuse:karaf@root> fabric:container-create-child root site-a 2

Next create the child containers for siteB:

JBossFuse:karaf@root> fabric:container-create-child root site-b 2

3. Create the MQ broker profiles and assign them to the child containers

In this step we are going to create the broker profiles in fabric and assign them to the containers we created in the previous step.


JBossFuse:karaf@root> fabric:mq-create --group site-a --networks site-b --networks-username admin --networks-password admin --assign-container site-a1,site-a2 site-a-profile

JBossFuse:karaf@root> fabric:mq-create --group site-b --networks site-a --networks-username admin --networks-password admin --assign-container site-b1,site-b2 site-b-profile

The fabric:mq-create command creates a broker profile in Fuse Fabric. The –group flag assigns a group to the brokers in the profile. The networks flag creates the required network connection needed for a network of brokers. In the assign-container flag we assign this newly created broker profile to one or more containers.

4.Connect a couple of clients for testing

A sample project containing two clients, one producer and one consumer is available on github.

Clone the repository:

$ git clone https://github.com/pimg/mq-fabric-client.git

build the project:

$ mvn clean install

Start the message consumer (in the java-consumer directory):

$ mvn exec:java

Start the message producer (in the java-producer directory):

$ mvn exec:java

Observe the console logging of the producer:

14:48:58 INFO  Using local ZKClient
14:48:58 INFO  Starting
14:48:58 INFO  Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
14:48:58 INFO  Client environment:host.name=pim-XPS-15-9530
14:48:58 INFO  Client environment:java.version=1.8.0_77
14:48:58 INFO  Client environment:java.vendor=Oracle Corporation
14:48:58 INFO  Client environment:java.home=/home/pim/apps/jdk1.8.0_77/jre
14:48:58 INFO  Client environment:java.class.path=/home/pim/apps/apache-maven-3.3.9/boot/plexus-classworlds-2.5.2.jar
14:48:58 INFO  Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
14:48:58 INFO  Client environment:java.io.tmpdir=/tmp
14:48:58 INFO  Client environment:java.compiler=<NA>
14:48:58 INFO  Client environment:os.name=Linux
14:48:58 INFO  Client environment:os.arch=amd64
14:48:58 INFO  Client environment:os.version=4.2.0-34-generic
14:48:58 INFO  Client environment:user.name=pim
14:48:58 INFO  Client environment:user.home=/home/pim
14:48:58 INFO  Client environment:user.dir=/home/pim/workspace/mq-fabric/java-producer
14:48:58 INFO  Initiating client connection, connectString=localhost:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@47e011e3
14:48:58 INFO  Opening socket connection to server localhost/127.0.0.1:2181
14:48:58 INFO  Socket connection established to localhost/127.0.0.1:2181, initiating session
14:48:58 INFO  Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x154620540e80009, negotiated timeout = 40000
14:48:58 INFO  State change: CONNECTED
14:48:59 INFO  Adding new broker connection URL: tcp://10.0.3.1:38417
14:49:00 INFO  Successfully connected to tcp://10.0.3.1:38417
14:49:00 INFO  Sending to destination: queue://fabric.simple this text: 1. message sent
14:49:00 INFO  Sending to destination: queue://fabric.simple this text: 2. message sent
14:49:01 INFO  Sending to destination: queue://fabric.simple this text: 3. message sent
14:49:01 INFO  Sending to destination: queue://fabric.simple this text: 4. message sent
14:49:02 INFO  Sending to destination: queue://fabric.simple this text: 5. message sent
14:49:02 INFO  Sending to destination: queue://fabric.simple this text: 6. message sent
14:49:03 INFO  Sending to destination: queue://fabric.simple this text: 7. message sent
14:49:03 INFO  Sending to destination: queue://fabric.simple this text: 8. message sent
14:49:04 INFO  Sending to destination: queue://fabric.simple this text: 9. message sent

Observe the console logging of the consumer:

14:48:20 INFO  Using local ZKClient
14:48:20 INFO  Starting
14:48:20 INFO  Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
14:48:20 INFO  Client environment:host.name=pim-XPS-15-9530
14:48:20 INFO  Client environment:java.version=1.8.0_77
14:48:20 INFO  Client environment:java.vendor=Oracle Corporation
14:48:20 INFO  Client environment:java.home=/home/pim/apps/jdk1.8.0_77/jre
14:48:20 INFO  Client environment:java.class.path=/home/pim/apps/apache-maven-3.3.9/boot/plexus-classworlds-2.5.2.jar
14:48:20 INFO  Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
14:48:20 INFO  Client environment:java.io.tmpdir=/tmp
14:48:20 INFO  Client environment:java.compiler=<NA>
14:48:20 INFO  Client environment:os.name=Linux
14:48:20 INFO  Client environment:os.arch=amd64
14:48:20 INFO  Client environment:os.version=4.2.0-34-generic
14:48:20 INFO  Client environment:user.name=pim
14:48:20 INFO  Client environment:user.home=/home/pim
14:48:20 INFO  Client environment:user.dir=/home/pim/workspace/mq-fabric/java-consumer
14:48:20 INFO  Initiating client connection, connectString=localhost:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@3d732a14
14:48:20 INFO  Opening socket connection to server localhost/127.0.0.1:2181
14:48:20 INFO  Socket connection established to localhost/127.0.0.1:2181, initiating session
14:48:20 INFO  Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x154620540e80008, negotiated timeout = 40000
14:48:20 INFO  State change: CONNECTED
14:48:21 INFO  Adding new broker connection URL: tcp://10.0.3.1:38417
14:48:21 INFO  Successfully connected to tcp://10.0.3.1:38417
14:48:21 INFO  Start consuming messages from queue://fabric.simple with 120000ms timeout
14:49:00 INFO  Got 1. message: 1. message sent
14:49:00 INFO  Got 2. message: 2. message sent
14:49:01 INFO  Got 3. message: 3. message sent
14:49:01 INFO  Got 4. message: 4. message sent
14:49:02 INFO  Got 5. message: 5. message sent
14:49:02 INFO  Got 6. message: 6. message sent
14:49:03 INFO  Got 7. message: 7. message sent
14:49:03 INFO  Got 8. message: 8. message sent
14:49:04 INFO  Got 9. message: 9. message sent
 
Leave a comment

Posted by on 2016-12-02 in JBoss Fuse

 

Tags: , , , , ,

ActiveMQ DLQ use OriginalDestination in Camel

Ah the DLQ, the place where messages go to die. When messages end up in the DLQ (Dead letter queue) of ActiveMQ they receive an additional message header with the queue name where the message resided originally. This message header called Original Destination, can be viewed from Hawtio.

DLQSubscriber-1

When using Apache Camel as a subscriber I noticed something strange. Normally all JMS Headers and properties are mapped to the Camel Exchange headers and properties. However, the OriginalDestination header was not.

For example when creating this dummy route for testing purposes I got the following output:


package nl.rubix.dlqsubscriber.dlqsubscriber;

import org.apache.camel.builder.RouteBuilder;

public class DLQSubscriberRouteBuilder extends RouteBuilder{

    @Override
    public void configure() throws Exception {
        from("amq:queue:ActiveMQ.DLQ").log("${headers}");
    }

}


 


[#0 - JmsConsumer[ActiveMQ.DLQ]] route1                         INFO {breadcrumbId=ID:localhost.localdomain-39943-1438348606321-13:1:1:1:1, CamelJmsDeliveryMode=2, dlqDeliveryFailureCause=java.lang.Throwable: Message Expired. Expiration:1438348926673, JMSCorrelationID=null, JMSDeliveryMode=2, JMSDestination=queue://ActiveMQ.DLQ, JMSExpiration=0, JMSMessageID=ID:localhost.localdomain-39943-1438348606321-15:1:1:1:1, JMSPriority=4, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1438348926663, JMSType=null, JMSXGroupID=null, JMSXUserID=null, originalExpiration=1438348926673}

I spent quite some time trying to figure out what happened when I realized the JMS message on the DLQ is not the exact same message from the original queue with some extra headers and properties. The original message seems to be wrapped inside the message placed on the DLQ. To access this message we need to work with the “raw” JMSMessage from ActiveMQ rather the processed message from Camel. This can be done by creating a processor which uses a typeconverter in Camel to get the JMS message from the Camel exchange.

In the code below I created a processor which fetches the inner message and from that message retrieves the originalDestination property (which is an instance variable on the ActiveMQMessage class (ActiveMQTextMessage is a subclass of ActiveMQMessage) and places this property as an header on the Camel exchange.


package nl.rubix.dlqsubscriber.dlqsubscriber;

import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.jms.JmsMessage;

public class DQLMessageProcessor implements Processor{

    @Override
    public void process(Exchange exchange) throws Exception {
        JmsMessage jmsMsg = exchange.getIn().getBody(JmsMessage.class);
        ActiveMQTextMessage innerMsg = (ActiveMQTextMessage) jmsMsg.getJmsMessage();
        exchange.getIn().setHeader("OriginalDestination", innerMsg.getOriginalDestination());
    }

}


Now when observing the output log of .log(“${headers}”) we see the OriginalDestination header in the log output:


[#0 - JmsConsumer[ActiveMQ.DLQ]] route1                         INFO {breadcrumbId=ID:localhost.localdomain-39943-1438348606321-21:1:1:1:1, CamelJmsDeliveryMode=2, dlqDeliveryFailureCause=java.lang.Throwable: Message Expired. Expiration:1438350527570, JMSCorrelationID=null, JMSDeliveryMode=2, JMSDestination=queue://ActiveMQ.DLQ, JMSExpiration=0, JMSMessageID=ID:localhost.localdomain-39943-1438348606321-15:4:1:1:1, JMSPriority=4, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1438350527560, JMSType=null, JMSXGroupID=null, JMSXUserID=null, OriginalDestination=queue://test2, originalExpiration=1438350527570}

In the end it costs me quite some time to grab the concept of fetching the ActiveMQTextMessage from the JmsMessage, since I was under the impression the Camel ActiveMQ component does this out of the box.

Anyway I hope it helps someone.

 

 
1 Comment

Posted by on 2015-07-31 in JBoss Fuse

 

Tags: , , , , ,

Fuse Fabric MQ provision exception

When using the discovery endpoint to connect to a master slave ActiveMQ cluster I ran into an error.
It is worth to note that this exception is not caused by the discovery mechanism, I happened to find this issue when using the discovery url, however when hard wiring the connector (e.g. using tcp://localhost:61616) will also cause this issue. This issue has to do with the container setup and profiling which I will explain below.

“Provision Exception:

org.osgi.service.resolver.ResolutionException: Unable to resolve dummy/0.0.0: missing requirement [dummy/0.0.0] osgi.identity; osgi.identity=fabricdemo; type=osgi.bundle; version=”[1.0.0.SNAPSHOT,1.0.0.SNAPSHOT]” [caused by: Unable to resolve fabricdemo/1.0.0.SNAPSHOT: missing requirement [fabricdemo/1.0.0.SNAPSHOT] osgi.wiring.package; filter:=”(&(osgi.wiring.package=org.apache.activemq.camel.component)(version>=5.9.0)(!(version>=6.0.0)))”]”

Fabric-MQ-provision-error-1

This is caused by the Fuse container setup, where the brokers run on two seperate containers in a master slave construction. The container running the deployed Camel route is also running the JBoss Fuse minimal profile.

Fabric-MQ-provision-error-2

The connection to the broker cluster is setup in the Blueprint context in the following manner:

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
	<property name="brokerURL" value="discovery:(fabric:default)" />
	<property name="userName" value="admin" />
	<property name="password" value="admin" />
</bean>
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
	<property name="maxConnections" value="1" />
	<property name="maximumActiveSessionPerConnection" value="500" />
	<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
	<property name="connectionFactory" ref="pooledConnectionFactory" />
	<property name="concurrentConsumers" value="10" />
</bean>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
	<property name="configuration" ref="jmsConfig"/>
</bean>

When starting the container a provision exception is thrown, this exception: missing requirement [fabricdemo/1.0.0.SNAPSHOT] osgi.wiring.package; filter:=”(&(osgi.wiring.package=org.apache.activemq.camel.component)(version>=5.9.0)(!(version>=6.0.0)))”]”

Again, it is not just caused by the discovery brokerURL: .

The exception is thrown because the JBoss Fuse minimal profile does not provides the ActiveMQ component. When adding the dependencies for this component to your project the exception still persists, the reason for this is that the component is not exported by default in the OSGi bundle. This means other bundles cannot use it. Exporting the class implementing the component will solve this issue.
To export the ActiveMQ component class we need to extend the Apache Felix maven-bundle-plugin. We need to tell the plugin to export the ActiveMQ component, this can be done by adding the following line to the configuration:

<Export-Package>org.apache.activemq.camel.component</Export-Package>

The entire plugin now looks like this:

<plugin>
	<groupId>org.apache.felix</groupId>
	<artifactId>maven-bundle-plugin</artifactId>
	<version>2.3.7</version>
	<extensions>true</extensions>
	<configuration>
		<instructions>
			<Bundle-SymbolicName>fabricdemo</Bundle-SymbolicName>
			<Private-Package>nl.rubix.camel-activemq.*</Private-Package>
			<Import-Package>*</Import-Package>
			<Export-Package>org.apache.activemq.camel.component</Export-Package>
		</instructions>
	</configuration>
</plugin>

As a side note: to use the discovey broker url the mq-fabric feature must be added to the profile. For this demo I used the Fabric maven plugin which I configured as follows:

<plugin>
	<groupId>io.fabric8</groupId>
	<artifactId>fabric8-maven-plugin</artifactId>
	<configuration>
		<profile>activemqtest</profile>
		<features>mq-fabric</features>
	</configuration>
</plugin>

After the modification the the pom.xml the container will start properly:

Fabric-MQ-provision-error-3

Anyway I hope this helps someone.

 
1 Comment

Posted by on 2014-12-24 in JBoss Fuse

 

Tags: , , , , , ,