
Category Archives: JBoss Fuse

AES-256 message encryption in Apache Camel

This blog post shows how to encrypt and decrypt the payload of a message using Apache Camel. The cryptographic algorithm used in this example is AES-256, since this was an explicit request from security. The key used in the example was obtained from a keystore.

For extra security, AES encryption can be extended with a so-called Initialization Vector, which is similar to a nonce: a random number used once per request. In this example a random 16-byte (128-bit) byte[] is used.

For more information about AES encryption: https://en.wikipedia.org/wiki/Advanced_Encryption_Standard

For more information about Initialization Vectors: https://en.wikipedia.org/wiki/Initialization_vector

To encrypt and decrypt messages, Camel provides an abstraction that is independent of the algorithm used. This abstraction is called the CryptoDataFormat and can be used like any other data format. The CryptoDataFormat is well documented here: http://camel.apache.org/crypto.html. However, its use with the AES-256 encryption algorithm is less well documented, so hopefully this post helps someone.

 

Generating the key and the keystore

The first step is to generate the key we are going to use for encryption AND decryption (remember, this is symmetric encryption, meaning the same key is used for encryption as for decryption, as opposed to PKI, which is an asymmetric encryption technique).

For generating the key we can use keytool. The example below is conveniently borrowed from this blog post: https://dzone.com/articles/aes-256-encryption-java-and

keytool -genseckey -keystore aes-keystore.jck -storetype jceks -storepass mystorepass -keyalg AES -keysize 256 -alias jceksaes -keypass mykeypass

To retrieve the key from the keystore I created a helper method, nothing special so far.

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.Key;
import java.security.KeyStore;

public static Key getKeyFromKeystore(KeyConfig keyConfig) {

  String keystorePass = keyConfig.getKeystorePass();
  String alias = keyConfig.getAlias();
  String keyPass = keyConfig.getKeyPass();
  String keystoreLocation = keyConfig.getKeystoreLocation();
  String keystoreType = keyConfig.getKeystoreType();

  // try-with-resources closes the keystore file, even when loading fails
  try (InputStream keystoreStream = new FileInputStream(keystoreLocation)) {
    KeyStore keystore = KeyStore.getInstance(keystoreType);
    keystore.load(keystoreStream, keystorePass.toCharArray());

    if (!keystore.containsAlias(alias)) {
      throw new IllegalStateException("Alias for key not found: " + alias);
    }

    return keystore.getKey(alias, keyPass.toCharArray());

  } catch (IOException | GeneralSecurityException e) {
    // fail fast instead of handing a null key to the CryptoDataFormat
    throw new IllegalStateException("Could not read key from keystore", e);
  }
}

The KeyConfig object is just a POJO containing some String values retrieved from a properties file.
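To experiment with the keystore lookup without running keytool first, the same steps can be sketched entirely in memory with the JDK. This is an illustration only: the class name KeystoreRoundTrip and its two methods are made up for this example and are not part of the original project. The alias and passwords in the usage note are the ones from the keytool command above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.Key;
import java.security.KeyStore;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

// Hypothetical helper, not part of the original post's code base.
final class KeystoreRoundTrip {

    /** Builds an in-memory JCEKS keystore holding one freshly generated AES-256 key. */
    static byte[] createKeystore(String alias, char[] storePass, char[] keyPass) {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(256);
            SecretKey key = generator.generateKey();

            KeyStore keystore = KeyStore.getInstance("JCEKS");
            keystore.load(null, storePass);                   // null stream: start empty
            keystore.setKeyEntry(alias, key, keyPass, null);  // secret keys carry no certificate chain

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            keystore.store(out, storePass);
            return out.toByteArray();
        } catch (GeneralSecurityException | IOException e) {
            throw new IllegalStateException("Could not create keystore", e);
        }
    }

    /** Loads the keystore bytes again and returns the key stored under the alias. */
    static Key readKey(byte[] keystoreBytes, String alias, char[] storePass, char[] keyPass) {
        try {
            KeyStore keystore = KeyStore.getInstance("JCEKS");
            keystore.load(new ByteArrayInputStream(keystoreBytes), storePass);
            Key key = keystore.getKey(alias, keyPass);
            if (key == null) {
                throw new IllegalStateException("Alias for key not found: " + alias);
            }
            return key;
        } catch (GeneralSecurityException | IOException e) {
            throw new IllegalStateException("Could not read key from keystore", e);
        }
    }
}
```

Calling createKeystore("jceksaes", "mystorepass".toCharArray(), "mykeypass".toCharArray()) and passing the result to readKey with the same arguments should give back a 32-byte AES key.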

Since we are going to use an Initialization Vector for extra security we create a helper method for this as well, which returns a 16-byte array. An IV should be unpredictable, so the bytes come from SecureRandom rather than Random:

public static byte[] generateIV() {
  byte[] iv = new byte[16];
  new SecureRandom().nextBytes(iv);

  return iv;
}

With the helper methods in place we can turn our attention to some Camel code…

Encrypting and decrypting using a Camel Route

The first thing we need is a CryptoDataFormat. Since we are using CDI for the bootstrapping we are going to use a PostConstruct annotated method to create a CryptoDataFormat. The trick is to set the encryption algorithm to “AES/CBC/PKCS5Padding” and enter an AES-256 key.

@PostConstruct
public void setupEncryption() {
  cryptoFormat = new CryptoDataFormat("AES/CBC/PKCS5Padding", EncryptionUtils.getKeyFromKeystore(keyConfig), "SunJCE");
}

When using a CryptoDataFormat in Apache Camel we can simply use the marshal and unmarshal statements within the Camel route. However, since we are creating a message-specific Initialization Vector, we need to set it as well. For this we can use the helper method we created earlier. Another way would be to use a processor.

.setHeader("iv", method("nl.rubix.eos.poc.util.EncryptionUtils", "generateIV"))
.bean(cryptoFormat, "setInitializationVector(${header.iv})")
.marshal(cryptoFormat)

Since the example uses the same instance of the CryptoDataFormat, decryption is as simple as:

.unmarshal(cryptoFormat)

In practice it often won’t be feasible to use the same instance of the CryptoDataFormat for both encryption and decryption. When decrypting, the CryptoDataFormat instance must be initialized with exactly the same key and Initialization Vector as were used for encryption. Since this is symmetric encryption, the key used for encryption must be available to the party that performs the decryption, as opposed to asymmetric public/private key cryptography. This is inherent to AES.
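To make the key and IV requirement concrete, here is a minimal sketch using the plain JDK crypto API rather than Camel's CryptoDataFormat. It prepends the IV to the ciphertext so the receiving side can recover it. That is a common convention, but it is an assumption of this sketch and not something the route above does. The class name AesCbcRoundTrip is made up for the example.

```java
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.IvParameterSpec;

// Hypothetical helper illustrating AES/CBC with a per-message IV.
final class AesCbcRoundTrip {
    private static final String TRANSFORMATION = "AES/CBC/PKCS5Padding";
    private static final int IV_LENGTH = 16; // AES block size in bytes

    /** Encrypts with a fresh random IV and returns IV followed by ciphertext. */
    static byte[] encrypt(SecretKey key, byte[] plaintext) {
        try {
            byte[] iv = new byte[IV_LENGTH];
            new SecureRandom().nextBytes(iv);
            Cipher cipher = Cipher.getInstance(TRANSFORMATION);
            cipher.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv));
            byte[] ciphertext = cipher.doFinal(plaintext);

            byte[] message = new byte[IV_LENGTH + ciphertext.length];
            System.arraycopy(iv, 0, message, 0, IV_LENGTH);
            System.arraycopy(ciphertext, 0, message, IV_LENGTH, ciphertext.length);
            return message;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Encryption failed", e);
        }
    }

    /** Reads the IV from the first 16 bytes and decrypts the remainder. */
    static byte[] decrypt(SecretKey key, byte[] message) {
        try {
            Cipher cipher = Cipher.getInstance(TRANSFORMATION);
            cipher.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(message, 0, IV_LENGTH));
            return cipher.doFinal(message, IV_LENGTH, message.length - IV_LENGTH);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Decryption failed", e);
        }
    }

    /** Generates a 256-bit key, or the largest size the installed policy allows. */
    static SecretKey newKey() {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(Math.min(256, Cipher.getMaxAllowedKeyLength("AES")));
            return generator.generateKey();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Could not generate key", e);
        }
    }
}
```

Because a new IV is drawn for every call, encrypting the same plaintext twice with the same key yields different output, while decrypt still recovers the original bytes as long as it is given the same key.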

The complete code repo can be found here: 

java.security.InvalidKeyException: Illegal key size

For reasons unknown to mankind, Oracle decided not to enable 256-bit keys in the standard JRE, despite the fact we are living in the 21st century. This results in a Caused by: java.security.InvalidKeyException: Illegal key size exception. To resolve this, the so-called “Unlimited Strength Jurisdiction Policy” files must be downloaded and extracted into the JRE.

Since Oracle has a tendency to change download links, the link won’t be posted here. But searching the internet will provide plenty of examples on how to download and install them.
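Whether a given JRE accepts 256-bit AES keys can be checked up front with the JDK's Cipher.getMaxAllowedKeyLength method. The small class below is only a sketch; the name KeySizeCheck is made up for this example.

```java
import java.security.NoSuchAlgorithmException;
import javax.crypto.Cipher;

// Hypothetical helper that reports what the installed crypto policy allows.
final class KeySizeCheck {

    /** Returns the largest AES key size in bits permitted by the installed policy. */
    static int maxAesKeyLength() {
        try {
            return Cipher.getMaxAllowedKeyLength("AES");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("AES is not available in this JRE", e);
        }
    }

    /** True when the AES-256 key from the keystore can be used without policy changes. */
    static boolean aes256Allowed() {
        return maxAesKeyLength() >= 256;
    }

    public static void main(String[] args) {
        System.out.println("Max AES key length: " + maxAesKeyLength());
        System.out.println("AES-256 allowed: " + aes256Allowed());
    }
}
```

If aes256Allowed() returns false, the Illegal key size exception described above is to be expected as soon as the CryptoDataFormat initializes its cipher.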


Posted by on 2017-07-10 in JBoss Fuse

 


Camel Split using a custom Iterator

One of the more commonly used EIPs in Camel is the Splitter. You can find the documentation here: http://camel.apache.org/splitter.html

Usually the splitter is used for tokenizing a message or splitting collections into single messages. But what if you need something more specific? It is good to know that the splitter can split on Java collection types as well as on Iterators. In this blog we are going to create a custom Iterator helper class and method. The business use of this example is rather questionable, but since we can fully integrate a custom Java method, the potential is limitless.

The first thing to do is create a class and a method which returns an Iterator.

import org.apache.camel.Exchange;

import javax.inject.Named;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
 * Created by pim on 6/30/17.
 */
@Named("myCustomIterator")
public class MyCustomIterator {

    // Returns an Iterator that yields the message body one character at a time
    public Iterator<Character> getIterator(Exchange exchange) {
        final String exampleMsg = exchange.getIn().getBody(String.class);

        if (exampleMsg == null)
            throw new NullPointerException("Message body must not be null");

        return new Iterator<Character>() {
            private int index = 0;

            public boolean hasNext() {
                return index < exampleMsg.length();
            }

            public Character next() {

                if (!hasNext())
                    throw new NoSuchElementException();
                return exampleMsg.charAt(index++);
            }

            public void remove() {
                throw new UnsupportedOperationException();
            }
        };

    }
}

Once we have our custom Iterator we can incorporate it in our Camel route and more specifically use it in our splitter.

package nl.rubix.eos.camel;

import javax.inject.Inject;
import javax.inject.Named;

import org.apache.camel.Endpoint;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.cdi.ContextName;
import org.apache.camel.cdi.Uri;

/**
 * Configures all our Camel routes, components, endpoints and beans
 */
@ContextName("myIteratorSplitter")
public class MyIteratorSplitterRoute extends RouteBuilder {

    @Inject
    @Named("myCustomIterator")
    MyCustomIterator myCustomIterator;

    @Override
    public void configure() throws Exception {
        // you can configure the route rule with Java DSL here

        from("direct:start")
            .log("${body}")
            .split().method("myCustomIterator", "getIterator")
                .log("${body}")
            .end();
    }

}

Now when we run this example with the string “camelisawesome” we see the following log entries:


2017-06-30 14:18:35,830 [main ] INFO DefaultCamelContext - Route: route1 started and consuming from: Endpoint[direct://start]
2017-06-30 14:18:35,830 [main ] INFO DefaultCamelContext - Total 1 routes, of which 1 are started.
2017-06-30 14:18:35,835 [main ] INFO DefaultCamelContext - Apache Camel 2.17.0.redhat-630187 (CamelContext: myIteratorSplitter) started in 0.329 seconds
2017-06-30 14:18:35,849 [main ] INFO Bootstrap - WELD-ENV-002003: Weld SE container camel-context-cdi initialized
2017-06-30 14:18:35,877 [main ] INFO route1 - camelisawesome
2017-06-30 14:18:35,887 [main ] INFO route1 - c
2017-06-30 14:18:35,887 [main ] INFO route1 - a
2017-06-30 14:18:35,888 [main ] INFO route1 - m
2017-06-30 14:18:35,888 [main ] INFO route1 - e
2017-06-30 14:18:35,889 [main ] INFO route1 - l
2017-06-30 14:18:35,889 [main ] INFO route1 - i
2017-06-30 14:18:35,889 [main ] INFO route1 - s
2017-06-30 14:18:35,890 [main ] INFO route1 - a
2017-06-30 14:18:35,890 [main ] INFO route1 - w
2017-06-30 14:18:35,891 [main ] INFO route1 - e
2017-06-30 14:18:35,891 [main ] INFO route1 - s
2017-06-30 14:18:35,891 [main ] INFO route1 - o
2017-06-30 14:18:35,892 [main ] INFO route1 - m
2017-06-30 14:18:35,893 [main ] INFO route1 - e
2017-06-30 14:18:35,898 [main ] INFO CamelContextProducer - Camel CDI is stopping Camel context [myIteratorSplitter]

Using the custom Iterator enables you to create a custom splitter.
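As a slightly less trivial variation, the sketch below splits a string into fixed-size chunks instead of single characters. It uses only the JDK. The class name ChunkIterator is made up for this example, and wiring it into the route would follow the same pattern as getIterator above.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical iterator that yields fixed-size chunks of a string.
final class ChunkIterator implements Iterator<String> {
    private final String text;
    private final int chunkSize;
    private int index = 0;

    ChunkIterator(String text, int chunkSize) {
        if (text == null) {
            throw new IllegalArgumentException("text must not be null");
        }
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be at least 1");
        }
        this.text = text;
        this.chunkSize = chunkSize;
    }

    @Override
    public boolean hasNext() {
        return index < text.length();
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        // The last chunk may be shorter than chunkSize
        int end = Math.min(index + chunkSize, text.length());
        String chunk = text.substring(index, end);
        index = end;
        return chunk;
    }
}
```

With the input “camelisawesome” and a chunk size of 5, the iterator yields “camel”, “isawe” and “some”.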

 

Posted by on 2017-06-30 in JBoss Fuse

 


Playing around with Camel AsyncProcessor

One of the most frequently used constructs in Apache Camel is the Processor (http://camel.apache.org/processor.html). It is often used for invoking custom code or performing message translations. The API of the Processor is very clear and well documented, and numerous examples of its use are available. The lesser-known brother of the Processor is the AsyncProcessor (http://camel.apache.org/asynchronous-processing.html), which is less documented and less frequently used, mainly because it is targeted at Camel component developers. However, recently I decided to play around with the AsyncProcessor in a regular Camel setup. In this blog I would like to explain one possible way to use the AsyncProcessor in a Camel route.

Creating an AsyncProcessor

Similar to creating a Processor, creating an AsyncProcessor starts by implementing the AsyncProcessor interface.


public class MyAsyncProcessor implements AsyncProcessor {

But instead of one process method, now two must be implemented:


@Override public boolean process(final Exchange exchange, AsyncCallback asyncCallback) {

@Override public void process(Exchange exchange) {

Next to the regular process method, a second process method which takes an AsyncCallback must be implemented. The AsyncCallback is invoked when the asynchronous execution (which must be started in a separate thread) is finished. The returned boolean tells the Camel routing engine whether the exchange was completed synchronously (true), in which case routing continues in the calling thread, or whether processing continues asynchronously (false), in which case routing continues once the callback is invoked.

Starting the Async job

In order to start an async job, the execution must take place in another thread. Java nowadays has multiple ways to do concurrent, multi-threaded execution. For this example we simply create a Runnable class and start the job via an executor service.


private class AsyncBackgroundProcess implements Runnable {
  private Exchange exchange;
  private AsyncCallback asyncCallback;
  public AsyncBackgroundProcess(Exchange exchange, AsyncCallback asyncCallback){
    this.exchange = exchange;
    this.asyncCallback = asyncCallback;
  }
  @Override public void run() {
    log.info("Async backend process started");
    Boolean getBoolean = slowExecutionInterface.getBoolean("bla");
    exchange.setProperty("Response", getBoolean);
    log.info("Async backend process completed");
    asyncCallback.done(false);
  }
}

There are two things to note here:

  1. The AsyncCallback object from the process method is passed to the Runnable class
  2. When the execution is finished the asyncCallback.done method is called
    1. The false parameter indicates if the execution is handled synchronously (true) or asynchronously (false)
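The callback contract can be tried out without Camel on the classpath. The sketch below uses a stand-in interface, DoneCallback, that only mirrors the shape of AsyncCallback.done(boolean). It is not the Camel type, and the class name CallbackContractSketch is made up for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, Camel-free illustration of the asynchronous callback contract.
final class CallbackContractSketch {

    /** Stand-in for Camel's AsyncCallback; mirrors only done(boolean). */
    interface DoneCallback {
        void done(boolean doneSync);
    }

    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Hands the work to a background thread; false means "wait for the callback". */
    boolean process(AtomicReference<String> body, DoneCallback callback) {
        executor.submit(() -> {
            body.updateAndGet(s -> s + " processed"); // the slow work would go here
            callback.done(false);                     // false: completed asynchronously
        });
        return false;
    }

    /** Runs one message through process() and blocks until the callback fires. */
    static String demo(String input) {
        CallbackContractSketch sketch = new CallbackContractSketch();
        AtomicReference<String> body = new AtomicReference<>(input);
        CountDownLatch finished = new CountDownLatch(1);
        try {
            boolean doneSync = sketch.process(body, sync -> finished.countDown());
            if (doneSync || !finished.await(2, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Expected asynchronous completion");
            }
            return body.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            sketch.executor.shutdown();
        }
    }
}
```

The important part is the pairing: process returns false right away, and the background task reports completion by calling done(false) exactly once.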

Implementing the AsyncProcessor process method

In the AsyncProcessor process method we kick off the Runnable class and define the callback method:


@Override public boolean process(final Exchange exchange, AsyncCallback asyncCallback) {
  log.info("Async process started");
  CountDownLatch countDownLatch = new CountDownLatch(1);
  exchange.setProperty("countDownLatch", countDownLatch);
  executorService.submit(new AsyncBackgroundProcess(exchange, new AsyncCallback() {
    @Override public void done(boolean b) {
      log.info("Async backend process finished");
      exchange.getContext().getAsyncProcessorAwaitManager().countDown(exchange, exchange.getProperty("countDownLatch", CountDownLatch.class));
    }
  }));
  return true;
}

Using the AsyncProcessor in a Camel route

Using an AsyncProcessor in a Camel route is exactly the same as using a regular Processor in the route:


from(jettyEndpoint)
.log("received")
.process(myAsyncProcessor)

Getting a response

By default the AsyncProcessor triggers a new thread for execution and does not sync back to the main execution thread. So getting a response in a “Fork-Join” manner requires some additional work. In the implementation of the process method above some actions for getting a response are already present:

A CountDownLatch is used for defining the number of threads the main thread can wait for (in our case 1):


CountDownLatch countDownLatch = new CountDownLatch(1);

In order to use the CountDownLatch downstream in our Camel route, we save it to an exchange property:


exchange.setProperty("countDownLatch", countDownLatch);

When submitting the Runnable to run in a new thread, we define the AsyncCallback and what it does when the done method is invoked: it counts down the CountDownLatch, indicating that the background thread is finished:


executorService.submit(new AsyncBackgroundProcess(exchange, new AsyncCallback() {
    @Override public void done(boolean b) {
      log.info("Async backend process finished");
      exchange.getContext().getAsyncProcessorAwaitManager().countDown(exchange, exchange.getProperty("countDownLatch", CountDownLatch.class));
    }
  }));

But this alone does not synchronize our threads. In order to get a response in our Camel route or Processor, the execution must at some point wait for the background thread to complete. For this a helper method was created:


public static void getResponseInBody(Exchange exchange) {
  CountDownLatch countDownLatch = exchange.getProperty("countDownLatch", CountDownLatch.class);
  exchange.getContext().getAsyncProcessorAwaitManager().await(exchange, countDownLatch);
  log.debug("Retrieved async response " + exchange.getProperty("Response", String.class));
  exchange.getIn().setBody(exchange.getProperty("Response", String.class));
}

In this helper method the CountDownLatch is used from the exchange and the Camel AwaitManager is used for the thread synchronization:

CountDownLatch countDownLatch = exchange.getProperty("countDownLatch", CountDownLatch.class);
exchange.getContext().getAsyncProcessorAwaitManager().await(exchange, countDownLatch);

Since this helper method is static, it can be invoked from anywhere in the route or processor, thereby giving the flexibility where in the route the threads must be synchronized.


@Override
public void configure() throws Exception {
from(jettyEndpoint)
.log("received")
.process(myAsyncProcessor)
.log("exited processor")
.bean(MyAsyncProcessor.class, "getResponse(${exchange})")
.log("${body} and Response Property ${property.Response}");
}
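The latch-based hand-off can also be reproduced with only the JDK, which makes it easy to experiment with timeouts. In the sketch below a ConcurrentHashMap stands in for the exchange properties and a plain CountDownLatch.await with a timeout replaces Camel's AsyncProcessorAwaitManager. Both substitutions are assumptions of this example, and the class name LatchForkJoin is made up.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, Camel-free illustration of the fork-join pattern described above.
final class LatchForkJoin {

    /** Starts slow work in the background, then blocks until it signals completion. */
    static Object forkAndJoin(long workMillis, long timeoutMillis) {
        Map<String, Object> properties = new ConcurrentHashMap<>(); // stand-in for exchange properties
        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.submit(() -> {
                try {
                    Thread.sleep(workMillis);                  // simulated slow backend
                    properties.put("Response", Boolean.TRUE);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();                         // always release the waiter
                }
            });

            // The caller is free to do other work here before it needs the result.

            if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("Timed out waiting for background work");
            }
            return properties.get("Response");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Counting down in a finally block means the waiting thread is released even when the background work fails, and the timeout keeps a stuck backend from blocking the caller forever.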

The entire AsyncProcessor looks like this:


package nl.rubix.eos.poc.asyncprocessor;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Named;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Named("myAsyncProcessor")
public class MyAsyncProcessor implements AsyncProcessor {
  private final ExecutorService executorService = Executors.newFixedThreadPool(2);
  private static Logger log = LoggerFactory.getLogger(MyAsyncProcessor.class);
  @Inject
  @Named("slowExecutionImpl")
  private SlowExecutionInterface slowExecutionInterface;
  @Override public boolean process(final Exchange exchange, AsyncCallback asyncCallback) {
    log.info("Async process started");
    CountDownLatch countDownLatch = new CountDownLatch(1);
    exchange.setProperty("countDownLatch", countDownLatch);
    executorService.submit(new AsyncBackgroundProcess(exchange, new AsyncCallback() {
      @Override public void done(boolean b) {
        log.info("Async backend process finished");
        exchange.getContext().getAsyncProcessorAwaitManager().countDown(exchange, exchange.getProperty("countDownLatch", CountDownLatch.class));
      }
    }));
    return true;
  }
  @Override public void process(Exchange exchange) throws Exception {
    throw new IllegalStateException("Should never be called");
  }
  private class AsyncBackgroundProcess implements Runnable {
    private Exchange exchange;
    private AsyncCallback asyncCallback;
    public AsyncBackgroundProcess(Exchange exchange, AsyncCallback asyncCallback){
      this.exchange = exchange;
      this.asyncCallback = asyncCallback;
    }
    @Override public void run() {
      log.info("Async backend process started");
      Boolean getBoolean = slowExecutionInterface.getBoolean("bla");
      exchange.setProperty("Response", getBoolean);
      log.info("Async backend process completed");
      asyncCallback.done(false);
    }
  }
  public static void getResponseInBody(Exchange exchange) {
    CountDownLatch countDownLatch = exchange.getProperty("countDownLatch", CountDownLatch.class);
    exchange.getContext().getAsyncProcessorAwaitManager().await(exchange, countDownLatch);
    log.debug("Retrieved async response " + exchange.getProperty("Response", String.class));
    exchange.getIn().setBody(exchange.getProperty("Response", String.class));
  }
  public static void getResponseInProperty(Exchange exchange) {
    CountDownLatch countDownLatch = exchange.getProperty("countDownLatch", CountDownLatch.class);
    exchange.getContext().getAsyncProcessorAwaitManager().await(exchange, countDownLatch);
    log.debug("Retrieved async response " + exchange.getProperty("Response", String.class));
  }
  public static Boolean getResponse(Exchange exchange) {
    CountDownLatch countDownLatch = exchange.getProperty("countDownLatch", CountDownLatch.class);
    exchange.getContext().getAsyncProcessorAwaitManager().await(exchange, countDownLatch);
    log.debug("Retrieved async response " + exchange.getProperty("Response", String.class));
    return exchange.getProperty("Response", Boolean.class);
  }
}

The entire Camel route looks like this:


package nl.rubix.api.poc;
import nl.rubix.eos.poc.asyncprocessor.MyAsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.cdi.ContextName;
import org.apache.camel.cdi.Uri;
import javax.inject.Inject;
import javax.inject.Named;
/**
 * Configures all our Camel routes, components, endpoints and beans
 */
@ContextName("myJettyCamel")
public class MyJettyRoute extends RouteBuilder {
    @Inject @Uri("jetty:http://0.0.0.0:8080/async/test")
    private Endpoint jettyEndpoint;
    @Inject
    @Named("myAsyncProcessor")
    MyAsyncProcessor myAsyncProcessor;
    @Override
    public void configure() throws Exception {
        from(jettyEndpoint)
            .log("received")
            .process(myAsyncProcessor)
            .log("exited processor")
            .bean(MyAsyncProcessor.class, "getResponse(${exchange})")
            .log("${body} and Response Property ${property.Response}");
    }
}

To simulate slow execution a simple interface and corresponding implementation were used:


package nl.rubix.eos.poc.asyncprocessor;
public interface SlowExecutionInterface {
  Boolean getBoolean(String input);
}

package nl.rubix.eos.poc.asyncprocessor;
import javax.inject.Named;
@Named("slowExecutionImpl")
public class SlowExecutionInterfaceMock implements SlowExecutionInterface {
  @Override public Boolean getBoolean(String input) {
    try {
      Thread.sleep(5000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return true;
  }
}

 

Posted by on 2017-05-26 in JBoss Fuse

 


Apache Camel – Dynamic redelivery based on MEP

The exception handling and retry mechanisms in Apache Camel are quite extensive. In this blog post we are going to take a look at customizing the retry based on a predicate implementation of our own, thereby enabling really fine-grained retry logic.

In our example we are going to look at the MEP (message exchange pattern) of the exchange. Whenever it is inOnly we are going to retry, since no synchronous subscriber is waiting for the response. Conversely, when the MEP of the exchange is inOut we are not going to retry and immediately send back an error. Think fail fast and such.

As mentioned above, we can enable this by implementing the Predicate interface from Apache Camel.

The entire class looks like this:
package nl.schiphol.api.integration.redelivery;

import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Predicate;
import org.apache.camel.component.properties.PropertiesComponent;

import javax.inject.Inject;
import javax.inject.Named;

@Named
public class InvokeEndpointsRedeliveryPolicy implements Predicate{

 @Inject
 PropertiesComponent properties;

 private Integer MAX_REDELIVERIES;

 /**
 * Returns whether or not a redelivery must be executed.
 * Whether or not retries should be executed depends on two criteria: the MEP of the exchange and the max retries.
 * If the MEP is inOnly a response is not required immediately and retries can commence up until the max retries.
 * Implements org.apache.camel.Predicate.
 * @param exchange The Camel Exchange is used as a parameter, this is dictated by the Predicate interface
 * @return true if a redelivery should be executed.
 */
 @Override
 public boolean matches(Exchange exchange) {
   setMaxRedeliveries();
   //no redelivery is the default
   Boolean shouldRedeliver = false;

   ExchangePattern exchangePattern = exchange.getPattern();
   Boolean isInOut = exchangePattern.isOutCapable();
   Integer redeliveryCounter = getRedeliveryCounter(exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class));

   if(!isInOut && redeliveryCounter < MAX_REDELIVERIES){
    shouldRedeliver = true;
   }

   return shouldRedeliver;
 }

 private void setMaxRedeliveries() {
   try {
     String retryAmountProperty = properties.parseUri("{{health-api.invokeEndpoints.retryAmount}}");
     MAX_REDELIVERIES = Integer.valueOf(retryAmountProperty);
   } catch (Exception e) {
     // fall back to a default when the property is missing or not a number
     MAX_REDELIVERIES = 3;
   }
 }

 private Integer getRedeliveryCounter(Integer redeliveryCounterHeader){
   Integer redeliveryCounter = redeliveryCounterHeader;

   if(redeliveryCounter == null){
    redeliveryCounter = 0;
   }

   return redeliveryCounter;
 }

 private Boolean getRetryProperty(Boolean retryProperty) {
   Boolean retry = retryProperty;

   if(retry == null){
     retry = false;
   }

   return retry;
 }
}
In order to use our predicate in our Camel route to determine whether to retry, use the ‘retryWhile’ statement. Here redeliveryPolicy refers to an instance of the predicate class above, for example an injected field:
@Override
public void configure() throws Exception {
 from("direct:start").routeId("my-dynamic-retry-route")
 .onException(Exception.class).redeliveryDelay(1500).retryWhile(redeliveryPolicy).continued(true).end()
 ...
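The decision logic of the predicate can be isolated from Camel so it is easy to unit test. The sketch below restates it as two pure functions. The class name RedeliveryDecision and both method names are made up for this example, and the boolean outCapable corresponds to the result of exchange.getPattern().isOutCapable() in the class above.

```java
// Hypothetical, Camel-free restatement of the retry decision.
final class RedeliveryDecision {
    static final int DEFAULT_MAX_REDELIVERIES = 3;

    /** Parses the configured retry amount, falling back to the default on bad input. */
    static int parseMaxRedeliveries(String raw) {
        if (raw == null) {
            return DEFAULT_MAX_REDELIVERIES;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return DEFAULT_MAX_REDELIVERIES;
        }
    }

    /**
     * Retry only when nobody is waiting for a reply (not out-capable)
     * and the number of attempts so far is below the maximum.
     * A missing redelivery counter header counts as zero attempts.
     */
    static boolean shouldRedeliver(boolean outCapable, Integer redeliveryCounter, int maxRedeliveries) {
        int attempts = (redeliveryCounter == null) ? 0 : redeliveryCounter;
        return !outCapable && attempts < maxRedeliveries;
    }
}
```

For an inOnly exchange with a maximum of 3, shouldRedeliver returns true for counters 0, 1 and 2 and false from 3 onwards. For an inOut exchange it always returns false.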
 

Posted by on 2017-04-06 in JBoss Fuse

 


Configuring a Network of Brokers in Fuse Fabric

In ActiveMQ it is possible to define logical groups of message brokers in order to obtain more resiliency or throughput.

The setup described here can be outlined as follows:

[Figure: network of brokers layout]

Creating broker groups and a network of brokers can be done in various ways in JBoss Fuse Fabric. Here we are going to use the Fabric CLI.

The following steps are necessary to create the configuration above:

  1. Create a Fabric (if we don’t already have one)

  2. Create child containers

  3. Create the MQ broker profiles and assign them to the child containers

  4. Connect a couple of clients for testing

1. Creating a Fabric (optional)

Assuming we start with a clean Fuse installation the first step is creating a Fabric. This step can be skipped if a fabric is already available.

In the Fuse CLI execute the following command:

JBossFuse:karaf@root> fabric:create -p fabric --wait-for-provisioning

2. Create child containers

Next we are going to create two sets of child containers which are going to host our brokers. Note, the child containers we are going to create in this step are empty child containers and do not yet contain AMQ brokers. We are going to provision these containers with AMQ brokers in step 3.

First create the child containers for siteA:

JBossFuse:karaf@root> fabric:container-create-child root site-a 2

Next create the child containers for siteB:

JBossFuse:karaf@root> fabric:container-create-child root site-b 2

3. Create the MQ broker profiles and assign them to the child containers

In this step we are going to create the broker profiles in fabric and assign them to the containers we created in the previous step.


JBossFuse:karaf@root> fabric:mq-create --group site-a --networks site-b --networks-username admin --networks-password admin --assign-container site-a1,site-a2 site-a-profile

JBossFuse:karaf@root> fabric:mq-create --group site-b --networks site-a --networks-username admin --networks-password admin --assign-container site-b1,site-b2 site-b-profile

The fabric:mq-create command creates a broker profile in Fuse Fabric. The --group flag assigns a group to the brokers in the profile. The --networks flag creates the network connection needed for a network of brokers. The --assign-container flag assigns the newly created broker profile to one or more containers.

4. Connect a couple of clients for testing

A sample project containing two clients, one producer and one consumer, is available on GitHub.

Clone the repository:

$ git clone https://github.com/pimg/mq-fabric-client.git

Build the project:

$ mvn clean install

Start the message consumer (in the java-consumer directory):

$ mvn exec:java

Start the message producer (in the java-producer directory):

$ mvn exec:java

Observe the console logging of the producer:

14:48:58 INFO  Using local ZKClient
14:48:58 INFO  Starting
14:48:58 INFO  Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
14:48:58 INFO  Client environment:host.name=pim-XPS-15-9530
14:48:58 INFO  Client environment:java.version=1.8.0_77
14:48:58 INFO  Client environment:java.vendor=Oracle Corporation
14:48:58 INFO  Client environment:java.home=/home/pim/apps/jdk1.8.0_77/jre
14:48:58 INFO  Client environment:java.class.path=/home/pim/apps/apache-maven-3.3.9/boot/plexus-classworlds-2.5.2.jar
14:48:58 INFO  Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
14:48:58 INFO  Client environment:java.io.tmpdir=/tmp
14:48:58 INFO  Client environment:java.compiler=<NA>
14:48:58 INFO  Client environment:os.name=Linux
14:48:58 INFO  Client environment:os.arch=amd64
14:48:58 INFO  Client environment:os.version=4.2.0-34-generic
14:48:58 INFO  Client environment:user.name=pim
14:48:58 INFO  Client environment:user.home=/home/pim
14:48:58 INFO  Client environment:user.dir=/home/pim/workspace/mq-fabric/java-producer
14:48:58 INFO  Initiating client connection, connectString=localhost:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@47e011e3
14:48:58 INFO  Opening socket connection to server localhost/127.0.0.1:2181
14:48:58 INFO  Socket connection established to localhost/127.0.0.1:2181, initiating session
14:48:58 INFO  Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x154620540e80009, negotiated timeout = 40000
14:48:58 INFO  State change: CONNECTED
14:48:59 INFO  Adding new broker connection URL: tcp://10.0.3.1:38417
14:49:00 INFO  Successfully connected to tcp://10.0.3.1:38417
14:49:00 INFO  Sending to destination: queue://fabric.simple this text: 1. message sent
14:49:00 INFO  Sending to destination: queue://fabric.simple this text: 2. message sent
14:49:01 INFO  Sending to destination: queue://fabric.simple this text: 3. message sent
14:49:01 INFO  Sending to destination: queue://fabric.simple this text: 4. message sent
14:49:02 INFO  Sending to destination: queue://fabric.simple this text: 5. message sent
14:49:02 INFO  Sending to destination: queue://fabric.simple this text: 6. message sent
14:49:03 INFO  Sending to destination: queue://fabric.simple this text: 7. message sent
14:49:03 INFO  Sending to destination: queue://fabric.simple this text: 8. message sent
14:49:04 INFO  Sending to destination: queue://fabric.simple this text: 9. message sent

Observe the console logging of the consumer:

14:48:20 INFO  Using local ZKClient
14:48:20 INFO  Starting
14:48:20 INFO  Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
14:48:20 INFO  Client environment:host.name=pim-XPS-15-9530
14:48:20 INFO  Client environment:java.version=1.8.0_77
14:48:20 INFO  Client environment:java.vendor=Oracle Corporation
14:48:20 INFO  Client environment:java.home=/home/pim/apps/jdk1.8.0_77/jre
14:48:20 INFO  Client environment:java.class.path=/home/pim/apps/apache-maven-3.3.9/boot/plexus-classworlds-2.5.2.jar
14:48:20 INFO  Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
14:48:20 INFO  Client environment:java.io.tmpdir=/tmp
14:48:20 INFO  Client environment:java.compiler=<NA>
14:48:20 INFO  Client environment:os.name=Linux
14:48:20 INFO  Client environment:os.arch=amd64
14:48:20 INFO  Client environment:os.version=4.2.0-34-generic
14:48:20 INFO  Client environment:user.name=pim
14:48:20 INFO  Client environment:user.home=/home/pim
14:48:20 INFO  Client environment:user.dir=/home/pim/workspace/mq-fabric/java-consumer
14:48:20 INFO  Initiating client connection, connectString=localhost:2181 sessionTimeout=60000 watcher=org.apache.curator.ConnectionState@3d732a14
14:48:20 INFO  Opening socket connection to server localhost/127.0.0.1:2181
14:48:20 INFO  Socket connection established to localhost/127.0.0.1:2181, initiating session
14:48:20 INFO  Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x154620540e80008, negotiated timeout = 40000
14:48:20 INFO  State change: CONNECTED
14:48:21 INFO  Adding new broker connection URL: tcp://10.0.3.1:38417
14:48:21 INFO  Successfully connected to tcp://10.0.3.1:38417
14:48:21 INFO  Start consuming messages from queue://fabric.simple with 120000ms timeout
14:49:00 INFO  Got 1. message: 1. message sent
14:49:00 INFO  Got 2. message: 2. message sent
14:49:01 INFO  Got 3. message: 3. message sent
14:49:01 INFO  Got 4. message: 4. message sent
14:49:02 INFO  Got 5. message: 5. message sent
14:49:02 INFO  Got 6. message: 6. message sent
14:49:03 INFO  Got 7. message: 7. message sent
14:49:03 INFO  Got 8. message: 8. message sent
14:49:04 INFO  Got 9. message: 9. message sent
 

Posted by on 2016-12-02 in JBoss Fuse

 


JBoss Fuse Jolokia requests

Even though Hawtio is a pretty awesome console for JBoss Fuse, sometimes you want or even need to have the underlying data Hawtio uses. Maybe you want to incorporate it in your existing dashboard, maybe you want to store it in a database so you have a historical record, or maybe you’re just curious and want to experiment with it.

The nice thing about JBoss Fuse, and really about the underlying community products, mainly Apache Camel and Apache ActiveMQ, is that they expose a lot of metrics and statistics via JMX. Now you could start your favorite IDE or text editor and write a slab of Java code to query the JMX metrics from Apache Camel and Apache ActiveMQ, like some sort of maniac. Or you could leverage the out-of-the-box Jolokia features, like the people from Hawtio did, and as really any sane person would do.

For those of you who have never heard of Jolokia: in a nutshell, it's basically JMX over HTTP/JSON, which makes doing JMX almost fun again.

Here are a couple of Jolokia requests (and responses) to start you off. These are just plain HTTP requests; do note that they use basic authentication to authenticate against the JBoss Fuse server.

First, some statistics on the JVM:

The request for querying the heap size of the JVM that JBoss Fuse is running on:

http://localhost:8181/hawtio/jolokia/read/java.lang:type=Memory/HeapMemoryUsage

Which returns, on my local test machine, the following response:

{
   "request":{
      "mbean":"java.lang:type=Memory",
      "attribute":"HeapMemoryUsage",
      "type":"read"
   },
   "value":{
      "init":536870912,
      "committed":749207552,
      "max":954728448,
      "used":118298056
   },
   "timestamp":1469035168,
   "status":200
}
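
The request above can also be issued from plain Java. The following is a minimal sketch using only the JDK; the class name `JolokiaClient` is hypothetical and the credentials in the usage note are placeholders, so substitute whatever your Fuse installation uses.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Jolokia or JBoss Fuse.
class JolokiaClient {

    // Builds the value of the HTTP Authorization header for basic authentication.
    static String basicAuthHeader(String user, String password) {
        String credentials = user + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    // Performs a GET on the given Jolokia URL and returns the raw JSON response body.
    static String get(String url, String user, String password) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        connection.setRequestMethod("GET");
        connection.setRequestProperty("Authorization", basicAuthHeader(user, password));
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
            return reader.lines().collect(Collectors.joining("\n"));
        } finally {
            connection.disconnect();
        }
    }
}
```

Calling `JolokiaClient.get("http://localhost:8181/hawtio/jolokia/read/java.lang:type=Memory/HeapMemoryUsage", "admin", "admin")` should then return a JSON document like the one shown above, assuming the server is running and accepts those placeholder credentials.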

Now that we know how to query the heap memory, fetching the non-heap memory is trivial:

http://localhost:8181/hawtio/jolokia/read/java.lang:type=Memory/NonHeapMemoryUsage

{
   "request":{
      "mbean":"java.lang:type=Memory",
      "attribute":"NonHeapMemoryUsage",
      "type":"read"
   },
   "value":{
      "init":2555904,
      "committed":120848384,
      "max":-1,
      "used":110152704
   },
   "timestamp":1469035298,
   "status":200
}
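
If you want to put these numbers on a dashboard, a usage percentage is often more useful than raw bytes. The sketch below shows one way to derive it; the class and method names are hypothetical. Note the `"max":-1` in the non-heap response: in `java.lang.management.MemoryUsage` a maximum of -1 means the maximum is undefined, so no percentage can be computed for it.

```java
import java.util.OptionalDouble;

// Hypothetical helper for turning the "used" and "max" values into a percentage.
class MemoryUsageMath {

    // Returns used/max as a percentage, or empty when max is undefined (-1) or zero.
    static OptionalDouble usedPercent(long used, long max) {
        if (max <= 0) {
            return OptionalDouble.empty();
        }
        return OptionalDouble.of(100.0 * used / max);
    }
}
```

With the heap values from the first response (`used` 118298056, `max` 954728448) this comes out at roughly 12.4 percent; for the non-heap response the result is empty.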

Some additional threading statistics for the JVM:

http://localhost:8181/hawtio/jolokia/read/java.lang:type=Threading

which returns:

{
   "request":{
      "mbean":"java.lang:type=Threading",
      "type":"read"
   },
   "value":{
      "ThreadAllocatedMemorySupported":true,
      "ThreadContentionMonitoringEnabled":false,
      "TotalStartedThreadCount":125,
      "CurrentThreadCpuTimeSupported":true,
      "CurrentThreadUserTime":610000000,
      "PeakThreadCount":108,
      "AllThreadIds":[
         150,
         45,
         123,
         122,
         121,
         120,
         119,
         118,
         117,
         116,
         115,
         113,
         111,
         110,
         109,
         107,
         106,
         105,
         104,
         103,
         102,
         101,
         100,
         99,
         98,
         97,
         96,
         95,
         94,
         93,
         92,
         90,
         87,
         86,
         84,
         83,
         82,
         81,
         80,
         79,
         78,
         77,
         76,
         74,
         73,
         72,
         71,
         70,
         69,
         68,
         67,
         65,
         62,
         61,
         60,
         58,
         57,
         55,
         54,
         53,
         51,
         47,
         46,
         44,
         42,
         40,
         39,
         38,
         37,
         36,
         35,
         34,
         33,
         32,
         31,
         30,
         28,
         27,
         26,
         24,
         25,
         21,
         20,
         23,
         22,
         19,
         18,
         15,
         14,
         13,
         12,
         11,
         4,
         3,
         2,
         1
      ],
      "ThreadAllocatedMemoryEnabled":true,
      "CurrentThreadCpuTime":624340113,
      "ObjectName":{
         "objectName":"java.lang:type=Threading"
      },
      "ThreadContentionMonitoringSupported":true,
      "ThreadCpuTimeSupported":true,
      "ThreadCount":96,
      "ThreadCpuTimeEnabled":true,
      "ObjectMonitorUsageSupported":true,
      "SynchronizerUsageSupported":true,
      "DaemonThreadCount":70
   },
   "timestamp":1469035389,
   "status":200
}

Now for some JBoss Fuse specific queries:

First some overall statistics for our ActiveMQ broker:

http://localhost:8181/hawtio/jolokia/read/org.apache.activemq:type=Broker,brokerName=amq

Which returns:

{
   "request":{
      "mbean":"org.apache.activemq:brokerName=amq,type=Broker",
      "type":"read"
   },
   "value":{
      "StatisticsEnabled":true,
      "TotalConnectionsCount":0,
      "StompSslURL":"",
      "TransportConnectors":{
         "openwire":"tcp:\/\/localhost:61616",
         "stomp":"stomp+nio:\/\/pim-XPS-15-9530:61613"
      },
      "StompURL":"",
      "TotalProducerCount":0,
      "CurrentConnectionsCount":0,
      "TopicProducers":[

      ],
      "JMSJobScheduler":null,
      "UptimeMillis":461929,
      "TemporaryQueueProducers":[

      ],
      "TotalDequeueCount":0,
      "JobSchedulerStorePercentUsage":0,
      "DurableTopicSubscribers":[

      ],
      "QueueSubscribers":[

      ],
      "AverageMessageSize":1024,
      "BrokerVersion":"5.11.0.redhat-621084",
      "TemporaryQueues":[

      ],
      "BrokerName":"amq",
      "MinMessageSize":1024,
      "DynamicDestinationProducers":[

      ],
      "OpenWireURL":"tcp:\/\/localhost:61616",
      "TemporaryTopics":[

      ],
      "JobSchedulerStoreLimit":0,
      "TotalConsumerCount":0,
      "MaxMessageSize":1024,
      "TotalMessageCount":0,
      "TempPercentUsage":0,
      "TemporaryQueueSubscribers":[

      ],
      "MemoryPercentUsage":0,
      "SslURL":"",
      "InactiveDurableTopicSubscribers":[

      ],
      "StoreLimit":10737418240,
      "QueueProducers":[

      ],
      "VMURL":"vm:\/\/amq",
      "TemporaryTopicProducers":[

      ],
      "Topics":[
         {
            "objectName":"org.apache.activemq:brokerName=amq,destinationName=ActiveMQ.Advisory.MasterBroker,destinationType=Topic,type=Broker"
         }
      ],
      "Uptime":"7 minutes",
      "BrokerId":"ID:pim-XPS-15-9530-35535-1469035012806-0:1",
      "DataDirectory":"\/home\/pim\/apps\/jboss-fuse-6.2.1.redhat-084\/data\/amq",
      "Persistent":true,
      "TopicSubscribers":[

      ],
      "MemoryLimit":668309914,
      "Slave":false,
      "TotalEnqueueCount":1,
      "TempLimit":5368709120,
      "TemporaryTopicSubscribers":[

      ],
      "StorePercentUsage":0,
      "Queues":[

      ]
   },
   "timestamp":1469035474,
   "status":200
}

We can also zoom in on a specific JMS destination on our broker, in this case the queue named testQueue:

http://localhost:8181/hawtio/jolokia/read/org.apache.activemq:type=Broker,brokerName=amq,destinationType=Queue,destinationName=testQueue

{
   "request":{
      "mbean":"org.apache.activemq:brokerName=amq,destinationName=testQueue,destinationType=Queue,type=Broker",
      "type":"read"
   },
   "value":{
      "ProducerFlowControl":true,
      "AlwaysRetroactive":false,
      "Options":"",
      "MemoryUsageByteCount":0,
      "AverageBlockedTime":0.0,
      "MemoryPercentUsage":0,
      "CursorMemoryUsage":0,
      "InFlightCount":0,
      "Subscriptions":[

      ],
      "CacheEnabled":true,
      "ForwardCount":0,
      "DLQ":false,
      "AverageEnqueueTime":0.0,
      "Name":"testQueue",
      "MaxAuditDepth":10000,
      "BlockedSends":0,
      "TotalBlockedTime":0,
      "MaxPageSize":200,
      "QueueSize":0,
      "PrioritizedMessages":false,
      "MemoryUsagePortion":0.0,
      "Paused":false,
      "EnqueueCount":0,
      "MessageGroups":{

      },
      "ConsumerCount":0,
      "AverageMessageSize":0,
      "CursorFull":false,
      "MaxProducersToAudit":64,
      "ExpiredCount":0,
      "CursorPercentUsage":0,
      "MinEnqueueTime":0,
      "MinMessageSize":0,
      "MemoryLimit":1048576,
      "DispatchCount":0,
      "MaxEnqueueTime":0,
      "BlockedProducerWarningInterval":30000,
      "DequeueCount":0,
      "ProducerCount":0,
      "MessageGroupType":"cached",
      "MaxMessageSize":0,
      "UseCache":true,
      "SlowConsumerStrategy":null
   },
   "timestamp":1469035582,
   "status":200
}
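
You may have noticed that the `mbean` value in the response lists the key properties in a different order than the request URL. The order looks like the canonical form of a JMX object name, in which the properties are sorted lexically. A small sketch with the standard `javax.management.ObjectName` class illustrates this; the helper class is hypothetical and does not quote special characters in the names.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper that builds the object name of an ActiveMQ queue MBean.
class DestinationNames {

    static ObjectName queueName(String brokerName, String queueName) {
        try {
            return new ObjectName("org.apache.activemq:type=Broker,brokerName=" + brokerName
                    + ",destinationType=Queue,destinationName=" + queueName);
        } catch (MalformedObjectNameException e) {
            // Names containing characters such as ',' or '=' would need ObjectName.quote().
            throw new IllegalArgumentException("Invalid broker or queue name", e);
        }
    }
}
```

`DestinationNames.queueName("amq", "testQueue").getCanonicalName()` yields the same string as the `mbean` field in the response above, so either ordering of the key properties addresses the same MBean.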

Besides ActiveMQ, we can also retrieve some Camel statistics:

http://localhost:8181/hawtio/jolokia/read/org.apache.camel:context=*,type=routes,name=*/Load01,InflightExchanges,LastProcessingTime,ExchangesFailed

Which returns the metrics: “Load01”, “InflightExchanges”, “LastProcessingTime” and “ExchangesFailed” from all Camel routes in all Camel Contexts:

{
   "request":{
      "mbean":"org.apache.camel:context=*,name=*,type=routes",
      "attribute":[
         "Load01",
         "InflightExchanges",
         "LastProcessingTime",
         "ExchangesFailed"
      ],
      "type":"read"
   },
   "value":{
      "org.apache.camel:context=camel-blueprint,name=\"route1\",type=routes":{
         "LastProcessingTime":0,
         "ExchangesFailed":0,
         "InflightExchanges":0,
         "Load01":""
      },
      "org.apache.camel:context=camel-blueprint,name=\"route3\",type=routes":{
         "LastProcessingTime":0,
         "ExchangesFailed":0,
         "InflightExchanges":0,
         "Load01":""
      },
      "org.apache.camel:context=camel-blueprint,name=\"route2\",type=routes":{
         "LastProcessingTime":0,
         "ExchangesFailed":0,
         "InflightExchanges":0,
         "Load01":""
      }
   },
   "timestamp":1469035650,
   "status":200
}
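
All read requests in this post follow the same pattern: the base URL, `/read/`, the MBean name (optionally with `*` wildcards) and, optionally, a comma-separated list of attributes. A minimal sketch of a URL builder for that pattern could look like this; the class name is hypothetical and the sketch does not escape special characters in MBean or attribute names.

```java
// Hypothetical helper that assembles Jolokia read URLs as used in this post.
class JolokiaUrls {

    // Appends "/read/<mbean>" and, if any attributes are given, "/<attr1>,<attr2>,...".
    static String readUrl(String baseUrl, String mbean, String... attributes) {
        StringBuilder url = new StringBuilder(baseUrl).append("/read/").append(mbean);
        if (attributes.length > 0) {
            url.append('/').append(String.join(",", attributes));
        }
        return url.toString();
    }
}
```

For example, `JolokiaUrls.readUrl("http://localhost:8181/hawtio/jolokia", "org.apache.camel:context=*,type=routes,name=*", "Load01", "InflightExchanges", "LastProcessingTime", "ExchangesFailed")` reproduces the Camel request above, and leaving out the attributes requests everything the matching MBeans expose.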

We can of course also remove some filters to get all available statistics on the Camel routes:

http://localhost:8181/hawtio/jolokia/read/org.apache.camel:context=*,type=routes,name=*

which returns a lot more statistics:

{
   "request":{
      "mbean":"org.apache.camel:context=*,name=*,type=routes",
      "type":"read"
   },
   "value":{
      "org.apache.camel:context=camel-blueprint,name=\"route1\",type=routes":{
         "StatisticsEnabled":true,
         "EndpointUri":"rest:\/\/get:\/user:\/%7Bid%7D?description=Find+user+by+id&outType=nl.schiphol.my.camel.swagger.example.User&routeId=route1",
         "CamelManagementName":"camel-blueprint",
         "ExchangesCompleted":0,
         "LastProcessingTime":0,
         "ExchangesFailed":0,
         "Description":null,
         "FirstExchangeCompletedExchangeId":null,
         "FirstExchangeCompletedTimestamp":null,
         "LastExchangeFailureTimestamp":null,
         "MaxProcessingTime":0,
         "LastExchangeCompletedTimestamp":null,
         "Load15":"",
         "DeltaProcessingTime":0,
         "OldestInflightDuration":null,
         "ExternalRedeliveries":0,
         "ExchangesTotal":0,
         "ResetTimestamp":"2016-07-20T19:16:53+02:00",
         "ExchangesInflight":0,
         "MeanProcessingTime":0,
         "LastExchangeFailureExchangeId":null,
         "FirstExchangeFailureExchangeId":null,
         "CamelId":"myCamel",
         "TotalProcessingTime":0,
         "FirstExchangeFailureTimestamp":null,
         "RouteId":"route1",
         "RoutePolicyList":"",
         "FailuresHandled":0,
         "MessageHistory":true,
         "Load05":"",
         "OldestInflightExchangeId":null,
         "State":"Started",
         "InflightExchanges":0,
         "Redeliveries":0,
         "MinProcessingTime":0,
         "LastExchangeCompletedExchangeId":null,
         "Tracing":false,
         "Load01":""
      },
      "org.apache.camel:context=camel-blueprint,name=\"route3\",type=routes":{
         "StatisticsEnabled":true,
         "EndpointUri":"rest:\/\/get:\/user:\/findAll?description=Find+all+users&outType=nl.schiphol.my.camel.swagger.example.User%5B%5D&routeId=route3",
         "CamelManagementName":"camel-blueprint",
         "ExchangesCompleted":0,
         "LastProcessingTime":0,
         "ExchangesFailed":0,
         "Description":null,
         "FirstExchangeCompletedExchangeId":null,
         "FirstExchangeCompletedTimestamp":null,
         "LastExchangeFailureTimestamp":null,
         "MaxProcessingTime":0,
         "LastExchangeCompletedTimestamp":null,
         "Load15":"",
         "DeltaProcessingTime":0,
         "OldestInflightDuration":null,
         "ExternalRedeliveries":0,
         "ExchangesTotal":0,
         "ResetTimestamp":"2016-07-20T19:16:53+02:00",
         "ExchangesInflight":0,
         "MeanProcessingTime":0,
         "LastExchangeFailureExchangeId":null,
         "FirstExchangeFailureExchangeId":null,
         "CamelId":"myCamel",
         "TotalProcessingTime":0,
         "FirstExchangeFailureTimestamp":null,
         "RouteId":"route3",
         "RoutePolicyList":"",
         "FailuresHandled":0,
         "MessageHistory":true,
         "Load05":"",
         "OldestInflightExchangeId":null,
         "State":"Started",
         "InflightExchanges":0,
         "Redeliveries":0,
         "MinProcessingTime":0,
         "LastExchangeCompletedExchangeId":null,
         "Tracing":false,
         "Load01":""
      },
      "org.apache.camel:context=camel-blueprint,name=\"route2\",type=routes":{
         "StatisticsEnabled":true,
         "EndpointUri":"rest:\/\/put:\/user?description=Updates+or+create+a+user&inType=nl.schiphol.my.camel.swagger.example.User&routeId=route2",
         "CamelManagementName":"camel-blueprint",
         "ExchangesCompleted":0,
         "LastProcessingTime":0,
         "ExchangesFailed":0,
         "Description":null,
         "FirstExchangeCompletedExchangeId":null,
         "FirstExchangeCompletedTimestamp":null,
         "LastExchangeFailureTimestamp":null,
         "MaxProcessingTime":0,
         "LastExchangeCompletedTimestamp":null,
         "Load15":"",
         "DeltaProcessingTime":0,
         "OldestInflightDuration":null,
         "ExternalRedeliveries":0,
         "ExchangesTotal":0,
         "ResetTimestamp":"2016-07-20T19:16:53+02:00",
         "ExchangesInflight":0,
         "MeanProcessingTime":0,
         "LastExchangeFailureExchangeId":null,
         "FirstExchangeFailureExchangeId":null,
         "CamelId":"myCamel",
         "TotalProcessingTime":0,
         "FirstExchangeFailureTimestamp":null,
         "RouteId":"route2",
         "RoutePolicyList":"",
         "FailuresHandled":0,
         "MessageHistory":true,
         "Load05":"",
         "OldestInflightExchangeId":null,
         "State":"Started",
         "InflightExchanges":0,
         "Redeliveries":0,
         "MinProcessingTime":0,
         "LastExchangeCompletedExchangeId":null,
         "Tracing":false,
         "Load01":""
      }
   },
   "timestamp":1469035760,
   "status":200
}

Now there are probably tens or hundreds more queries to think of, but hopefully this will start you off in the right direction.

 

Posted on 2016-07-20 in JBoss Fuse

 


Instantiating a java.util.Properties via OSGi Blueprint

I was recently struggling a bit to instantiate a java.util.Properties object via OSGi Blueprint. Normally when dealing with properties in OSGi Blueprint I use the OSGi Config Admin or Compendium service. However, one of the other objects I needed required a java.util.Properties object as a constructor argument, so I needed to instantiate it via Blueprint. After some trial and error I found the solution.


<bean id="myPropertiesObject" class="java.util.Properties">
  <!-- <props> builds a java.util.Properties that is passed as the single constructor argument -->
  <argument>
    <props>
      <prop key="property1">value1</prop>
      <prop key="property2">value1</prop>
      <prop key="property3">value1</prop>
    </props>
  </argument>
</bean>
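
One thing to be aware of: assuming Blueprint resolves the `<argument>` to the `Properties(Properties defaults)` constructor, the values end up as defaults of the new object rather than as its own entries. The plain Java sketch below mirrors that assumption (the class name is hypothetical) and shows what it means for lookups.

```java
import java.util.Properties;

// Hypothetical demo of what the bean definition amounts to, assuming the
// Properties(Properties defaults) constructor is the one being used.
class PropertiesDefaultsDemo {

    static Properties build() {
        Properties defaults = new Properties();
        defaults.setProperty("property1", "value1");
        defaults.setProperty("property2", "value1");
        defaults.setProperty("property3", "value1");
        // The argument becomes the default list of the new Properties object.
        return new Properties(defaults);
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println(props.getProperty("property1"));      // prints "value1"
        System.out.println(props.size());                        // prints 0
        System.out.println(props.stringPropertyNames().size());  // prints 3
    }
}
```

So `getProperty()` and `stringPropertyNames()` see the values, while `Hashtable` methods such as `get()` and `size()` do not. If the consuming object only uses `getProperty()` this makes no difference.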
 

Posted on 2016-03-28 in Uncategorized, JBoss Fuse

 
