Thursday, September 25, 2008

Using AspectJ to diagnose ServiceMix component performance

An inventive FUSE customer showed me an approach to diagnosing his ServiceMix integration flow by injecting some aspect-oriented code, and it's so cool that I thought it would be worth sharing. The problem is this: given that you have an integration flow, how can you diagnose how long each endpoint in the flow is taking to do its business? The customer in particular was experiencing exceptionally high latency on his integration flow (in the order of 900ms) that just seemed plain wrong: the question was, where in the flow was he incurring the hit? Was it in transformation, EIP, resequencing, or something else?

I used AspectJ to reproduce my customer's approach, weaving some diagnostic code into the ServiceMix call stack. You can do this by creating a simple aspect like the one below, which is called whenever a call is made to Flow.send(). The measurePerformance() method does some logging, and also wraps a simple timer around the message invocation. The result will only be accurate to the millisecond; however, if you're trying to isolate big-elephant-in-the-room bottlenecks then this should be sufficient.


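package ps.progress.com;

import javax.jbi.messaging.MessageExchange;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;

@Aspect
public class PerformanceAdvice {

    // Wraps every Flow.send() call with logging and a simple wall-clock timer.
    @Around("execution(* org.apache.servicemix.jbi.nmr.flow.Flow.send(..))")
    public Object measurePerformance(ProceedingJoinPoint thisJoinPoint) throws Throwable {
        // Flow.send() takes the MessageExchange as its first argument.
        MessageExchange me = (MessageExchange) thisJoinPoint.getArgs()[0];
        System.out.println("send() to " + me.getEndpoint().getServiceName()
                + "... (status: " + me.getStatus() + ", role: "
                + (me.getRole() == MessageExchange.Role.PROVIDER ? "PROVIDER" : "CONSUMER")
                + ")");
        long startTime = System.currentTimeMillis();
        try {
            // Run the real send(); any exception propagates to the caller
            // instead of being swallowed by the diagnostic code.
            return thisJoinPoint.proceed();
        } finally {
            long endTime = System.currentTimeMillis();
            System.out.println(me.getEndpoint().getServiceName()
                    + " done; elapsed time = " + (endTime - startTime) + " ms.");
        }
    }
}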
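
If millisecond resolution turns out to be too coarse, the same timing pattern can be built on System.nanoTime() instead of System.currentTimeMillis(). The following is only a minimal, stand-alone sketch of that idea, not part of the aspect above: the NanoTimer and Timed names are made up for illustration, and it times an arbitrary Supplier rather than a real Flow.send() call.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper (not part of ServiceMix or AspectJ): times a piece of
// work with System.nanoTime(), which is intended for measuring elapsed time.
final class NanoTimer {

    // Holds the result of the timed work together with the elapsed time.
    static final class Timed<T> {
        final T value;
        final long elapsedNanos;

        Timed(T value, long elapsedNanos) {
            this.value = value;
            this.elapsedNanos = elapsedNanos;
        }
    }

    // Runs the supplied work once and records how long it took.
    static <T> Timed<T> time(Supplier<T> work) {
        long start = System.nanoTime();
        T value = work.get();
        long elapsed = System.nanoTime() - start;
        return new Timed<>(value, elapsed);
    }

    public static void main(String[] args) {
        Timed<Long> t = time(() -> {
            long sum = 0;
            for (int i = 0; i < 1_000_000; i++) {
                sum += i;
            }
            return sum;
        });
        System.out.println("result " + t.value + " took "
                + TimeUnit.NANOSECONDS.toMicros(t.elapsedNanos) + " microseconds");
    }
}
```

In the aspect, the equivalent change would be to swap the two System.currentTimeMillis() calls for System.nanoTime() and convert the difference before printing.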



Weaving this code into ServiceMix (I used version 3.3.1.6-fuse from http://open.iona.com) was surprisingly easy: I added the AspectJ JARs (aspectjlib.jar, aspectjtools.jar, aspectjrt.jar, aspectjweaver.jar) to ServiceMix's lib directory. I also jarred up my PerformanceAdvice class and dropped it into the lib directory too. Finally, I modified the ServiceMix configuration file - conf/servicemix.xml - to create my advice and turn on auto-proxying for AspectJ. First, I added the Spring AOP namespace to the <beans> tag:


xmlns:aop="http://www.springframework.org/schema/aop"


Then, I added the following elements within the <beans> element:


<!-- Turn on AspectJ auto-proxying -->
<aop:aspectj-autoproxy/>
<!-- Create my performance advice aspect -->
<bean id="performanceAdvice" class="ps.progress.com.PerformanceAdvice"/>


And that's it! To test, I ran a simple integration flow that reads from a JMS queue and uses a pipeline to transform the message and send the result to another queue. Here's the output:

Invoking on {http://progress.com/ps/smx/demo/transformer}Pipeline... (status: Active, role: PROVIDER)
{http://progress.com/ps/smx/demo/transformer}Pipeline done; elapsed time = 0 ms.
Invoking on {http://progress.com/ps/smx/demo/transformer}XsltTransformer... (status: Active, role: PROVIDER)
{http://progress.com/ps/smx/demo/transformer}XsltTransformer done; elapsed time = 1 ms.
Invoking on {http://progress.com/ps/smx/demo/transformer}XsltTransformer... (status: Active, role: CONSUMER)
{http://progress.com/ps/smx/demo/transformer}XsltTransformer done; elapsed time = 0 ms.
Invoking on {http://progress.com/ps/smx/demo/transformer}JmsOutput... (status: Active, role: PROVIDER)
{http://progress.com/ps/smx/demo/transformer}JmsOutput done; elapsed time = 0 ms.
Invoking on {http://progress.com/ps/smx/demo/transformer}JmsOutput... (status: Done, role: CONSUMER)
{http://progress.com/ps/smx/demo/transformer}JmsOutput done; elapsed time = 1 ms.
Invoking on {http://progress.com/ps/smx/demo/transformer}XsltTransformer... (status: Done, role: PROVIDER)
{http://progress.com/ps/smx/demo/transformer}XsltTransformer done; elapsed time = 0 ms.
Invoking on {http://progress.com/ps/smx/demo/transformer}Pipeline... (status: Done, role: CONSUMER)
{http://progress.com/ps/smx/demo/transformer}Pipeline done; elapsed time = 1 ms.



You can see that there are lots of messages being sent out - we're most interested in those where the status is Active and the role is PROVIDER (you can modify the PerformanceAdvice class to just print those out if you like). From this, I can see that the call to the XsltTransformer endpoint is taking just 1ms; a nice verification that my XSLT transform isn't doing anything crazy.
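
As an alternative to changing the advice itself, the captured output can be filtered after the fact. The sketch below is a hypothetical stand-alone helper (the ProviderTimings name and its method are invented here) that keeps only the timings that follow an Active/PROVIDER line. It assumes the log format shown above, accepts either the "Invoking on" or the "send() to" prefix, and assumes each "done" line directly follows its matching start line, as it does in the sample output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical log filter: not part of ServiceMix, just a post-processing sketch.
final class ProviderTimings {

    // Matches e.g. "Invoking on {ns}Pipeline... (status: Active, role: PROVIDER)"
    private static final Pattern START = Pattern.compile(
            "^(?:Invoking on|send\\(\\) to) (\\S+)\\.\\.\\. \\(status: (\\w+), role: (\\w+)\\)$");

    // Matches e.g. "{ns}Pipeline done; elapsed time = 1 ms."
    private static final Pattern DONE = Pattern.compile(
            "^(\\S+) done; elapsed time = (\\d+) ms\\.$");

    // Returns "<service> = <n> ms" for each send whose start line was Active/PROVIDER.
    static List<String> activeProviderTimings(List<String> lines) {
        List<String> result = new ArrayList<>();
        boolean keepNext = false;
        for (String raw : lines) {
            String line = raw.trim();
            Matcher start = START.matcher(line);
            if (start.matches()) {
                keepNext = "Active".equals(start.group(2))
                        && "PROVIDER".equals(start.group(3));
                continue;
            }
            Matcher done = DONE.matcher(line);
            if (keepNext && done.matches()) {
                result.add(done.group(1) + " = " + done.group(2) + " ms");
            }
            // Any other line ends the pairing with the previous start line.
            keepNext = false;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "Invoking on {http://progress.com/ps/smx/demo/transformer}XsltTransformer... (status: Active, role: PROVIDER)",
                "{http://progress.com/ps/smx/demo/transformer}XsltTransformer done; elapsed time = 1 ms.",
                "Invoking on {http://progress.com/ps/smx/demo/transformer}XsltTransformer... (status: Active, role: CONSUMER)",
                "{http://progress.com/ps/smx/demo/transformer}XsltTransformer done; elapsed time = 0 ms.");
        for (String timing : activeProviderTimings(sample)) {
            System.out.println(timing);
        }
    }
}
```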

Using an approach similar to this, my customer was able to show that his bottleneck was actually in one of his own handmade components that invoked on a back-end server using RMI - the hit was in the order of 850ms, using the lion's share of the latency. I'm not sure what he's done since then to minimize this hit, but at least we know where the problem is.

In summary: Aspect-Oriented Programming is great for weaving in cross-cutting concerns such as logging, security or transactionality into your application code - it's also a nice tool to have for performance diagnosis.

Friday, September 19, 2008

CXF, JMS & the risk of acknowledged-but-not-processed messages

May God speed Christian Schneider and his refactoring of the CXF JMS endpoints to be more configurable. I was looking into the reliability of the JMS implementation in CXF, and (from the code), I see that the JMS acknowledgement mode used by CXF is hard-coded to be Session.AUTO_ACKNOWLEDGE. So, I thought, it's not transactional - but that's ok, right? As long as the message is acknowledged after my CXF implementation code then I'm good... right?

I wanted to verify that this was the case; so, I modified the jms_queue demo to exit unmercifully in the middle of the CXF implementation code for the greetMe() and greetMeOneWay() methods. I had hoped that this would mean that the incoming message, having been unacknowledged, would be redelivered - however, it appears that the message is not redelivered. It looks like CXF is acknowledging the message before we actually process it. Ouch.

The implications of this are serious: if you're implementing a listener for one-way (in-only) messages with CXF then there is a possibility that messages delivered just before an emergency server shutdown will have been acknowledged but not processed: lost forever. To get around this for now, I'd recommend using Camel to listen transactionally from the JMS queue and send the message to CXF for marshalling (as per Christian's article "Better JMS Transport for CXF"). That way, if the server goes down then the message will be redelivered at a later stage. Of course, your code should check whether the message is a redelivery, and take appropriate action to ensure that your application remains consistent.
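
One way to handle redeliveries is to make the receiver idempotent. The sketch below is a minimal illustration under stated assumptions, not CXF or Camel code: IdempotentReceiver is a made-up class, the messageId and redelivered parameters stand in for what the JMS API exposes as Message.getJMSMessageID() and Message.getJMSRedelivered(), and the set of processed IDs is held in memory only. A real implementation would need to persist those IDs, ideally in the same transaction as the business work, for the check to survive a server crash.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical idempotent receiver: skips a redelivered message if its ID
// has already been processed successfully.
final class IdempotentReceiver {

    // In-memory only (an assumption for this sketch); lost on restart.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<String> handler;

    IdempotentReceiver(Consumer<String> handler) {
        this.handler = handler;
    }

    // Returns true if the handler ran, false if the message was skipped as a duplicate.
    boolean onMessage(String messageId, boolean redelivered, String body) {
        if (redelivered && processedIds.contains(messageId)) {
            return false;
        }
        handler.accept(body);
        // Record the ID only after the handler completed without throwing.
        processedIds.add(messageId);
        return true;
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();
        IdempotentReceiver receiver = new IdempotentReceiver(handled::add);
        receiver.onMessage("ID:1", false, "hello");
        receiver.onMessage("ID:1", true, "hello"); // duplicate, skipped
        System.out.println("handled " + handled.size() + " message(s)");
    }
}
```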

The impact of this "early acknowledgement" on request-response services over JMS is not so fatal, as failure of the server will result in no reply message getting to the client - a timeout will alert the client that something has gone wrong and they can resend.

Tuesday, September 9, 2008

Setting up a PostgreSQL database for ActiveMQ

The default ActiveMQ configuration file (in conf/activemq.xml) has a sample fragment of Spring configuration that shows how to use PostgreSQL as a data source. So, pointing ActiveMQ to PostgreSQL is easy; the problem you may run into is how to configure PostgreSQL in the first place with an appropriate account for ActiveMQ.

The instructions below assume that you've installed postgres in the directory /usr/local/pgsql, and that postgres is being run by the user 'postgresql'. They also assume that you're using a bash or sh shell; however, I'm sure it's not rocket science to map what follows to a DOS shell for Windows.

You should include the PostgreSQL bin directory in your path:


$ export PATH=/usr/local/pgsql/bin:$PATH


Decide on a file system location for the postgresql data files, say: /Users/postgresql/data. Set the PGDATA variable to point to this directory.


$ export PGDATA=/Users/postgresql/data


The commands that follow will use $PGDATA to determine the location of the database files.

Initialize the database data directory:


$ initdb


... This will initialize the database in the $PGDATA directory.

Now start the database controller:


$ pg_ctl -l logfile start


You should see the message 'server starting'.

Now create a database called activemq-db:


$ createdb activemq-db


To log into the database, use the psql command:


$ psql activemq-db


This gets you in as the current user ('postgresql') - the default security is very lax: let's enforce password protection for the database. To do this, first create a password for the postgresql user:


activemq-db=> alter user postgresql with password 'foo';


Create a username for activemq with appropriate password:


activemq-db=> create user activemq with password 'pa55w0rd';


Exit the psql shell using the \q command:


\q


Edit the $PGDATA/pg_hba.conf file (initdb creates it directly in the data directory) to specify that the activemq user can access the activemq-db database.


# TYPE  DATABASE     USER        CIDR-ADDRESS               METHOD
host    all          postgresql  127.0.0.1 255.255.255.255  md5
local   all          postgresql                             md5
host    activemq-db  activemq    127.0.0.1 255.255.255.255  md5
local   activemq-db  activemq                               md5


Now, reload the database configuration; a reload is enough for pg_hba.conf changes to take effect.


$ pg_ctl reload


You should now be prompted for a password when you log in to the database as the activemq user.


$ psql activemq-db activemq
Password for user activemq:
Welcome to psql 8.3.3, the PostgreSQL interactive terminal.

Type: \copyright for distribution terms
\h for help with SQL commands
\? for help with psql commands
\g or terminate with semicolon to execute query
\q to quit

activemq-db=>


In activemq, configure your postgres-ds data source to use the database 'activemq-db', with user 'activemq' and password 'pa55w0rd'. Now, when you start up, activemq will create the necessary tables to persist messages. You can validate that it has created these tables by using the \d command in psql:


activemq-db=> \d
           List of relations
 Schema |     Name      | Type  |  Owner
--------+---------------+-------+----------
 public | activemq_acks | table | activemq
 public | activemq_lock | table | activemq
 public | activemq_msgs | table | activemq
(3 rows)
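
The same account can also be checked from Java over JDBC, which is roughly what ActiveMQ will do through its data source. This is only a sketch under assumptions: ActiveMqDbCheck is a made-up class, the PostgreSQL JDBC driver JAR is assumed to be on the classpath, and the server is assumed to be listening on 127.0.0.1 (matching the host entries in pg_hba.conf) on the default port 5432.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Hypothetical connectivity check for the activemq account created above.
final class ActiveMqDbCheck {

    // Builds a PostgreSQL JDBC URL of the form jdbc:postgresql://host:port/database.
    static String jdbcUrl(String host, int port, String database) {
        return "jdbc:postgresql://" + host + ":" + port + "/" + database;
    }

    public static void main(String[] args) throws Exception {
        // Host and port are assumptions; user and password come from the steps above.
        String url = jdbcUrl("127.0.0.1", 5432, "activemq-db");
        try (Connection connection = DriverManager.getConnection(url, "activemq", "pa55w0rd");
             Statement statement = connection.createStatement();
             ResultSet rs = statement.executeQuery("select current_user")) {
            rs.next();
            System.out.println("Connected as " + rs.getString(1));
        }
    }
}
```

If the connection is refused or the password is rejected here, ActiveMQ will most likely hit the same problem at startup, so this is a cheap way to separate database setup issues from broker configuration issues.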


And... you're done!