
Tag Archives: Apache Camel

Playing around with Camel AsyncProcessor

One of the most frequently used constructs in Apache Camel is the Processor (http://camel.apache.org/processor.html). It is often used for invoking custom code or performing message translations. The API of the Processor is clear and well documented, and numerous examples of its use are available. The lesser-known sibling of the Processor is the AsyncProcessor (http://camel.apache.org/asynchronous-processing.html), which is less documented and less frequently used, mainly because it is targeted at Camel component developers. However, recently I decided to play around with the AsyncProcessor in a regular Camel setup. In this blog I would like to explain one possible way to use the AsyncProcessor in a Camel route.

Creating an AsyncProcessor

Similar to creating a Processor, creating an AsyncProcessor starts by implementing the AsyncProcessor interface.


public class MyAsyncProcessor implements AsyncProcessor {

But instead of one process method, now two must be implemented:


@Override public boolean process(final Exchange exchange, AsyncCallback asyncCallback) {

@Override public void process(Exchange exchange) {

Next to the regular process method, a second process method that takes an AsyncCallback must be implemented. The AsyncCallback is invoked whenever the asynchronous execution (which must be started in a separate thread) is finished. The returned boolean tells the Camel routing engine whether the exchange was completed synchronously (true), in which case routing continues right away, or asynchronously (false), in which case routing continues once the callback is invoked.

Starting the Async job

In order to start an async job the execution must take place in another thread. Java nowadays has multiple ways of doing concurrent, multi-threaded execution. For this example we simply create a Runnable class and start the job via an executor service.


private class AsyncBackgroundProcess implements Runnable {
  private Exchange exchange;
  private AsyncCallback asyncCallback;
  public AsyncBackgroundProcess(Exchange exchange, AsyncCallback asyncCallback){
    this.exchange = exchange;
    this.asyncCallback = asyncCallback;
  }
  @Override public void run() {
    log.info("Async backend process started");
    Boolean getBoolean = slowExecutionInterface.getBoolean("bla");
    exchange.setProperty("Response", getBoolean);
    log.info("Async backend process completed");
    asyncCallback.done(false);
  }
}

There are two things to note here:

  1. The AsyncCallback object from the process method is passed to the Runnable class
  2. When the execution is finished the asyncCallback.done method is called
    1. The boolean parameter indicates whether the execution was handled synchronously (true) or asynchronously (false); here it is false
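
Stripped of the Camel types, the hand-off described in the two points above can be sketched with JDK classes only. This is a minimal illustration, not the code from this post: DoneCallback is a hypothetical stand-in for Camel's AsyncCallback, and BackgroundJob and BackgroundJobDemo are made-up names. The slow backend call is replaced by a placeholder.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a completion callback; this is NOT Camel's AsyncCallback.
interface DoneCallback {
    void done(boolean doneSync);
}

// Runs on a worker thread, stores a result and then notifies the callback.
class BackgroundJob implements Runnable {
    private final AtomicBoolean response;
    private final DoneCallback callback;

    BackgroundJob(AtomicBoolean response, DoneCallback callback) {
        this.response = response;
        this.callback = callback;
    }

    @Override
    public void run() {
        response.set(true);      // placeholder for the slow backend call
        callback.done(false);    // false: the work was finished on another thread
    }
}

class BackgroundJobDemo {
    // Submits the job, waits for it and reports whether both the result and the callback arrived.
    static boolean runAndWait() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicBoolean response = new AtomicBoolean(false);
        AtomicBoolean callbackFired = new AtomicBoolean(false);
        try {
            Future<?> job = executor.submit(
                    new BackgroundJob(response, doneSync -> callbackFired.set(true)));
            job.get(5, TimeUnit.SECONDS);   // wait until run() has returned
            return response.get() && callbackFired.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("job finished and callback fired: " + runAndWait());
    }
}
```

The callback is handed to the job through its constructor and called as the last statement of run(), which mirrors the structure of the Runnable above.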

Implementing the AsyncProcessor process method

In the AsyncProcessor process method we kick off the Runnable class and define the callback method:


@Override public boolean process(final Exchange exchange, AsyncCallback asyncCallback) {
  log.info("Async process started");
  CountDownLatch countDownLatch = new CountDownLatch(1);
  exchange.setProperty("countDownLatch", countDownLatch);
  executorService.submit(new AsyncBackgroundProcess(exchange, new AsyncCallback() {
    @Override public void done(boolean b) {
      log.info("Async backend process finished");
      exchange.getContext().getAsyncProcessorAwaitManager().countDown(exchange, exchange.getProperty("countDownLatch", CountDownLatch.class));
    }
  }));
  return true;
}

Using the AsyncProcessor in a Camel route

Using an AsyncProcessor in a Camel route is exactly the same as using a regular Processor in the route:


from(jettyEndpoint)
.log("received")
.process(myAsyncProcessor)

Getting a response

By default the AsyncProcessor triggers a new thread for execution and does not sync back to the main execution thread. So getting a response in a “Fork-Join” manner requires some additional work. In the implementation of the process method above some actions for getting a response are already present:

A CountDownLatch is used for defining the number of threads the main thread can wait for (in our case 1):


CountDownLatch countDownLatch = new CountDownLatch(1);

In order to use the CountDownLatch downstream in our Camel route, we save it to an exchange property:


exchange.setProperty("countDownLatch", countDownLatch);

When submitting the Runnable class to the executor, we define the AsyncCallback whose done method counts down the CountDownLatch, indicating that the background thread is finished:


executorService.submit(new AsyncBackgroundProcess(exchange, new AsyncCallback() {
    @Override public void done(boolean b) {
      log.info("Async backend process finished");
      exchange.getContext().getAsyncProcessorAwaitManager().countDown(exchange, exchange.getProperty("countDownLatch", CountDownLatch.class));
    }
  }));

But this alone does not synchronize our threads. In order to get a response in our Camel route or Processor, the execution must at some point wait for the background thread to complete. For this a helper method was created:


public static void getResponseInBody(Exchange exchange) {
  CountDownLatch countDownLatch = exchange.getProperty("countDownLatch", CountDownLatch.class);
  exchange.getContext().getAsyncProcessorAwaitManager().await(exchange, countDownLatch);
  log.debug("Retrieved async response " + exchange.getProperty("Response", String.class));
  exchange.getIn().setBody(exchange.getProperty("Response", String.class));
}

In this helper method the CountDownLatch is retrieved from the exchange and the Camel AsyncProcessorAwaitManager is used for the thread synchronization:

CountDownLatch countDownLatch = exchange.getProperty("countDownLatch", CountDownLatch.class);
exchange.getContext().getAsyncProcessorAwaitManager().await(exchange, countDownLatch);
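
Outside of Camel, the same fork-and-join idea can be sketched with a plain CountDownLatch. This is only an illustration under assumptions: it calls CountDownLatch.await directly with a timeout and does not model what Camel's AsyncProcessorAwaitManager does on top of that. The class name LatchJoinDemo and the "done" result are made up for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class LatchJoinDemo {
    // Forks a worker, lets the caller continue, then joins on the latch.
    static String forkAndJoin(long workMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch latch = new CountDownLatch(1);   // one background thread to wait for
        AtomicReference<String> response = new AtomicReference<>();
        try {
            executor.submit(() -> {
                try {
                    Thread.sleep(workMillis);           // simulated slow backend
                    response.set("done");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();                  // always release the waiting thread
                }
            });
            // The calling thread is free to do other work here before it joins.
            if (!latch.await(workMillis + 2000, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("timed out waiting for the background thread");
            }
            return response.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("response = " + forkAndJoin(100));
    }
}
```

Counting down in a finally block means the waiting thread is released even when the background work fails, and the timeout on await keeps a lost callback from blocking the caller forever.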

Since this helper method is static, it can be invoked from anywhere in the route or from a processor, which gives the flexibility to choose where in the route the threads are synchronized.


@Override
public void configure() throws Exception {
  from(jettyEndpoint)
    .log("received")
    .process(myAsyncProcessor)
    .log("exited processor")
    .bean(MyAsyncProcessor.class, "getResponse(${exchange})")
    .log("${body} and Response Property ${property.Response}");
}

The entire AsyncProcessor looks like this:


package nl.rubix.eos.poc.asyncprocessor;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Named;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Named("myAsyncProcessor")
public class MyAsyncProcessor implements AsyncProcessor {
  private final ExecutorService executorService = Executors.newFixedThreadPool(2);
  private static Logger log = LoggerFactory.getLogger(MyAsyncProcessor.class);
  @Inject
  @Named("slowExecutionImpl")
  private SlowExecutionInterface slowExecutionInterface;
  @Override public boolean process(final Exchange exchange, AsyncCallback asyncCallback) {
    log.info("Async process started");
    CountDownLatch countDownLatch = new CountDownLatch(1);
    exchange.setProperty("countDownLatch", countDownLatch);
    executorService.submit(new AsyncBackgroundProcess(exchange, new AsyncCallback() {
      @Override public void done(boolean b) {
        log.info("Async backend process finished");
        exchange.getContext().getAsyncProcessorAwaitManager().countDown(exchange, exchange.getProperty("countDownLatch", CountDownLatch.class));
      }
    }));
    return true;
  }
  @Override public void process(Exchange exchange) throws Exception {
    throw new IllegalStateException("Should never be called");
  }
  private class AsyncBackgroundProcess implements Runnable {
    private Exchange exchange;
    private AsyncCallback asyncCallback;
    public AsyncBackgroundProcess(Exchange exchange, AsyncCallback asyncCallback){
      this.exchange = exchange;
      this.asyncCallback = asyncCallback;
    }
    @Override public void run() {
      log.info("Async backend process started");
      Boolean getBoolean = slowExecutionInterface.getBoolean("bla");
      exchange.setProperty("Response", getBoolean);
      log.info("Async backend process completed");
      asyncCallback.done(false);
    }
  }
  public static void getResponseInBody(Exchange exchange) {
    CountDownLatch countDownLatch = exchange.getProperty("countDownLatch", CountDownLatch.class);
    exchange.getContext().getAsyncProcessorAwaitManager().await(exchange, countDownLatch);
    log.debug("Retrieved async response " + exchange.getProperty("Response", String.class));
    exchange.getIn().setBody(exchange.getProperty("Response", String.class));
  }
  public static void getResponseInProperty(Exchange exchange) {
    CountDownLatch countDownLatch = exchange.getProperty("countDownLatch", CountDownLatch.class);
    exchange.getContext().getAsyncProcessorAwaitManager().await(exchange, countDownLatch);
    log.debug("Retrieved async response " + exchange.getProperty("Response", String.class));
  }
  public static Boolean getResponse(Exchange exchange) {
    CountDownLatch countDownLatch = exchange.getProperty("countDownLatch", CountDownLatch.class);
    exchange.getContext().getAsyncProcessorAwaitManager().await(exchange, countDownLatch);
    log.debug("Retrieved async response " + exchange.getProperty("Response", String.class));
    return exchange.getProperty("Response", Boolean.class);
  }
}

The entire Camel route looks like this:


package nl.rubix.api.poc;
import nl.rubix.eos.poc.asyncprocessor.MyAsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.cdi.ContextName;
import org.apache.camel.cdi.Uri;
import javax.inject.Inject;
import javax.inject.Named;
/**
 * Configures all our Camel routes, components, endpoints and beans
 */
@ContextName("myJettyCamel")
public class MyJettyRoute extends RouteBuilder {
    @Inject @Uri("jetty:http://0.0.0.0:8080/async/test")
    private Endpoint jettyEndpoint;
    @Inject
    @Named("myAsyncProcessor")
    MyAsyncProcessor myAsyncProcessor;
    @Override
    public void configure() throws Exception {
        from(jettyEndpoint)
            .log("received")
            .process(myAsyncProcessor)
            .log("exited processor")
            .bean(MyAsyncProcessor.class, "getResponse(${exchange})")
            .log("${body} and Response Property ${property.Response}");
    }
}

To simulate slow execution a simple interface and corresponding implementation were used:


package nl.rubix.eos.poc.asyncprocessor;
public interface SlowExecutionInterface {
  Boolean getBoolean(String input);
}

package nl.rubix.eos.poc.asyncprocessor;
import javax.inject.Named;
@Named("slowExecutionImpl")
public class SlowExecutionInterfaceMock implements SlowExecutionInterface {
  @Override public Boolean getBoolean(String input) {
    try {
      Thread.sleep(5000);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still see the interruption.
      Thread.currentThread().interrupt();
    }
    return true;
  }
}

 

Posted on 2017-05-26 in JBoss Fuse

 


Apache Camel – Dynamic redelivery based on MEP

The exception handling and retry mechanisms in Apache Camel are quite extensive. In this blog post we are going to take a look at customizing the retry based on a predicate implementation of our own, thereby enabling really fine-grained retry logic.
In our example we look at the MEP (message exchange pattern) of the exchange. Whenever it is InOnly we retry, since no synchronous subscriber is waiting for the response. Conversely, when the MEP of the exchange is InOut we do not retry and immediately send back an error. Think fail fast and such.
As mentioned above, we can enable this by implementing the Predicate interface from Apache Camel.
The entire class looks like this:
package nl.schiphol.api.integration.redelivery;

import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Predicate;
import org.apache.camel.component.properties.PropertiesComponent;

import javax.inject.Inject;
import javax.inject.Named;

@Named
public class InvokeEndpointsRedeliveryPolicy implements Predicate{

 @Inject
 PropertiesComponent properties;

 private Integer MAX_REDELIVERIES;

 /**
 * Returns whether or not a redelivery must be executed.
 * Whether retries are executed depends on two criteria: the MEP of the exchange and the max retries.
 * If the MEP is InOnly a response is not required immediately and retries can commence up until the max retries.
 * @param exchange The Camel Exchange is used as a parameter, this is dictated by the Predicate interface
 * @return true if a redelivery should be executed.
 */
 @Override
 public boolean matches(Exchange exchange) {
   setMaxRedeliveries();
   //no redelivery is the default
   Boolean shouldRedeliver = false;

   ExchangePattern exchangePattern = exchange.getPattern();
   Boolean isInOut = exchangePattern.isOutCapable();
   Integer redeliveryCounter = getRedeliveryCounter(exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class));

   if(!isInOut && redeliveryCounter < MAX_REDELIVERIES){
    shouldRedeliver = true;
   }

   return shouldRedeliver;
 }

 private void setMaxRedeliveries() {
   try {
   String retryAmountProperty = properties.parseUri("{{health-api.invokeEndpoints.retryAmount}}");
   MAX_REDELIVERIES = Integer.valueOf(retryAmountProperty);
   } catch (Exception e) {
     MAX_REDELIVERIES = 3;
   }
 }

 private Integer getRedeliveryCounter(Integer redeliveryCounterHeader){
   Integer redeliveryCounter = redeliveryCounterHeader;

   if(redeliveryCounter == null){
    redeliveryCounter = 0;
   }

   return redeliveryCounter;
 }

 private Boolean getRetryProperty(Boolean retryProperty) {
   Boolean retry = retryProperty;

   if(retry == null){
     retry = false;
   }

   return retry;
 }
}
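
The decision rule inside matches can also be looked at in isolation from Camel. The following is a sketch under assumptions: the class RedeliveryDecision and its method names are made up for illustration, the outCapable flag stands for the result of exchangePattern.isOutCapable(), and the fallback of 3 is the one used in the class above.

```java
class RedeliveryDecision {
    // Parses the configured retry amount, falling back to a default when it is missing or invalid.
    static int parseMaxRedeliveries(String raw, int fallback) {
        if (raw == null) {
            return fallback;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    // Retry only when nobody is waiting for a reply and the retry budget is not used up.
    static boolean shouldRedeliver(boolean outCapable, Integer redeliveryCounter, int maxRedeliveries) {
        int attempts = (redeliveryCounter == null) ? 0 : redeliveryCounter;
        return !outCapable && attempts < maxRedeliveries;
    }

    public static void main(String[] args) {
        int max = parseMaxRedeliveries("3", 3);
        System.out.println("InOnly, first failure: " + shouldRedeliver(false, null, max));
        System.out.println("InOut, first failure:  " + shouldRedeliver(true, null, max));
        System.out.println("InOnly, budget used:   " + shouldRedeliver(false, 3, max));
    }
}
```

Keeping the rule in a pure function like this makes it straightforward to unit test without constructing an Exchange.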
In order to use our predicate in our Camel route for determining the retry, use the ‘retryWhile’ statement:
@Override
public void configure() {
 from("direct:start").routeId("my-dynamic-retry-route")
 .onException(Exception.class).redeliveryDelay(1500).retryWhile(redeliveryPolicy).continued(true).end()
 ...
 

Posted on 2017-04-06 in JBoss Fuse

 


Creating an insecure http4 component in Apache Camel

Recently I was struggling with invoking HTTP endpoints that use self-signed certificates from the Apache Camel http4 component. The crux of the problem was that these certificates change rapidly and are maintained by other teams. Since this was an internal call only, routed through a VPN, I decided to approach the problem by disabling the certificate check instead of adding the self-signed certificates to a keystore, which is what I normally do in these situations. Keep in mind that doing this undermines the security of the TLS connection, and that it obviously only works with one-way TLS connections.
Disabling the certificate checks in the http4 component was the part I struggled with, so I decided to share the solution I eventually found.
A small note: the example in this blog post uses Apache Camel in combination with CDI, but the solution will work equally well with frameworks like Spring or OSGi Blueprint.
First I created my own instance of the HttpComponent, separating it from the standard http4 component.
public class CustomHttp4Component {

    @Produces
    @Named("insecurehttps4")
    public HttpComponent createCustomHttp4Component() {
       HttpComponent httpComponent = new HttpComponent();

In this custom http4 component we have to do two things. The first is to change the certificate HostnameVerifier so that the hostname on the self-signed certificate does not cause an exception.

This is accomplished by passing an instance of org.apache.http.conn.ssl.AllowAllHostnameVerifier to the http4 component using the setter method setX509HostnameVerifier. Note that the version of Apache Camel I am using (2.17) still requires the now deprecated org.apache.http.conn.ssl.AllowAllHostnameVerifier; newer versions of Apache Camel use org.apache.http.conn.ssl.NoopHostnameVerifier.

Unfortunately this was not enough to invoke the endpoint; an empty X509TrustManager was also required. It needs to be empty so that the certificate validation checks are effectively skipped. For this we extend X509ExtendedTrustManager and override its methods with empty implementations. To set our empty trust manager on the http4 component we wrap it in a TrustManagersParameters object, wrap that in an SSLContextParameters object, and set the result on the http4 component.



TrustManagersParameters trustManagersParameters = new TrustManagersParameters();
X509ExtendedTrustManager extendedTrustManager = new InsecureX509TrustManager();
trustManagersParameters.setTrustManager(extendedTrustManager);

SSLContextParameters sslContextParameters = new SSLContextParameters();
sslContextParameters.setTrustManagers(trustManagersParameters);
httpComponent.setSslContextParameters(sslContextParameters);
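
To see what an empty trust manager amounts to without any Camel classes, here is a sketch that uses only the JDK's javax.net.ssl API. It is an illustration of the idea, not a description of how the Camel parameter classes work internally; the class name InsecureTls is made up. The same warning applies as above: a context like this accepts any certificate and should stay confined to trusted networks.

```java
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

class InsecureTls {
    // Trust manager that accepts every certificate chain because its checks never throw.
    static final X509TrustManager TRUST_ALL = new X509TrustManager() {
        @Override
        public void checkClientTrusted(X509Certificate[] chain, String authType) {
            // intentionally empty: no validation
        }

        @Override
        public void checkServerTrusted(X509Certificate[] chain, String authType) {
            // intentionally empty: no validation
        }

        @Override
        public X509Certificate[] getAcceptedIssuers() {
            return new X509Certificate[0];
        }
    };

    // Creates a TLS context whose only trust manager is the accept-everything one.
    static SSLContext trustAllContext() {
        try {
            SSLContext context = SSLContext.getInstance("TLS");
            context.init(null, new TrustManager[] {TRUST_ALL}, new SecureRandom());
            return context;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("could not create SSLContext", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("protocol: " + trustAllContext().getProtocol());
    }
}
```

A trust manager signals rejection by throwing CertificateException, so methods that return normally accept whatever chain they are given.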

After this we can use our new insecure http4 component in our Camel routes just as the normal http4 component.
The entire custom http4 component:

package nl.schiphol.api.integration.components;

import org.apache.camel.component.http4.HttpComponent;
import org.apache.camel.util.jsse.SSLContextParameters;
import org.apache.camel.util.jsse.TrustManagersParameters;
import org.apache.http.conn.ssl.AllowAllHostnameVerifier;

import javax.enterprise.inject.Produces;
import javax.inject.Named;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.X509ExtendedTrustManager;
import java.net.Socket;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;

public class CustomHttp4Component {

    @Produces
    @Named("insecurehttps4")
    public HttpComponent createCustomHttp4Component() {
        HttpComponent httpComponent = new HttpComponent();

        httpComponent.setX509HostnameVerifier(AllowAllHostnameVerifier.INSTANCE);


        TrustManagersParameters trustManagersParameters = new TrustManagersParameters();
        X509ExtendedTrustManager extendedTrustManager = new InsecureX509TrustManager();
        trustManagersParameters.setTrustManager(extendedTrustManager);

        SSLContextParameters sslContextParameters = new SSLContextParameters();
        sslContextParameters.setTrustManagers(trustManagersParameters);
        httpComponent.setSslContextParameters(sslContextParameters);

        return httpComponent;
    }

    private static class InsecureX509TrustManager extends X509ExtendedTrustManager {
        @Override
        public void checkClientTrusted(X509Certificate[] x509Certificates, String s, Socket socket) throws CertificateException {

        }

        @Override
        public void checkServerTrusted(X509Certificate[] x509Certificates, String s, Socket socket) throws CertificateException {

        }

        @Override
        public void checkClientTrusted(X509Certificate[] x509Certificates, String s, SSLEngine sslEngine) throws CertificateException {

        }

        @Override
        public void checkServerTrusted(X509Certificate[] x509Certificates, String s, SSLEngine sslEngine) throws CertificateException {

        }

        @Override
        public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {

        }

        @Override
        public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {

        }

        @Override
        public X509Certificate[] getAcceptedIssuers() {
            return new X509Certificate[0];
        }
    }
}

 

Posted on 2017-02-17 in Uncategorized

 


JBoss Fuse Jolokia requests

Even though Hawtio is a pretty awesome console for JBoss Fuse, sometimes you want or even need to have the underlying data Hawtio uses. Maybe you want to incorporate it in your existing dashboard, maybe you want to store it in a database so you have a historical record, or maybe you’re just curious and want to experiment with it.

The nice thing about JBoss Fuse, and really about the underlying community products, mainly Apache Camel and Apache ActiveMQ, is that they expose a lot of metrics and statistics via JMX. Now you could start your favorite IDE or text editor and write a slab of Java code to query the JMX metrics from Apache Camel and Apache ActiveMQ, like some sort of maniac. Or you could leverage the out-of-the-box Jolokia features, like the people from Hawtio did, and as really any sane person would do.

For those of you who have never heard of Jolokia: in a nutshell it's basically JMX over HTTP/JSON, which makes doing JMX almost fun again.

To start you off, here are a couple of Jolokia requests (and responses). These are just plain HTTP requests; do note they use basic authentication to authenticate against the JBoss Fuse server.
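
Because these are plain HTTP GET requests with basic authentication, they can be issued from any HTTP client. As a sketch, assuming Java 11 or later and its java.net.http client: the class JolokiaClient is a made-up name, the admin/admin credentials are placeholders, and the URL is one of the read URLs from this post.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class JolokiaClient {
    // Builds the value of the HTTP Authorization header for basic authentication.
    static String basicAuth(String user, String password) {
        String credentials = user + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    // Prepares a GET request for a Jolokia URL; nothing is sent yet.
    static HttpRequest readRequest(String url, String user, String password) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Authorization", basicAuth(user, password))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    // Sends the request and returns the raw JSON body; needs a reachable server.
    static String fetch(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) {
        // Placeholder credentials; only the prepared request is printed, no call is made.
        HttpRequest request = readRequest(
                "http://localhost:8181/hawtio/jolokia/read/java.lang:type=Memory/HeapMemoryUsage",
                "admin", "admin");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Calling fetch(request) against a running JBoss Fuse instance would return the JSON document as a string, which can then be handed to a JSON parser of choice.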

To start things off, some statistics on the JVM:

The request for querying the heap size of the JVM that JBoss Fuse is running on:

http://localhost:8181/hawtio/jolokia/read/java.lang:type=Memory/HeapMemoryUsage

Which returns, on my local test machine, the following response:

{
   "request":{
      "mbean":"java.lang:type=Memory",
      "attribute":"HeapMemoryUsage",
      "type":"read"
   },
   "value":{
      "init":536870912,
      "committed":749207552,
      "max":954728448,
      "used":118298056
   },
   "timestamp":1469035168,
   "status":200
}

Now that we know how to query the heap memory, fetching the non-heap memory is pretty trivial:

http://localhost:8181/hawtio/jolokia/read/java.lang:type=Memory/NonHeapMemoryUsage

{
   "request":{
      "mbean":"java.lang:type=Memory",
      "attribute":"NonHeapMemoryUsage",
      "type":"read"
   },
   "value":{
      "init":2555904,
      "committed":120848384,
      "max":-1,
      "used":110152704
   },
   "timestamp":1469035298,
   "status":200
}
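
The byte counts in the two responses above are often easier to read as a percentage of the maximum. A small sketch of that arithmetic follows; the class name MemoryUsageRatio is made up, and the numbers in main are copied from the responses above. Note that a max of -1, as in the non-heap response, means the maximum is undefined, so no percentage can be computed for it.

```java
import java.util.OptionalDouble;

class MemoryUsageRatio {
    // Percentage of the maximum that is in use; empty when max is undefined (reported as -1).
    static OptionalDouble percentOfMax(long used, long max) {
        if (max <= 0) {
            return OptionalDouble.empty();
        }
        return OptionalDouble.of(100.0 * used / max);
    }

    public static void main(String[] args) {
        // Values copied from the HeapMemoryUsage and NonHeapMemoryUsage responses above.
        OptionalDouble heap = percentOfMax(118298056L, 954728448L);
        OptionalDouble nonHeap = percentOfMax(110152704L, -1L);
        System.out.println("heap used:     " + (heap.isPresent() ? heap.getAsDouble() + " %" : "n/a"));
        System.out.println("non-heap used: " + (nonHeap.isPresent() ? nonHeap.getAsDouble() + " %" : "n/a"));
    }
}
```

For the heap response this comes out at roughly 12.4 percent of the maximum.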

Some additional threading statistics for the JVM:

http://localhost:8181/hawtio/jolokia/read/java.lang:type=Threading

which returns:

{
   "request":{
      "mbean":"java.lang:type=Threading",
      "type":"read"
   },
   "value":{
      "ThreadAllocatedMemorySupported":true,
      "ThreadContentionMonitoringEnabled":false,
      "TotalStartedThreadCount":125,
      "CurrentThreadCpuTimeSupported":true,
      "CurrentThreadUserTime":610000000,
      "PeakThreadCount":108,
      "AllThreadIds":[
         150,
         45,
         123,
         122,
         121,
         120,
         119,
         118,
         117,
         116,
         115,
         113,
         111,
         110,
         109,
         107,
         106,
         105,
         104,
         103,
         102,
         101,
         100,
         99,
         98,
         97,
         96,
         95,
         94,
         93,
         92,
         90,
         87,
         86,
         84,
         83,
         82,
         81,
         80,
         79,
         78,
         77,
         76,
         74,
         73,
         72,
         71,
         70,
         69,
         68,
         67,
         65,
         62,
         61,
         60,
         58,
         57,
         55,
         54,
         53,
         51,
         47,
         46,
         44,
         42,
         40,
         39,
         38,
         37,
         36,
         35,
         34,
         33,
         32,
         31,
         30,
         28,
         27,
         26,
         24,
         25,
         21,
         20,
         23,
         22,
         19,
         18,
         15,
         14,
         13,
         12,
         11,
         4,
         3,
         2,
         1
      ],
      "ThreadAllocatedMemoryEnabled":true,
      "CurrentThreadCpuTime":624340113,
      "ObjectName":{
         "objectName":"java.lang:type=Threading"
      },
      "ThreadContentionMonitoringSupported":true,
      "ThreadCpuTimeSupported":true,
      "ThreadCount":96,
      "ThreadCpuTimeEnabled":true,
      "ObjectMonitorUsageSupported":true,
      "SynchronizerUsageSupported":true,
      "DaemonThreadCount":70
   },
   "timestamp":1469035389,
   "status":200
}

Now for some JBoss Fuse specific queries:

First some overall statistics for our ActiveMQ broker:

http://localhost:8181/hawtio/jolokia/read/org.apache.activemq:type=Broker,brokerName=amq

Which returns:

{
   "request":{
      "mbean":"org.apache.activemq:brokerName=amq,type=Broker",
      "type":"read"
   },
   "value":{
      "StatisticsEnabled":true,
      "TotalConnectionsCount":0,
      "StompSslURL":"",
      "TransportConnectors":{
         "openwire":"tcp:\/\/localhost:61616",
         "stomp":"stomp+nio:\/\/pim-XPS-15-9530:61613"
      },
      "StompURL":"",
      "TotalProducerCount":0,
      "CurrentConnectionsCount":0,
      "TopicProducers":[

      ],
      "JMSJobScheduler":null,
      "UptimeMillis":461929,
      "TemporaryQueueProducers":[

      ],
      "TotalDequeueCount":0,
      "JobSchedulerStorePercentUsage":0,
      "DurableTopicSubscribers":[

      ],
      "QueueSubscribers":[

      ],
      "AverageMessageSize":1024,
      "BrokerVersion":"5.11.0.redhat-621084",
      "TemporaryQueues":[

      ],
      "BrokerName":"amq",
      "MinMessageSize":1024,
      "DynamicDestinationProducers":[

      ],
      "OpenWireURL":"tcp:\/\/localhost:61616",
      "TemporaryTopics":[

      ],
      "JobSchedulerStoreLimit":0,
      "TotalConsumerCount":0,
      "MaxMessageSize":1024,
      "TotalMessageCount":0,
      "TempPercentUsage":0,
      "TemporaryQueueSubscribers":[

      ],
      "MemoryPercentUsage":0,
      "SslURL":"",
      "InactiveDurableTopicSubscribers":[

      ],
      "StoreLimit":10737418240,
      "QueueProducers":[

      ],
      "VMURL":"vm:\/\/amq",
      "TemporaryTopicProducers":[

      ],
      "Topics":[
         {
            "objectName":"org.apache.activemq:brokerName=amq,destinationName=ActiveMQ.Advisory.MasterBroker,destinationType=Topic,type=Broker"
         }
      ],
      "Uptime":"7 minutes",
      "BrokerId":"ID:pim-XPS-15-9530-35535-1469035012806-0:1",
      "DataDirectory":"\/home\/pim\/apps\/jboss-fuse-6.2.1.redhat-084\/data\/amq",
      "Persistent":true,
      "TopicSubscribers":[

      ],
      "MemoryLimit":668309914,
      "Slave":false,
      "TotalEnqueueCount":1,
      "TempLimit":5368709120,
      "TemporaryTopicSubscribers":[

      ],
      "StorePercentUsage":0,
      "Queues":[

      ]
   },
   "timestamp":1469035474,
   "status":200
}

We can also zoom into a specific JMS destination on our broker, in this case the Queue named testQueue:

http://localhost:8181/hawtio/jolokia/read/org.apache.activemq:type=Broker,brokerName=amq,destinationType=Queue,destinationName=testQueue

{
   "request":{
      "mbean":"org.apache.activemq:brokerName=amq,destinationName=testQueue,destinationType=Queue,type=Broker",
      "type":"read"
   },
   "value":{
      "ProducerFlowControl":true,
      "AlwaysRetroactive":false,
      "Options":"",
      "MemoryUsageByteCount":0,
      "AverageBlockedTime":0.0,
      "MemoryPercentUsage":0,
      "CursorMemoryUsage":0,
      "InFlightCount":0,
      "Subscriptions":[

      ],
      "CacheEnabled":true,
      "ForwardCount":0,
      "DLQ":false,
      "AverageEnqueueTime":0.0,
      "Name":"testQueue",
      "MaxAuditDepth":10000,
      "BlockedSends":0,
      "TotalBlockedTime":0,
      "MaxPageSize":200,
      "QueueSize":0,
      "PrioritizedMessages":false,
      "MemoryUsagePortion":0.0,
      "Paused":false,
      "EnqueueCount":0,
      "MessageGroups":{

      },
      "ConsumerCount":0,
      "AverageMessageSize":0,
      "CursorFull":false,
      "MaxProducersToAudit":64,
      "ExpiredCount":0,
      "CursorPercentUsage":0,
      "MinEnqueueTime":0,
      "MinMessageSize":0,
      "MemoryLimit":1048576,
      "DispatchCount":0,
      "MaxEnqueueTime":0,
      "BlockedProducerWarningInterval":30000,
      "DequeueCount":0,
      "ProducerCount":0,
      "MessageGroupType":"cached",
      "MaxMessageSize":0,
      "UseCache":true,
      "SlowConsumerStrategy":null
   },
   "timestamp":1469035582,
   "status":200
}

And next to ActiveMQ we can also retrieve some Camel statistics:

http://localhost:8181/hawtio/jolokia/read/org.apache.camel:context=*,type=routes,name=*/Load01,InflightExchanges,LastProcessingTime,ExchangesFailed

Which returns the metrics: “Load01”, “InflightExchanges”, “LastProcessingTime” and “ExchangesFailed” from all Camel routes in all Camel Contexts:

{
   "request":{
      "mbean":"org.apache.camel:context=*,name=*,type=routes",
      "attribute":[
         "Load01",
         "InflightExchanges",
         "LastProcessingTime",
         "ExchangesFailed"
      ],
      "type":"read"
   },
   "value":{
      "org.apache.camel:context=camel-blueprint,name=\"route1\",type=routes":{
         "LastProcessingTime":0,
         "ExchangesFailed":0,
         "InflightExchanges":0,
         "Load01":""
      },
      "org.apache.camel:context=camel-blueprint,name=\"route3\",type=routes":{
         "LastProcessingTime":0,
         "ExchangesFailed":0,
         "InflightExchanges":0,
         "Load01":""
      },
      "org.apache.camel:context=camel-blueprint,name=\"route2\",type=routes":{
         "LastProcessingTime":0,
         "ExchangesFailed":0,
         "InflightExchanges":0,
         "Load01":""
      }
   },
   "timestamp":1469035650,
   "status":200
}
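
The read URLs in this post all follow the same shape: the Jolokia base URL, the word read, an MBean name or pattern, and optionally a comma-separated list of attributes. A sketch of a helper that assembles them is shown below; JolokiaReadUrl is a made-up name, and no escaping is applied, so MBean names containing a slash would need extra handling.

```java
import java.util.List;

class JolokiaReadUrl {
    // Joins the base URL, an MBean name (or pattern) and optional attribute names into a read URL.
    static String build(String baseUrl, String mbean, List<String> attributes) {
        StringBuilder url = new StringBuilder(baseUrl).append("/read/").append(mbean);
        if (!attributes.isEmpty()) {
            url.append('/').append(String.join(",", attributes));
        }
        return url.toString();
    }

    public static void main(String[] args) {
        String base = "http://localhost:8181/hawtio/jolokia";
        String routes = "org.apache.camel:context=*,type=routes,name=*";
        // Selected attributes of all routes in all contexts.
        System.out.println(build(base, routes,
                List.of("Load01", "InflightExchanges", "LastProcessingTime", "ExchangesFailed")));
        // No attribute filter: every attribute of every matching MBean.
        System.out.println(build(base, routes, List.of()));
    }
}
```

Passing an empty attribute list produces the unfiltered form of the URL, which returns every attribute of each matching MBean.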

We can of course also remove some filters to get all available statistics on the Camel routes:

http://localhost:8181/hawtio/jolokia/read/org.apache.camel:context=*,type=routes,name=*

which returns a lot more statistics:

{
   "request":{
      "mbean":"org.apache.camel:context=*,name=*,type=routes",
      "type":"read"
   },
   "value":{
      "org.apache.camel:context=camel-blueprint,name=\"route1\",type=routes":{
         "StatisticsEnabled":true,
         "EndpointUri":"rest:\/\/get:\/user:\/%7Bid%7D?description=Find+user+by+id&outType=nl.schiphol.my.camel.swagger.example.User&routeId=route1",
         "CamelManagementName":"camel-blueprint",
         "ExchangesCompleted":0,
         "LastProcessingTime":0,
         "ExchangesFailed":0,
         "Description":null,
         "FirstExchangeCompletedExchangeId":null,
         "FirstExchangeCompletedTimestamp":null,
         "LastExchangeFailureTimestamp":null,
         "MaxProcessingTime":0,
         "LastExchangeCompletedTimestamp":null,
         "Load15":"",
         "DeltaProcessingTime":0,
         "OldestInflightDuration":null,
         "ExternalRedeliveries":0,
         "ExchangesTotal":0,
         "ResetTimestamp":"2016-07-20T19:16:53+02:00",
         "ExchangesInflight":0,
         "MeanProcessingTime":0,
         "LastExchangeFailureExchangeId":null,
         "FirstExchangeFailureExchangeId":null,
         "CamelId":"myCamel",
         "TotalProcessingTime":0,
         "FirstExchangeFailureTimestamp":null,
         "RouteId":"route1",
         "RoutePolicyList":"",
         "FailuresHandled":0,
         "MessageHistory":true,
         "Load05":"",
         "OldestInflightExchangeId":null,
         "State":"Started",
         "InflightExchanges":0,
         "Redeliveries":0,
         "MinProcessingTime":0,
         "LastExchangeCompletedExchangeId":null,
         "Tracing":false,
         "Load01":""
      },
      "org.apache.camel:context=camel-blueprint,name=\"route3\",type=routes":{
         "StatisticsEnabled":true,
         "EndpointUri":"rest:\/\/get:\/user:\/findAll?description=Find+all+users&outType=nl.schiphol.my.camel.swagger.example.User%5B%5D&routeId=route3",
         "CamelManagementName":"camel-blueprint",
         "ExchangesCompleted":0,
         "LastProcessingTime":0,
         "ExchangesFailed":0,
         "Description":null,
         "FirstExchangeCompletedExchangeId":null,
         "FirstExchangeCompletedTimestamp":null,
         "LastExchangeFailureTimestamp":null,
         "MaxProcessingTime":0,
         "LastExchangeCompletedTimestamp":null,
         "Load15":"",
         "DeltaProcessingTime":0,
         "OldestInflightDuration":null,
         "ExternalRedeliveries":0,
         "ExchangesTotal":0,
         "ResetTimestamp":"2016-07-20T19:16:53+02:00",
         "ExchangesInflight":0,
         "MeanProcessingTime":0,
         "LastExchangeFailureExchangeId":null,
         "FirstExchangeFailureExchangeId":null,
         "CamelId":"myCamel",
         "TotalProcessingTime":0,
         "FirstExchangeFailureTimestamp":null,
         "RouteId":"route3",
         "RoutePolicyList":"",
         "FailuresHandled":0,
         "MessageHistory":true,
         "Load05":"",
         "OldestInflightExchangeId":null,
         "State":"Started",
         "InflightExchanges":0,
         "Redeliveries":0,
         "MinProcessingTime":0,
         "LastExchangeCompletedExchangeId":null,
         "Tracing":false,
         "Load01":""
      },
      "org.apache.camel:context=camel-blueprint,name=\"route2\",type=routes":{
         "StatisticsEnabled":true,
         "EndpointUri":"rest:\/\/put:\/user?description=Updates+or+create+a+user&inType=nl.schiphol.my.camel.swagger.example.User&routeId=route2",
         "CamelManagementName":"camel-blueprint",
         "ExchangesCompleted":0,
         "LastProcessingTime":0,
         "ExchangesFailed":0,
         "Description":null,
         "FirstExchangeCompletedExchangeId":null,
         "FirstExchangeCompletedTimestamp":null,
         "LastExchangeFailureTimestamp":null,
         "MaxProcessingTime":0,
         "LastExchangeCompletedTimestamp":null,
         "Load15":"",
         "DeltaProcessingTime":0,
         "OldestInflightDuration":null,
         "ExternalRedeliveries":0,
         "ExchangesTotal":0,
         "ResetTimestamp":"2016-07-20T19:16:53+02:00",
         "ExchangesInflight":0,
         "MeanProcessingTime":0,
         "LastExchangeFailureExchangeId":null,
         "FirstExchangeFailureExchangeId":null,
         "CamelId":"myCamel",
         "TotalProcessingTime":0,
         "FirstExchangeFailureTimestamp":null,
         "RouteId":"route2",
         "RoutePolicyList":"",
         "FailuresHandled":0,
         "MessageHistory":true,
         "Load05":"",
         "OldestInflightExchangeId":null,
         "State":"Started",
         "InflightExchanges":0,
         "Redeliveries":0,
         "MinProcessingTime":0,
         "LastExchangeCompletedExchangeId":null,
         "Tracing":false,
         "Load01":""
      }
   },
   "timestamp":1469035760,
   "status":200
}

Now there are probably tens or hundreds more queries to think of, but hopefully this will start you off in the right direction.
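
As a starting point for such queries, here is a minimal Java sketch that only builds Jolokia read URLs of the form used above. The class name JolokiaReadUrl and its methods are made up for this example, the base URL is the one from this post, and ExchangesTotal is one of the attributes from the response shown above. It assumes Jolokia's GET convention of appending an attribute name as an extra path segment after the MBean name, and it does not attempt any escaping of special characters.

```java
import java.util.Objects;

class JolokiaReadUrl {

    // Base URL of the Jolokia agent exposed through Hawtio, as used in this post.
    static final String BASE = "http://localhost:8181/hawtio/jolokia";

    // MBean pattern matching every Camel route in every Camel context.
    static final String ALL_ROUTES = "org.apache.camel:context=*,type=routes,name=*";

    /** Builds a read URL that returns all attributes of the matching MBeans. */
    static String forMBean(String base, String mbeanPattern) {
        Objects.requireNonNull(base);
        Objects.requireNonNull(mbeanPattern);
        return stripTrailingSlash(base) + "/read/" + mbeanPattern;
    }

    /** Builds a read URL that returns a single attribute of the matching MBeans. */
    static String forAttribute(String base, String mbeanPattern, String attribute) {
        return forMBean(base, mbeanPattern) + "/" + Objects.requireNonNull(attribute);
    }

    private static String stripTrailingSlash(String s) {
        return s.endsWith("/") ? s.substring(0, s.length() - 1) : s;
    }

    public static void main(String[] args) {
        // Hypothetical usage: print the two URLs so they can be pasted into a browser.
        System.out.println(forMBean(BASE, ALL_ROUTES));
        System.out.println(forAttribute(BASE, ALL_ROUTES, "ExchangesTotal"));
    }
}
```

Asking for a single attribute keeps the response a lot smaller than the full dump above, which helps when polling the statistics regularly.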

 
Posted on 2016-07-20 in JBoss Fuse

MVN camel:run exception in Fuse 6.2.1

When playing around with a new Fuse 6.2.1 environment, I noticed that the Maven archetypes provided with Fuse did not run out of the box with the mvn camel:run command.

Whenever I executed the command I received the following error:


[ERROR] Failed to execute goal org.apache.camel:camel-maven-plugin:2.15.1.redhat-621084:run (default-cli) on project postback-core: Execution default-cli of goal org.apache.camel:camel-maven-plugin:2.15.1.redhat-621084:run failed: Plugin org.apache.camel:camel-maven-plugin:2.15.1.redhat-621084 or one of its dependencies could not be resolved: Failed to collect dependencies at org.apache.camel:camel-maven-plugin:jar:2.15.1.redhat-621084 -> org.apache.maven.reporting:maven-reporting-impl:jar:2.0.5 -> org.apache.maven.doxia:doxia-site-renderer:jar:1.0 -> org.apache.velocity:velocity:jar:1.5 -> commons-collections:commons-collections:jar:3.2.1.redhat-7: Failed to read artifact descriptor for commons-collections:commons-collections:jar:3.2.1.redhat-7: Failure to find org.apache.commons:commons-parent:pom:22-redhat-2 in https://repo1.maven.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of central has elapsed or updates are forced -> [Help 1]

Executing the command with the -e flag gave the following stack trace:


org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.apache.camel:camel-maven-plugin:2.15.1.redhat-621084:run (default-cli) on project postback-core: Execution default-cli of goal org.apache.camel:camel-maven-plugin:2.15.1.redhat-621084:run failed: Plugin org.apache.camel:camel-maven-plugin:2.15.1.redhat-621084 or one of its dependencies could not be resolved: Failed to collect dependencies at org.apache.camel:camel-maven-plugin:jar:2.15.1.redhat-621084 -> org.apache.maven.reporting:maven-reporting-impl:jar:2.0.5 -> org.apache.maven.doxia:doxia-site-renderer:jar:1.0 -> org.apache.velocity:velocity:jar:1.5 -> commons-collections:commons-collections:jar:3.2.1.redhat-7

at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:212)

at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)

at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)

at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)

at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)

at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)

at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128)

at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:307)

at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:193)

at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:106)

at org.apache.maven.cli.MavenCli.execute(MavenCli.java:863)

at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:288)

at org.apache.maven.cli.MavenCli.main(MavenCli.java:199)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)

at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)

at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)

at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)

Caused by: org.apache.maven.plugin.PluginExecutionException: Execution default-cli of goal org.apache.camel:camel-maven-plugin:2.15.1.redhat-621084:run failed: Plugin org.apache.camel:camel-maven-plugin:2.15.1.redhat-621084 or one of its dependencies could not be resolved: Failed to collect dependencies at org.apache.camel:camel-maven-plugin:jar:2.15.1.redhat-621084 -> org.apache.maven.reporting:maven-reporting-impl:jar:2.0.5 -> org.apache.maven.doxia:doxia-site-renderer:jar:1.0 -> org.apache.velocity:velocity:jar:1.5 -> commons-collections:commons-collections:jar:3.2.1.redhat-7

at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:106)

at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:207)

... 20 more

Caused by: org.apache.maven.plugin.PluginResolutionException: Plugin org.apache.camel:camel-maven-plugin:2.15.1.redhat-621084 or one of its dependencies could not be resolved: Failed to collect dependencies at org.apache.camel:camel-maven-plugin:jar:2.15.1.redhat-621084 -> org.apache.maven.reporting:maven-reporting-impl:jar:2.0.5 -> org.apache.maven.doxia:doxia-site-renderer:jar:1.0 -> org.apache.velocity:velocity:jar:1.5 -> commons-collections:commons-collections:jar:3.2.1.redhat-7

at org.apache.maven.plugin.internal.DefaultPluginDependenciesResolver.resolveInternal(DefaultPluginDependenciesResolver.java:214)

at org.apache.maven.plugin.internal.DefaultPluginDependenciesResolver.resolve(DefaultPluginDependenciesResolver.java:149)

at org.apache.maven.plugin.internal.DefaultMavenPluginManager.createPluginRealm(DefaultMavenPluginManager.java:400)

at org.apache.maven.plugin.internal.DefaultMavenPluginManager.setupPluginRealm(DefaultMavenPluginManager.java:372)

at org.apache.maven.plugin.DefaultBuildPluginManager.getPluginRealm(DefaultBuildPluginManager.java:231)

at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:102)

... 21 more

Caused by: org.eclipse.aether.collection.DependencyCollectionException: Failed to collect dependencies at org.apache.camel:camel-maven-plugin:jar:2.15.1.redhat-621084 -> org.apache.maven.reporting:maven-reporting-impl:jar:2.0.5 -> org.apache.maven.doxia:doxia-site-renderer:jar:1.0 -> org.apache.velocity:velocity:jar:1.5 -> commons-collections:commons-collections:jar:3.2.1.redhat-7

at org.eclipse.aether.internal.impl.DefaultDependencyCollector.collectDependencies(DefaultDependencyCollector.java:291)

at org.eclipse.aether.internal.impl.DefaultRepositorySystem.collectDependencies(DefaultRepositorySystem.java:316)

at org.apache.maven.plugin.internal.DefaultPluginDependenciesResolver.resolveInternal(DefaultPluginDependenciesResolver.java:202)

... 26 more

Caused by: org.eclipse.aether.resolution.ArtifactDescriptorException: Failed to read artifact descriptor for commons-collections:commons-collections:jar:3.2.1.redhat-7

at org.apache.maven.repository.internal.DefaultArtifactDescriptorReader.loadPom(DefaultArtifactDescriptorReader.java:329)

at org.apache.maven.repository.internal.DefaultArtifactDescriptorReader.readArtifactDescriptor(DefaultArtifactDescriptorReader.java:198)

at org.eclipse.aether.internal.impl.DefaultDependencyCollector.resolveCachedArtifactDescriptor(DefaultDependencyCollector.java:535)

at org.eclipse.aether.internal.impl.DefaultDependencyCollector.getArtifactDescriptorResult(DefaultDependencyCollector.java:519)

at org.eclipse.aether.internal.impl.DefaultDependencyCollector.processDependency(DefaultDependencyCollector.java:409)

at org.eclipse.aether.internal.impl.DefaultDependencyCollector.processDependency(DefaultDependencyCollector.java:363)

at org.eclipse.aether.internal.impl.DefaultDependencyCollector.process(DefaultDependencyCollector.java:351)

at org.eclipse.aether.internal.impl.DefaultDependencyCollector.doRecurse(DefaultDependencyCollector.java:504)

at org.eclipse.aether.internal.impl.DefaultDependencyCollector.processDependency(DefaultDependencyCollector.java:458)

at org.eclipse.aether.internal.impl.DefaultDependencyCollector.processDependency(DefaultDependencyCollector.java:363)

at org.eclipse.aether.internal.impl.DefaultDependencyCollector.process(DefaultDependencyCollector.java:351)

at org.eclipse.aether.internal.impl.DefaultDependencyCollector.doRecurse(DefaultDependencyCollector.java:504)

at org.eclipse.aether.internal.impl.DefaultDependencyCollector.processDependency(DefaultDependencyCollector.java:458)

at org.eclipse.aether.internal.impl.DefaultDependencyCollector.processDependency(DefaultDependencyCollector.java:363)

at org.eclipse.aether.internal.impl.DefaultDependencyCollector.process(DefaultDependencyCollector.java:351)

at org.eclipse.aether.internal.impl.DefaultDependencyCollector.doRecurse(DefaultDependencyCollector.java:504)

at org.eclipse.aether.internal.impl.DefaultDependencyCollector.processDependency(DefaultDependencyCollector.java:458)

at org.eclipse.aether.internal.impl.DefaultDependencyCollector.processDependency(DefaultDependencyCollector.java:363)

at org.eclipse.aether.internal.impl.DefaultDependencyCollector.process(DefaultDependencyCollector.java:351)

at org.eclipse.aether.internal.impl.DefaultDependencyCollector.collectDependencies(DefaultDependencyCollector.java:254)

... 28 more

Caused by: org.apache.maven.model.resolution.UnresolvableModelException: Failure to find org.apache.commons:commons-parent:pom:22-redhat-2 in https://repo1.maven.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of central has elapsed or updates are forced

at org.apache.maven.repository.internal.DefaultModelResolver.resolveModel(DefaultModelResolver.java:177)

at org.apache.maven.repository.internal.DefaultModelResolver.resolveModel(DefaultModelResolver.java:226)

at org.apache.maven.model.building.DefaultModelBuilder.readParentExternally(DefaultModelBuilder.java:1000)

at org.apache.maven.model.building.DefaultModelBuilder.readParent(DefaultModelBuilder.java:800)

at org.apache.maven.model.building.DefaultModelBuilder.build(DefaultModelBuilder.java:329)

at org.apache.maven.repository.internal.DefaultArtifactDescriptorReader.loadPom(DefaultArtifactDescriptorReader.java:320)

... 47 more

Caused by: org.eclipse.aether.resolution.ArtifactResolutionException: Failure to find org.apache.commons:commons-parent:pom:22-redhat-2 in https://repo1.maven.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of central has elapsed or updates are forced

at org.eclipse.aether.internal.impl.DefaultArtifactResolver.resolve(DefaultArtifactResolver.java:444)

at org.eclipse.aether.internal.impl.DefaultArtifactResolver.resolveArtifacts(DefaultArtifactResolver.java:246)

at org.eclipse.aether.internal.impl.DefaultArtifactResolver.resolveArtifact(DefaultArtifactResolver.java:223)

at org.apache.maven.repository.internal.DefaultModelResolver.resolveModel(DefaultModelResolver.java:173)

... 52 more

Caused by: org.eclipse.aether.transfer.ArtifactNotFoundException: Failure to find org.apache.commons:commons-parent:pom:22-redhat-2 in https://repo1.maven.org/maven2 was cached in the local repository, resolution will not be reattempted until the update interval of central has elapsed or updates are forced

at org.eclipse.aether.internal.impl.DefaultUpdateCheckManager.newException(DefaultUpdateCheckManager.java:231)

at org.eclipse.aether.internal.impl.DefaultUpdateCheckManager.checkArtifact(DefaultUpdateCheckManager.java:206)

at org.eclipse.aether.internal.impl.DefaultArtifactResolver.gatherDownloads(DefaultArtifactResolver.java:585)

at org.eclipse.aether.internal.impl.DefaultArtifactResolver.performDownloads(DefaultArtifactResolver.java:503)

at org.eclipse.aether.internal.impl.DefaultArtifactResolver.resolve(DefaultArtifactResolver.java:421)

... 55 more

It took me some time to get this resolved. In the end, the solution was to configure an additional Maven plugin repository:


<pluginRepository>
  <id>redhat</id>
  <url>https://maven.repository.redhat.com/ga/</url>
  <releases>
    <enabled>true</enabled>
  </releases>
</pluginRepository>

 
Posted on 2016-01-31 in JBoss Fuse

ActiveMQ DLQ use OriginalDestination in Camel

Ah, the DLQ, the place where messages go to die. When messages end up in the DLQ (dead letter queue) of ActiveMQ, they receive an additional message header containing the name of the queue where the message originally resided. This message header, called Original Destination, can be viewed in Hawtio.


When using Apache Camel as a subscriber I noticed something strange. Normally all JMS Headers and properties are mapped to the Camel Exchange headers and properties. However, the OriginalDestination header was not.

For example, when I created the following dummy route for testing purposes, I got the output shown beneath it:


package nl.rubix.dlqsubscriber.dlqsubscriber;

import org.apache.camel.builder.RouteBuilder;

public class DLQSubscriberRouteBuilder extends RouteBuilder{

    @Override
    public void configure() throws Exception {
        from("amq:queue:ActiveMQ.DLQ").log("${headers}");
    }

}


 


[#0 - JmsConsumer[ActiveMQ.DLQ]] route1                         INFO {breadcrumbId=ID:localhost.localdomain-39943-1438348606321-13:1:1:1:1, CamelJmsDeliveryMode=2, dlqDeliveryFailureCause=java.lang.Throwable: Message Expired. Expiration:1438348926673, JMSCorrelationID=null, JMSDeliveryMode=2, JMSDestination=queue://ActiveMQ.DLQ, JMSExpiration=0, JMSMessageID=ID:localhost.localdomain-39943-1438348606321-15:1:1:1:1, JMSPriority=4, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1438348926663, JMSType=null, JMSXGroupID=null, JMSXUserID=null, originalExpiration=1438348926673}

I spent quite some time trying to figure out what was happening, until I realized that the JMS message on the DLQ is not simply the message from the original queue with some extra headers and properties. The original message seems to be wrapped inside the message placed on the DLQ. To access this message we need to work with the “raw” JMS message from ActiveMQ rather than the processed message from Camel. This can be done by creating a processor which uses a Camel type converter to get the JMS message from the Camel exchange.

In the code below I created a processor which fetches the inner message, retrieves the originalDestination property from it (an instance variable on the ActiveMQMessage class, of which ActiveMQTextMessage is a subclass) and places this property as a header on the Camel exchange.


package nl.rubix.dlqsubscriber.dlqsubscriber;

import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.jms.JmsMessage;

public class DQLMessageProcessor implements Processor{

    @Override
    public void process(Exchange exchange) throws Exception {
        JmsMessage jmsMsg = exchange.getIn().getBody(JmsMessage.class);
        ActiveMQTextMessage innerMsg = (ActiveMQTextMessage) jmsMsg.getJmsMessage();
        exchange.getIn().setHeader("OriginalDestination", innerMsg.getOriginalDestination());
    }

}


Now when observing the output of .log("${headers}") we see the OriginalDestination header in the log:


[#0 - JmsConsumer[ActiveMQ.DLQ]] route1                         INFO {breadcrumbId=ID:localhost.localdomain-39943-1438348606321-21:1:1:1:1, CamelJmsDeliveryMode=2, dlqDeliveryFailureCause=java.lang.Throwable: Message Expired. Expiration:1438350527570, JMSCorrelationID=null, JMSDeliveryMode=2, JMSDestination=queue://ActiveMQ.DLQ, JMSExpiration=0, JMSMessageID=ID:localhost.localdomain-39943-1438348606321-15:4:1:1:1, JMSPriority=4, JMSRedelivered=false, JMSReplyTo=null, JMSTimestamp=1438350527560, JMSType=null, JMSXGroupID=null, JMSXUserID=null, OriginalDestination=queue://test2, originalExpiration=1438350527570}

In the end it cost me quite some time to grasp the concept of fetching the ActiveMQTextMessage from the JmsMessage, since I was under the impression that the Camel ActiveMQ component does this out of the box.

Anyway, I hope it helps someone.

 

 
Posted on 2015-07-31 in JBoss Fuse

Implementing a CXFRS client in JBoss Fuse

Apache CXF is part of JBoss Fuse, and so is Apache Camel. In this blog post we are going to implement a REST client using CXFRS and Camel. Note that you can also implement a REST client using JAX-RS directly, but in this blog post we are using the CXFRS framework.

For this example we are going to implement a client for the public REST API: http://freegeoip.net

From the freegeoip website:

“The HTTP API takes GET requests in the following schema:
freegeoip.net/{format}/{IP_or_hostname}
Supported formats are: csv, xml, json and jsonp. If no IP or hostname is provided, then your own IP is looked up. ”

To create the CXFRS client endpoint and the Camel route using it, we have to complete the following steps:

  1. Add Maven dependencies to the POM file
  2. Implement the CXFRS client proxy
  3. Set up a CxfRsClient endpoint in Camel
  4. Create a custom processor for formatting the CXFRS request
  5. Create a Camel route to call the CXFRS endpoint

Add Maven dependencies

Assuming you start out with a Camel or Fuse project, the following dependencies should be added to the POM file:

<dependency>
  <groupId>org.ow2.asm</groupId>
  <artifactId>asm-all</artifactId>
  <version>4.1</version>
</dependency>
<dependency>
  <groupId>org.apache.cxf</groupId>
  <artifactId>cxf-rt-transports-http-jetty</artifactId>
  <version>${camel-version}</version>
</dependency>
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-cxf</artifactId>
  <version>${camel-version}</version>
</dependency>

Implement the CXFRS client proxy

The Camel CXFRS component can use either the CXFRS client proxy API or the HTTP API. When using the HTTP API, the entire HTTP request needs to be created in Camel, which is basically the same as using the http component in Camel. Therefore, in this post we are going to use the CXFRS client proxy API.

More information on the CXFRS client proxy API can be found on the CXF website: https://cwiki.apache.org/confluence/display/CXF20DOC/JAX-RS+Client+API#JAX-RSClientAPI-Proxy-basedAPI

To implement the client proxy, create a new Java interface. A class will work as well, but since the client proxy only uses empty methods, an interface will do fine.

The name of the class/interface can be whatever you want, but since we are making a client for freegeoip I called the interface Freegeoip.

@Path(value="/")
public interface Freegeoip {
…

The interface is annotated with the Path annotation. The value of the Path annotation corresponds to the path after the root URL; the value “/” corresponds to the root itself, so to http://freegeoip.net

Next we need to add a method for each specific request we want to implement. You can add as many methods as you want, provided the REST API you are calling actually supports them, of course 😉
In our example we need to implement the freegeoip API call: freegeoip.net/{format}/{IP_or_hostname}
To do this, add a method to the interface. Again, the name of the method can be whatever you like; here I chose “getGeoip”:

@GET
@Path(value="/{type}/{ip}")
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public String getGeoip(@PathParam("type") String type, @PathParam("ip") String ip);

The method is annotated with the HTTP operation it uses, in our example GET, and again with a Path annotation, this time corresponding to the specific path of this particular request. Note that we are using variables in the path value: type and ip. The last annotation, Produces, states the response formats we expect back from the REST API, in this example both JSON and XML.

Next is the method signature. Note that the parameters of the method are annotated with the PathParam annotation. The values of the PathParam annotations correspond to the names of the variables in the Path annotation. This means the arguments of the method are bound to the variables of the path when the request is made.
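
To make this binding concrete, the following plain-Java sketch mimics what happens to the Path template when the proxy method is called. It is only an illustration and not how CXF implements it; the class PathTemplate and its expand method are made up for this example, and the IP address 0.0.0.0 is just a placeholder value.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PathTemplate {

    // Matches a template variable such as {type} or {ip}.
    private static final Pattern VARIABLE = Pattern.compile("\\{(\\w+)\\}");

    /** Replaces every {name} in the template with the value bound to that name. */
    static String expand(String template, Map<String, String> params) {
        Matcher m = VARIABLE.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = params.get(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("No value for path variable " + m.group(1));
            }
            // quoteReplacement prevents '$' or '\' in a value from being interpreted
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // The same names as used in @PathParam("type") and @PathParam("ip").
        Map<String, String> params = new LinkedHashMap<>();
        params.put("type", "xml");
        params.put("ip", "0.0.0.0");
        // Root address plus expanded path gives the address that is requested.
        System.out.println("http://freegeoip.net" + expand("/{type}/{ip}", params));
    }
}
```

With the values above this prints http://freegeoip.net/xml/0.0.0.0, the root address followed by the expanded path.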

The entire Java interface looks like this:

package nl.rubix.cxfrs.test.endpoint;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path(value="/")
public interface Freegeoip {

	@GET
	@Path(value="/{type}/{ip}")
	@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
	public String getGeoip(@PathParam("type") String type, @PathParam("ip") String ip);

}

Set up a CxfRsClient endpoint in Camel

To set up the endpoint, create a new Camel context XML file. You can use either Spring or Blueprint, although the configuration differs slightly. In this example I am using Blueprint.

The first thing we need to do is to add the CXF namespace to the Blueprint xml:

xmlns:cxf="http://camel.apache.org/schema/blueprint/cxf"

This enables the CXF options in the Blueprint xml. So now a CxfRsClient can be created. Add this client to the Blueprint XML (outside the CamelContext):

<cxf:rsClient id="rsClient" address="http://freegeoip.net" serviceClass="nl.rubix.cxfrs.test.endpoint.Freegeoip" loggingFeatureEnabled="true"/>

Create a custom processor for formatting the CXFRS request

The request to the CXF client API has some specific requirements. To implement these requirements we use a custom processor. To create a custom processor, create a new Java class implementing the Processor interface. Inside the processor we do three things: set the exchange pattern to InOut, set some CXF headers, and create the request object expected by CXF.
The argument of the process method (which is required when implementing the Processor interface) is the exchange object, so manipulating the exchange pattern and headers is quite straightforward. To set the exchange pattern to InOut:

exchange.setPattern(ExchangePattern.InOut);

Next we need to set the message header that states the operation name we want to call. This operation name corresponds to the method name of our client proxy API. We also want to use the client proxy API instead of the HTTP API (see above).
To set these headers, add the following statements to the processor:

// the in message of the exchange carries the headers
Message inMessage = exchange.getIn();
// set the operation name
inMessage.setHeader(CxfConstants.OPERATION_NAME, "getGeoip");
// using the proxy client API
inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.FALSE);

Now that all the headers and exchange properties are set properly, we turn our attention to the message body. By default CXFRS expects the request to be of the type org.apache.cxf.message.MessageContentsList. Although this can be changed by creating a custom binding, in this post we stick with the MessageContentsList type.

The MessageContentsList is an ordered list (it extends ArrayList) and should contain the arguments of the method in our client proxy API, in the order in which they are declared. The method signature of our client proxy API looks like this:

public String getGeoip(@PathParam("type") String type, @PathParam("ip") String ip);

So we need to add two Strings to our MessageContentsList: type and ip.
To do this add the following code to the process method:

String ip = inMessage.getBody(String.class);
String type = inMessage.getHeader("type",String.class);
MessageContentsList req = new MessageContentsList();
req.add(type);
req.add(ip);
inMessage.setBody(req);

In our example we get the IP address from the message body and the type from a header. We will set these values when we implement our Camel route. In a realistic situation the body will probably be some structured type with more than one field, and a more sophisticated data mapping will be needed. However, in this example we keep it simple with a one-to-one mapping of the request body to one of the entries in the MessageContentsList.
The entire processor class looks like this:

package nl.rubix.cxfrs.test.transform;

import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.cxf.common.message.CxfConstants;
import org.apache.cxf.message.MessageContentsList;

public class RequestProcessor implements Processor{
	@Override
	public void process(Exchange exchange) throws Exception {
		exchange.setPattern(ExchangePattern.InOut);
		Message inMessage = exchange.getIn();
		// set the operation name
		inMessage.setHeader(CxfConstants.OPERATION_NAME, "getGeoip");
		// using the proxy client API
		inMessage.setHeader(CxfConstants.CAMEL_CXF_RS_USING_HTTP_API, Boolean.FALSE);

		// creating the request
		String ip = inMessage.getBody(String.class);
		String type = inMessage.getHeader("type", String.class);
		MessageContentsList req = new MessageContentsList();
		req.add(type);
		req.add(ip);
		inMessage.setBody(req);
	}
}
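
Because the request list is positional, the order of the add calls in the processor has to match the parameter order of getGeoip. The sketch below illustrates this with a plain java.util.ArrayList standing in for the MessageContentsList, so that it runs without CXF on the classpath. The class PositionalArgs and its toPath method are hypothetical and only mimic the binding; they are not part of CXF or Camel.

```java
import java.util.ArrayList;
import java.util.List;

class PositionalArgs {

    /**
     * Mimics the proxy call getGeoip(type, ip): the first list entry is used
     * as {type} and the second as {ip}, regardless of what the values mean.
     */
    static String toPath(List<Object> args) {
        if (args.size() != 2) {
            throw new IllegalArgumentException("getGeoip expects exactly two arguments");
        }
        return "/" + args.get(0) + "/" + args.get(1);
    }

    public static void main(String[] args) {
        List<Object> correct = new ArrayList<>();
        correct.add("xml");      // type first
        correct.add("0.0.0.0");  // ip second
        System.out.println(toPath(correct));

        List<Object> swapped = new ArrayList<>();
        swapped.add("0.0.0.0");  // ip ends up in the {type} position
        swapped.add("xml");
        System.out.println(toPath(swapped)); // not a valid freegeoip path
    }
}
```

Nothing in the list itself records which entry is the type and which is the IP address, so a swapped order only shows up as a wrong request URL at runtime.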

Now that our processor is finished we can implement the Camel route calling this processor and the actual REST endpoint.

Create a Camel route to call the CXFRS endpoint

 
We already created the Camel context and added the CXFRS endpoint to it when we set up the CxfRsClient endpoint. Now we can implement the Camel route using the CxfRsClient endpoint and the processor we created earlier.
To start off the route we simply use the timer component, so our route is triggered automatically.

<from uri="timer://foo?period=5000"/>

Next we need to prepare the request: we set the header “type” to either xml or json, and we set the body to the IP address we want to look up in the REST API (note: I changed my personal IP address to 0.0.0.0).

<setHeader headerName="type">
  <constant>xml</constant>
</setHeader>
<setBody>
  <constant>0.0.0.0</constant>
</setBody>

Now that the request body and header are set in the Camel route, we can call the processor we created in the previous step (normally these values would not be hardcoded, of course 😉).
To call our processor we first need to declare it as a bean in our Blueprint XML. So, outside the camelContext XML tags, add the bean declaration:

<bean id="requestProcessor" class="nl.rubix.cxfrs.test.transform.RequestProcessor"/>

Once the bean is declared, call the processor from inside the Camel route:

<process ref="requestProcessor"/>

Now everything is set up properly to make the call to our CxfRsClient endpoint:

<to uri="cxfrs:bean:rsClient"/>

The entire Blueprint xml file looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
       xmlns:camel="http://camel.apache.org/schema/blueprint"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:cxf="http://camel.apache.org/schema/blueprint/cxf"
       xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd
       http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint.xsd">
       
       <bean id="requestProcessor" class="nl.rubix.cxfrs.test.transform.RequestProcessor"/>

  <camelContext trace="false" xmlns="http://camel.apache.org/schema/blueprint">
    <route>
        <from uri="timer://foo?period=5000"/>
        <log message="Starting Route with client proxy api"/>
        <setHeader headerName="type">
        	<constant>xml</constant>
        </setHeader>
        <setBody>
        	<constant>0.0.0.0</constant>
        </setBody>
        <process ref="requestProcessor"/>
        <to uri="cxfrs:bean:rsClient"/>
        <log message="${body}"/>
    </route>
</camelContext>

<cxf:rsClient id="rsClient" address="http://freegeoip.net" serviceClass="nl.rubix.cxfrs.test.endpoint.Freegeoip" loggingFeatureEnabled="true"/>
</blueprint>

Now when we run the route we see our log message (again I changed my personal ip and the coordinates in the response message to 0s):

[mel-2) thread #1 - timer://foo] LoggingOutInterceptor          INFO  Outbound Message
---------------------------
ID: 1
Address: http://freegeoip.net/xml/0.0.0.0
Http-Method: GET
Content-Type: application/xml
Headers: {Content-Type=[application/xml], Accept=[application/json, application/xml]}
--------------------------------------
[mel-2) thread #1 - timer://foo] LoggingInInterceptor           INFO  Inbound Message
----------------------------
ID: 1
Response-Code: 200
Encoding: ISO-8859-1
Content-Type: application/xml
Headers: {Access-Control-Allow-Method=[GET, HEAD, OPTIONS], Access-Control-Allow-Origin=[*], connection=[keep-alive], Content-Length=[367], content-type=[application/xml], Date=[Mon, 01 Dec 2014 11:11:05 GMT], Server=[nginx/1.4.6 (Ubuntu)], X-Database-Date=[Wed, 26 Nov 2014 13:21:26 GMT]}
Payload: <?xml version="1.0" encoding="UTF-8"?>
<Response>
	<IP>0.0.0.0</IP>
	<CountryCode>NL</CountryCode>
	<CountryName>Netherlands</CountryName>
	<RegionCode></RegionCode>
	<RegionName></RegionName>
	<City></City>
	<ZipCode></ZipCode>
	<TimeZone>Europe/Amsterdam</TimeZone>
	<Latitude>0.0</Latitude>
	<Longitude>0.0</Longitude>
	<MetroCode>0</MetroCode>
</Response>

And when we change the header type to json:

ID: 1
Address: http://freegeoip.net/json/0.0.0.0
Http-Method: GET
Content-Type: application/xml
Headers: {Content-Type=[application/xml], Accept=[application/json, application/xml]}
--------------------------------------
[mel-2) thread #1 - timer://foo] LoggingInInterceptor           INFO  Inbound Message
----------------------------
ID: 1
Response-Code: 200
Encoding: ISO-8859-1
Content-Type: application/json
Headers: {Access-Control-Allow-Method=[GET, HEAD, OPTIONS], Access-Control-Allow-Origin=[*], connection=[keep-alive], Content-Length=[208], content-type=[application/json], Date=[Mon, 01 Dec 2014 11:16:03 GMT], Server=[nginx/1.4.6 (Ubuntu)], X-Database-Date=[Wed, 26 Nov 2014 13:21:26 GMT]}
Payload: {"ip":"0.0.0.0","country_code":"NL","country_name":"Netherlands","region_code":"","region_name":"","city":"","zip_code":"","time_zone":"Europe/Amsterdam","latitude":0.0,"longitude":0.0,"metro_code":0}
 
Posted on 2014-12-01 in JBoss Fuse