Using Apache Camel with Azure Service Bus

We’ve been playing around with building an integration solution using components such as Docker, Azure Service Bus and Apache Camel. Since I was unable to find any real examples of using Camel with Service Bus, and since there are some pitfalls, I thought I might as well publish one once we got it working properly.

The complete source code for a simple Camel route reading from a Service Bus queue is available at https://github.com/KTH/integral-reader-test. There is also a runnable Docker image, see the README.

AMQP 1.0 in Camel

There are several versions of AMQP with less in common than one might expect, but beginning with Camel 2.17 the AMQP component targets AMQP 1.0. Still, due to other issues I opted to force newer versions of the underlying Apache Qpid libraries in pom.xml.

<dependency>
  <groupId>org.apache.qpid</groupId>
  <artifactId>qpid-jms-client</artifactId>
  <version>0.11.1</version>
</dependency>
<dependency>
  <groupId>org.apache.qpid</groupId>
  <artifactId>qpid-client</artifactId>
  <version>6.0.5</version>
</dependency>
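
For completeness, the Camel AMQP component itself also has to be on the classpath. A minimal sketch of that dependency; the version here is just an assumption and should of course match your Camel release:

```xml
<!-- Camel AMQP component (version is an example, match your Camel release) -->
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-amqp</artifactId>
  <version>2.17.3</version>
</dependency>
```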

There are numerous ways to set up the broker connection, and they keep changing as the interfaces evolve, which makes things confusing. The AMQP 1.0 support in Service Bus is not complete, and Qpid is an independent implementation, so when you run into problems it is hard to know what to expect and where to start digging. Eventually I landed in the following configuration of the amqp Camel bean. I’ll mention some of the particulars below.

<!-- Service Bus AMQP 1.0 connection --> 
<bean id="jmsConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory">
  <!-- amqp.traceFrames=true turns on protocol debugging -->
  <!-- amqp.idleTimeout=120000 is minimum required by Service Bus -->
  <property name="remoteURI" value="amqps://${service_bus.uri}?amqp.idleTimeout=120000" />
  <property name="username" value="${service_bus.user}" />
  <property name="password" value="${service_bus.password}" />
  <!-- 
    Makes Service Bus connection behave reasonably. In particular this means that 
    the client is not sending drain=true packets which apparently Service Bus 
    doesn't currently support. /fjo 2016-11-18 
  -->
  <property name="receiveLocalOnly" value="true" />
</bean>
<bean id="jmsCachingConnectionFactory" 
    class="org.springframework.jms.connection.CachingConnectionFactory">
  <property name="targetConnectionFactory" ref="jmsConnectionFactory" />
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration" >
  <property name="connectionFactory" ref="jmsCachingConnectionFactory" />
  <property name="cacheLevelName" value="CACHE_AUTO" />
</bean>
<bean id="amqp" class="org.apache.camel.component.amqp.AMQPComponent">
  <property name="configuration" ref="jmsConfig" />
</bean>
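
The ${service_bus.*} placeholders above are resolved with a standard Spring property placeholder. A minimal sketch, where the file name service_bus.properties is hypothetical: service_bus.uri would hold the namespace host name, and user/password the Shared Access Policy name and key.

```xml
<!-- Resolves the ${service_bus.*} placeholders from a properties file.
     The file name is an example; adjust to your setup. -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
  <property name="location" value="classpath:service_bus.properties" />
</bean>
```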

Walk-through of my issues

There were a number of caveats along the way, and I’ll briefly touch on them.

amqp.idleTimeout=120000

This is actually well-known on the internetz. Service Bus simply doesn’t accept anything lower, in particular not the default set by Qpid. Just set it.

receiveLocalOnly

This, however, I’ve not found described anywhere. I’ve forgotten exactly where, but somewhere along the line the Camel JMS code causes the client to send requests with the protocol option drain set to true. (Yes, the amqp.traceFrames=true URL parameter mentioned in the comments in camel-context.xml did come in handy a few times.)

I could try to explain what I believe this option does, but that would be pushing my understanding of the inner workings of the AMQP protocol a bit, so let me just say that this option doesn’t seem to sit well with Service Bus. Mostly, the broker will simply not answer, the client times out and closes the connection. After a few retries the client assumes the broker is broken and that’s that.

Then again, sometimes you actually do get some messages. And then it dies. It’s just… weird. Set the receiveLocalOnly option on the JmsConnectionFactory and the issue vanishes.

JMSXGroupID headers

The Camel component I used to create messages from our IDM solution is written by yours truly, and at the time, due to the inner workings of the IDM server, it seemed like a good idea to set JMSXGroupID and related headers to group messages. It was a nice-to-have that we currently have no use for, but I was ambitious. So, this is not well tested and maybe I’ve misunderstood exactly how these headers should behave, but the really, really strange thing was:

Azure Service Bus really did something with the JMSXGroupID headers.

It’s not clear what, or whether it’s Service Bus or my code that is broken. But still: Service Bus does something rather undocumented if you use such headers. I couldn’t get the messages off the queue. I tore those headers out, and suddenly everything started working like a charm.
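
If you cannot change the producer at the source, one workaround is to strip the headers in the route before the messages reach Service Bus. A sketch using Camel’s removeHeaders, where the endpoint names are placeholders I made up:

```xml
<route>
  <from uri="direct:toServiceBus" />
  <!-- strip JMSXGroupID, JMSXGroupSeq and friends before sending -->
  <removeHeaders pattern="JMSXGroup*" />
  <to uri="amqp:queue:myqueue" />
</route>
```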

Locking

If I didn’t look stupid before, it begins now. The default lock duration on a queue in the Azure portal is 30s, a value you apparently cannot set yourself: you are told that the value has to be in the 1-5 range. So I turned it down to 5s. Now, you can change those seconds to minutes instead, but I just assumed the error meant that 5s was the maximum lock duration.

Not so. You can set it to 1-5 seconds or 1-5 minutes, but not to something like 30s. Perhaps something is not crystal clear here. Perhaps I’m just stupid.

Anyway. I’ve thus been working quite a bit with rather short lock durations (until someone pointed this out), and I thought I could share my experiences. From time to time, in a real-world situation where you actually do something with the messages, I would fail to pull a message off the queue in time, causing me to handle it several times. Since I had a single consumer, I solved that with the Idempotent Consumer pattern available out of the box in Camel.

<bean id="messageRepo"
    class="org.apache.camel.processor.idempotent.MemoryIdempotentRepository"/>

...
  <route>
    <from uri="amqp:queue:..." />
    <idempotentConsumer messageIdRepositoryRef="messageRepo" eager="false">
      <!-- the JMS message id is used as idempotency key -->
      <header>JMSMessageID</header>
      <!-- do stuff -->
    </idempotentConsumer>
  </route>
...

It works. You may not need it. But it works.

Ephemeral connections

This was no surprise; it’s well documented and quite obvious. Service Bus connections aren’t forever, you have to expect that they close. Though in real life, with partitioned queues this seems to happen far less often than I would expect; when I turned partitioning off I saw more connection resets.

So you have to set up some sort of redelivery policy in the Camel error handler, but that is just out-of-the-box configuration.

 <errorHandler id="retryErrorHandler" type="DefaultErrorHandler"
     xmlns="http://camel.apache.org/schema/spring">
   <redeliveryPolicy maximumRedeliveries="10"
       retryAttemptedLogLevel="WARN"
       backOffMultiplier="2"
       useExponentialBackOff="true"/>
 </errorHandler>

I’ve never seen more than one retry.
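
For the error handler to actually apply, the route has to point at it, for instance via errorHandlerRef. A sketch, where the queue name is a placeholder:

```xml
<route errorHandlerRef="retryErrorHandler">
  <from uri="amqp:queue:myqueue" />
  <!-- processing steps go here -->
</route>
```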

Heureka

So, having banged my head against the wall long enough to punch through to the other side, this really works. I’ve pushed on the order of 1-2 million messages through Service Bus with Camel producers and consumers, in both real-world and testing situations, without a single failure since.

We did this in order to see if we could avoid running our own broker, such as ActiveMQ or RabbitMQ. And currently, I don’t see any need for it. There could be more non-.NET tools around Service Bus, but I think we’ll live with that and create the ones we need.
