Showing posts with label Event Notification. Show all posts

Friday, December 31, 2010

Remote Observer Pattern with Publish-Subscribe via XMPP [Apache Camel Series]



Apache Camel is a powerful message routing framework that can be used to implement all common enterprise integration patterns.

I'm going to look at the Remote Observer Pattern, which is understandably complex to implement using traditional techniques.

However, with Apache Camel, it becomes surprisingly simple!

Reviewing the API

I'll just expand upon the previously blogged Asynchronous Observer/Listener with Apache Camel.

Here's the Listener interface, which is still the same:

public interface InvoiceListener {
public void invoiceCreated(int id, String name);
}

Still the same exact old Listener API, to prove that Apache Camel can be unobtrusive if you want. (yes, I want it to "stay out of my API way". Let Camel be the glue.)

And the same way to call the Observer :

invoiceListener.invoiceCreated(243, "Sumba Enterprise");

Publish-Subscribe Pattern via XMPP

What I want to do is route the listener invocation from the caller through a message queue channel/topic.

Interested observers can subscribe to that topic to receive (aka "consume" in Camel-speak) events/notifications/messages.

This is called Publish-Subscribe in Enterprise Integration Patterns.
The usual way to do this would be to use JMS and a message queue server/broker such as Apache ActiveMQ.
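
To make the pattern concrete before any broker gets involved, here is a minimal in-memory sketch of Publish-Subscribe in plain Java. It is not Camel, XMPP, or JMS code; the class names InMemoryTopic and PubSubDemo and the subscriber labels are made up for illustration. It only shows the shape of the idea: the publisher sends to a topic, and every subscriber receives its own copy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory topic; a real setup would use an XMPP room or a JMS topic.
class InMemoryTopic<T> {
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    // An observer subscribes by registering a callback.
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    // The publisher does not know who is listening; each subscriber gets the message.
    void publish(T message) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

class PubSubDemo {
    // Publishes one message to two subscribers and returns what each one received.
    static List<String> run() {
        InMemoryTopic<String> topic = new InMemoryTopic<>();
        List<String> received = new ArrayList<>();
        topic.subscribe(msg -> received.add("logger got " + msg));
        topic.subscribe(msg -> received.add("billing got " + msg));
        topic.publish("invoice 243");
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With a real channel the subscribers can live in other processes or on other machines, which is the "remote" part of the Remote Observer.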

But it doesn't have to be. Besides, if you use JMS directly, you'll be tied to the JMS API.

With Apache Camel you're free to change the underlying routing as you wish (XMPP, JMS, direct call, etc.).

Even if you want to change to another routing framework, e.g. Spring Integration, it's possible, since both frameworks support the POJO bean proxying approach.

I have at least four reasons to use XMPP:
  1. It's very easy to debug. When there are problems you can just use Pidgin or Empathy or Spark or any XMPP/Jabber client to monitor what flows inside the pipes. No need for heavy tools. Even your grandma's mobile phone can do the job.
  2. No tooling needed. Same as above. JMS/ActiveMQ may have more/better tooling but you need the tooling. This can be either strength or weakness depending on how you look at it.
  3. It's just XML. I have never tried it, but in theory you can inspect or tweak XMPP packets with ordinary text tools, since XMPP is XML streamed over a TCP connection (and it can also be carried over HTTP via the BOSH extension).
  4. No fancy server needed. You can install Openfire or ejabberd, just like you can install ActiveMQ. But you don't have to: you can use the Jabber network, Google Talk, Google Apps' GTalk, or even Facebook Chat's XMPP support. All free to use. (If you decide to use these services, make sure to comply with their terms of use.)
Note: XMPP server/network's support of XMPP conference room chat functionality may vary.

Adding Camel XMPP and JSON Serialization

To use Camel with XMPP, let's revisit our project's Gradle build script and add camel-xmpp and camel-xstream as dependencies :

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'maven'
sourceCompatibility = '1.6'
repositories {
mavenRepo urls: "file:${System.properties['user.home']}/.m2/repository", name: 'local'
mavenCentral()
}
dependencies {
compile 'org.slf4j:slf4j-api:1.6.1'
runtime 'org.slf4j:jcl-over-slf4j:1.6.1'
runtime 'ch.qos.logback:logback-classic:0.9.26'
compile 'org.apache.camel:camel-core:2.5.0'
runtime 'org.apache.camel:camel-xmpp:2.5.0'
runtime 'org.apache.camel:camel-xstream:2.5.0'
}


While it's possible to use just Camel XMPP without Xstream, in practice it's much easier if we use a serialization library.

Camel provides several providers to serialize (i.e. marshal and unmarshal) our invocations/messages over the network transport (such as XMPP).
Camel calls these providers Data Formats.

I like to use JSON because it's somewhat human-readable (at least it's programmer-readable! :-) ) and relatively compact.

XML is also okay if you prefer it; both are supported by Xstream.

Camel also supports another JSON provider called Jackson but I wasn't able to successfully marshal BeanInvocation (Camel's underlying data class for proxied bean invocation) with it, so it's Xstream for now.

Routing to XMPP

 Let's review the last routing we had :

from("direct:invoice").inOnly().to("seda:invoice.queue");
from("seda:invoice.queue").threads().bean(loggerInvoiceListener);

This is fine. To make our invocation support XMPP all we need to do is change the routes, nothing else:

final String xmppUri = "xmpp://abispulsabot@localhost/?room=abispulsa.refill&password=test";
from("direct:invoice").marshal().json().to(xmppUri);
from(xmppUri).unmarshal().json().bean(loggerInvoiceListener);

This is all that's needed. Apart from adding the dependencies that we've done above, no code needs to be changed!

I'm sure you can guess what the URI means. It tells the Camel XMPP connector to create an XMPP endpoint with:
  • username: abispulsabot
  • domain: localhost
  • room: abispulsa.refill
  • password: test
I guess I didn't really have to write that, but just for the sake of being explicit. ;-)
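
If you want to check that breakdown yourself, the standard java.net.URI class can split the same string. This is only a sketch of generic URI parsing, not how Camel resolves endpoints internally; the helper class XmppUriParts and its queryParams method are invented for this example and assume the query contains no URL-encoded characters.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

class XmppUriParts {
    // Splits "a=1&b=2" into an ordered map; assumes no URL-encoded characters.
    static Map<String, String> queryParams(URI uri) {
        Map<String, String> params = new LinkedHashMap<>();
        String query = uri.getQuery();
        if (query == null) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    public static void main(String[] args) {
        URI uri = URI.create("xmpp://abispulsabot@localhost/?room=abispulsa.refill&password=test");
        Map<String, String> params = queryParams(uri);
        System.out.println("username: " + uri.getUserInfo());
        System.out.println("domain:   " + uri.getHost());
        System.out.println("room:     " + params.get("room"));
        System.out.println("password: " + params.get("password"));
    }
}
```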

Note: if you want to use XML just change un/marshal().json() to un/marshal().xstream(). Simple isn't it? No fuss. ;-)

Making It Behave

The above routing is still not cool:
  1. It's single-threaded, i.e. both incoming and outgoing messages are blocking.
  2. It does not handle non-JSON messages.
So let's make it better.

Enabling Multi-threading aka Asynchronous Processing

To enable multi-threading we use the magic threads() DSL :

final String xmppUri = "xmpp://abispulsabot@localhost/?room=abispulsa.refill&password=test";
from("direct:invoice").threads().marshal().json().to(xmppUri);
from(xmppUri).threads().unmarshal().json().bean(loggerInvoiceListener);

That's it. Each message is now processed on a thread taken from a pool (by default up to 10 threads per thread pool, but you can configure the size, e.g. "threads(5)").

Logging Non-JSON Messages

So what happens when a non-JSON message is posted to the XMPP room? Let's just log it:

final String xmppUri = "xmpp://abispulsabot@localhost/?room=abispulsa.refill&password=test";
from("direct:invoice").threads().marshal().json().to(xmppUri);
from(xmppUri).threads().choice()
.when(body().startsWith("{")).unmarshal().json().bean(loggerInvoiceListener)
.otherwise().to("log:xmpp");

We use conditional routing there, using choice(), when() and otherwise(), together with Predicates (constraint expressions) such as body().startsWith().
By the way, Camel supports Predicates with a lot more expression languages, so you won't run out of options.

You'll complain that the above routing still doesn't handle invalid JSON, but let's just be satisfied with that as my example.
I believe you can make better routes much faster than I do. ;-)
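
For readers who want to see the decision logic outside the DSL, here is a small plain-Java sketch of the same content-based routing, extended with one possible answer to the invalid-JSON problem: if the handler throws, fall back to logging. This is not Camel code. ContentRouter and ContentRouterDemo are hypothetical names, and the "parser" is a deliberately naive stand-in (it only checks for a closing brace), so treat it as an illustration of the control flow rather than real validation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class ContentRouter {
    // Same test as the route above: does the body start with an opening brace?
    private final Predicate<String> looksLikeJson = body -> body.startsWith("{");
    private final Consumer<String> jsonHandler;
    private final Consumer<String> fallback;

    ContentRouter(Consumer<String> jsonHandler, Consumer<String> fallback) {
        this.jsonHandler = jsonHandler;
        this.fallback = fallback;
    }

    void route(String body) {
        if (!looksLikeJson.test(body)) {
            fallback.accept(body);      // the "otherwise" branch
            return;
        }
        try {
            jsonHandler.accept(body);   // the "when" branch
        } catch (RuntimeException e) {
            fallback.accept(body);      // looked like JSON but could not be handled
        }
    }
}

class ContentRouterDemo {
    static List<String> run() {
        List<String> out = new ArrayList<>();
        ContentRouter router = new ContentRouter(
            body -> {
                // Naive stand-in for real JSON unmarshalling (assumption for this sketch).
                if (!body.endsWith("}")) {
                    throw new IllegalArgumentException("truncated JSON");
                }
                out.add("handled: " + body);
            },
            body -> out.add("logged: " + body));
        router.route("{\"id\": 243}");
        router.route("You can also filter messages! :)");
        router.route("{\"id\": 938");
        return out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In a Camel route the equivalent would be some form of error handling around the unmarshal step; the exact DSL for that is left out here.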

And So It Goes...

Again for the sake of completeness, I present you the entire app:

import org.apache.camel.CamelContext;
import org.apache.camel.builder.ProxyBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
...
private static Logger logger = LoggerFactory.getLogger(App.class);
private static LoggerInvoiceListener loggerInvoiceListener = new LoggerInvoiceListener();
private static InvoiceListener invoiceListener;
...
CamelContext camelContext = new DefaultCamelContext();
loggerInvoiceListener = new LoggerInvoiceListener();
final String xmppUri = "xmpp://abispulsabot@localhost/?room=abispulsa.refill&password=test";
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:invoice").threads().marshal().json().to(xmppUri);
from(xmppUri).threads().choice()
.when(body().startsWith("{")).unmarshal().json().bean(loggerInvoiceListener)
.otherwise().to("log:xmpp");
}
});
camelContext.start();
invoiceListener = new ProxyBuilder(camelContext).endpoint("direct:invoice").build(InvoiceListener.class);
invoiceListener.invoiceCreated(243, "Sumba Enterprise");
logger.info("first invoice sent");
invoiceListener.invoiceCreated(938, "Mina Co.");
logger.info("second invoice sent");
invoiceListener.invoiceCreated(312, "Crux Market");
logger.info("third invoice sent");
try {
while (true) { // event loop so you can send messages
Thread.sleep(1000);
}
} finally {
camelContext.stop();
}

Here's an example log output:

00:57:03.280 [main] INFO  o.a.c.c.xmpp.XmppGroupChatProducer - Joined room: abispulsa.refill@conference.annafi as: abispulsabot
00:57:03.298 [main] INFO  o.a.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: Endpoint[direct://invoice]
00:57:03.338 [main] INFO  o.a.c.component.xmpp.XmppConsumer - Joined room: abispulsa.refill@conference.annafi as: abispulsabot
00:57:03.338 [main] INFO  o.a.camel.impl.DefaultCamelContext - Route: route2 started and consuming from: Endpoint[xmpp://abispulsabot@localhost/?password=******&room=abispulsa.refill]
00:57:03.339 [main] INFO  o.a.camel.impl.DefaultCamelContext - Total 2 routes, of which 2 is started.
00:57:03.339 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.5.0 (CamelContext: camel-1) started in 16.448 seconds
00:57:03.510 [main] INFO  id.co.bippo.camelxmppredo.App - first invoice sent
00:57:03.528 [main] INFO  id.co.bippo.camelxmppredo.App - second invoice sent
00:57:03.557 [main] INFO  id.co.bippo.camelxmppredo.App - third invoice sent
00:57:03.570 [Camel Thread 2 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #243 name: Sumba Enterprise created
00:57:05.071 [Camel Thread 2 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - 243/Sumba Enterprise done!
00:57:05.078 [Camel Thread 4 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #938 name: Mina Co. created
00:57:06.579 [Camel Thread 4 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - 938/Mina Co. done!
00:57:06.584 [Camel Thread 5 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #312 name: Crux Market created
00:57:08.085 [Camel Thread 5 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - 312/Crux Market done!
00:57:26.399 [Camel Thread 6 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #7890 name: Bippo Indonesia created
00:57:27.900 [Camel Thread 6 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - 7890/Bippo Indonesia done!
00:57:41.776 [Camel Thread 7 - Threads] INFO  xmpp - Exchange[ExchangePattern:InOnly, BodyType:String, Body:You can also filter messages! :)]

Notice that in the above log I posted an invoice #7890 "Bippo Indonesia" using Pidgin XMPP client, and the application can process it successfully.
I also said "You can also filter messages! :)" that simply gets logged.

As a cool bonus I provide a screenshot. ;-)

Extra: Gradle JavaExec

To quickly run the application from the command line, Gradle provides a JavaExec task that can be used like this in build.gradle :

task(exec, dependsOn: classes, type: JavaExec) {
description = 'Run the application'
classpath = runtimeClasspath
main = 'id.co.bippo.camelxmppredo.App'
}

Extra: logback.xml Configuration File

Logback defaults to logging everything to stdout (which is much better than Log4j's default of not logging anything!)

I prefer logging only at INFO level and above. This can be done by putting the following logback.xml in src/main/resources :

<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <layout class="ch.qos.logback.classic.PatternLayout">
      <Pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</Pattern>
    </layout>
  </appender>
  <root level="info">
    <appender-ref ref="STDOUT" />
  </root>
</configuration>

Utopian dream: I can't help but imagine that had Logback used Gradle-style configuration it would look like this: ;-)

appenders { stdConsole() }
root.level = info

Running the Example

To make it really easy for you (and especially for myself, as I tend to forget things!), I've made the examples freely available as the camel-xmpp-redo project on GitHub.

You'll need Git and Gradle to check out and build it (and JDK 1.6).
Gradle will automatically download and cache the required dependencies for you.

git clone git://github.com/ceefour/camel-xmpp-redo.git
cd camel-xmpp-redo
# edit the source and change XMPP credentials
gradle exec
Note: you must change the XMPP credentials used to login.

Easy XMPP with Camel !

Apache Camel is an unobtrusive way to add flexible routing, messaging, and integration patterns with XMPP or other connectors supported by Camel.
(you can create your own connectors if you want...)

I highly suggest the Camel in Action Book for the best in-depth guide and examples for using Camel to develop your applications more productively.

Implementing Asynchronous Observer Pattern with Bean Proxy [Apache Camel Series]

Apache Camel is a very versatile message routing and enterprise integration framework.

One very useful, practical, and reasonable use case is to implement a Listener/Observer interface that delegates to an actual implementation with a flexible exchange pattern.

So from the point of view of the Observable (event source), the Observer is just a regular "POJO" Java object.

The Listener Interface


Here's the Listener interface :

public interface InvoiceListener {
public void invoiceCreated(int id, String name);
}

What I Want

What I'd like to do is to be able to call a listener object as usual :

InvoiceListener invoiceListener = .....
invoiceListener.invoiceCreated(243, "Sumba Enterprise");


But I want the implementation to be flexible. It can be a local object, a remote web service, or even routed asynchronously using a message queue (ActiveMQ) or XMPP.

A nice side effect is that the programming model stays the same. And of course, it's easily testable since your implementation is still POJO.

The Implementation

This is an example implementation of the listener, that simply logs the input.
Note there can be as many implementations as required.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LoggerInvoiceListener implements InvoiceListener {
Logger logger = LoggerFactory.getLogger(getClass());
public void invoiceCreated(int id, String name) {
logger.info("Invoice #{} name: {} created", id, name);
// Simulate asynchronous processing by delaying
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
logger.error("Interrupted", e);
}
logger.info("{}/{} done!", id, name);
}
}

Note that I've added Thread.sleep() there, to simulate processing.

The Usual Way aka Useful for JUnit Tests

So here's the standard stuff:

InvoiceListener invoiceListener = new LoggerInvoiceListener();
invoiceListener.invoiceCreated(243, "Sumba Enterprise");
logger.info("first invoice sent");
invoiceListener.invoiceCreated(938, "Mina Co.");
logger.info("second invoice sent");

While this works, and is very useful in unit testing, it is not practical in our example.

The Listener blocks the thread for processing and the program runs for 3000 ms for two invoices.

Not to mention that the original thread cannot do anything else while waiting for the Listener to return.

Here's how we'll fix it using Apache Camel, without changing how we call the listener at all!

Setting Up Apache Camel

I will use Gradle to set up our project and automatically manage Apache Camel dependencies and SLF4j + Logback, which I use for logging.

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'maven'
sourceCompatibility = '1.6'
repositories {
mavenRepo urls: "file:${System.properties['user.home']}/.m2/repository", name: 'local'
mavenCentral()
}
dependencies {
compile 'org.slf4j:slf4j-api:1.6.1'
runtime 'org.slf4j:jcl-over-slf4j:1.6.1'
runtime 'ch.qos.logback:logback-classic:0.9.26'
compile 'org.apache.camel:camel-core:2.5.0'
}

Maven guys can do something similar, but I increasingly prefer Gradle due to its compact syntax and flexibility.

If you're not using dependency management or still use Ant, I strongly recommend Gradle for your project build system.

It has the flexibility of Ant, can optionally use conventions like Maven, offers the powerful dependency management of Ivy, and uses Groovy's clean syntax, so it won't intimidate you with verbose <angle brackets>.

To create the Eclipse IDE Project we'll create the source folder and execute Gradle's "eclipse" task.

mkdir -p src/main/java src/main/resources
gradle eclipse

Configuring CamelContext

Camel routes run inside a CamelContext. Here's the usual way to create a default CamelContext, configure routes, start the CamelContext, and stop it.

CamelContext camelContext = new DefaultCamelContext();
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
// TODO: configure routes here
}
});
camelContext.start();
// TODO: run your app here
camelContext.stop();

Configuring Camel routes can be done using a Java-based DSL (among other choices).

Here's a route DSL for our use case, that simply invokes a specified bean:

from("direct:invoice").bean(loggerInvoiceListener);

Camelizing Our App


With that in mind, let's see what our original application looks like:

private static Logger logger = LoggerFactory.getLogger(App.class);
private static LoggerInvoiceListener loggerInvoiceListener = new LoggerInvoiceListener();
private static InvoiceListener invoiceListener;
public static void main(String[] args) throws Exception {
invoiceListener = loggerInvoiceListener;
invoiceListener.invoiceCreated(243, "Sumba Enterprise");
logger.info("first invoice sent");
invoiceListener.invoiceCreated(938, "Mina Co.");
logger.info("second invoice sent");
}

And this is how to do the same thing, using Apache Camel :

import org.apache.camel.CamelContext;
import org.apache.camel.builder.ProxyBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
...
private static Logger logger = LoggerFactory.getLogger(App.class);
private static LoggerInvoiceListener loggerInvoiceListener = new LoggerInvoiceListener();
private static InvoiceListener invoiceListener;
public static void main(String[] args) throws Exception {
CamelContext camelContext = new DefaultCamelContext();
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:invoice").bean(loggerInvoiceListener);
}
});
camelContext.start();
invoiceListener = new ProxyBuilder(camelContext).endpoint("direct:invoice")
.build(InvoiceListener.class);
invoiceListener.invoiceCreated(243, "Sumba Enterprise");
logger.info("first invoice sent");
invoiceListener.invoiceCreated(938, "Mina Co.");
logger.info("second invoice sent");
camelContext.stop();
}

You can see the Listener API usage has not changed at all.

Apart from the Camel runtime, the main difference lies in how we get the InvoiceListener object, which is now a proxy provided by Camel:

invoiceListener = new ProxyBuilder(camelContext).endpoint("direct:invoice")
  .build(InvoiceListener.class);
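
If the word "proxy" feels like magic, the following sketch shows the general idea using only the JDK's java.lang.reflect.Proxy. It is not Camel's implementation and makes no claim about how ProxyBuilder works inside; the class RecordingProxyDemo is invented for this example. The point is simply that a call on the interface can be captured as data (method name plus arguments), which is the kind of thing a routing framework can then send anywhere.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

interface InvoiceListener {
    void invoiceCreated(int id, String name);
}

class RecordingProxyDemo {
    // Builds a proxy that records each call as "method[args]" instead of doing real work.
    static InvoiceListener recordingProxy(List<String> sink) {
        InvocationHandler handler = (proxy, method, args) -> {
            sink.add(method.getName() + Arrays.toString(args));
            return null; // the listener method is void, so there is nothing to return
        };
        return (InvoiceListener) Proxy.newProxyInstance(
            InvoiceListener.class.getClassLoader(),
            new Class<?>[] { InvoiceListener.class },
            handler);
    }

    static List<String> run() {
        List<String> calls = new ArrayList<>();
        InvoiceListener listener = recordingProxy(calls);
        // The caller code is identical to calling a plain implementation.
        listener.invoiceCreated(243, "Sumba Enterprise");
        listener.invoiceCreated(938, "Mina Co.");
        return calls;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```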

Here's the output of the Camel-ized app :

23:59:09.629 [main] INFO  o.a.c.i.c.AnnotationTypeConverterLoader - Found 3 packages with 13 @Converter classes to load
23:59:09.666 [main] INFO  o.a.c.i.c.DefaultTypeConverter - Loaded 146 type converters in 0.405 seconds
23:59:09.774 [main] INFO  o.a.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: Endpoint[direct://invoice]
23:59:09.775 [main] INFO  o.a.camel.impl.DefaultCamelContext - Total 1 routes, of which 1 is started.
23:59:09.775 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.5.0 (CamelContext: camel-1) started in 0.546 seconds
23:59:09.788 [main] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #243 name: Sumba Enterprise created
23:59:11.289 [main] INFO  i.c.b.c.LoggerInvoiceListener - 243/Sumba Enterprise done!
23:59:11.295 [main] INFO  id.co.bippo.camelasyncredo.App - first invoice sent
23:59:11.296 [main] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #938 name: Mina Co. created
23:59:12.796 [main] INFO  i.c.b.c.LoggerInvoiceListener - 938/Mina Co. done!
23:59:12.797 [main] INFO  id.co.bippo.camelasyncredo.App - second invoice sent
23:59:12.797 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.5.0 (CamelContext:camel-1) is shutting down
23:59:12.798 [main] INFO  o.a.c.impl.DefaultShutdownStrategy - Starting to graceful shutdown 1 routes (timeout 300 seconds)
23:59:12.806 [Camel Thread 0 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route1 suspension deferred.
23:59:12.807 [Camel Thread 0 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route1 shutdown complete.
23:59:12.807 [main] INFO  o.a.c.impl.DefaultShutdownStrategy - Graceful shutdown of 1 routes completed in 0 seconds
23:59:12.809 [main] INFO  o.a.c.impl.DefaultInflightRepository - Shutting down with no inflight exchanges.
23:59:12.810 [main] INFO  o.a.camel.impl.DefaultCamelContext - Uptime: 3.581 seconds
23:59:12.810 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.5.0 (CamelContext: camel-1) is shutdown in 0.013 seconds

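The app runs for a little over 3.5 seconds, of which roughly half a second is spent on Apache Camel startup (0.546 seconds in this log, including about 400 ms for loading type converters). This is the minimal overhead of Camel.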

I Want Them All at Once! Making It Asynchronous

A useful use case is to make the listener invocations asynchronous, i.e. performed with multiple threads, without changing the caller code at all.
Let's see how we can do this with Camel.

All we need to do is change:

from("direct:invoice").bean(loggerInvoiceListener);

to:

from("direct:invoice").inOnly().to("seda:invoice.queue");
from("seda:invoice.queue").threads().bean(loggerInvoiceListener);

This route tells Camel to transform the original invocation to InOnly (meaning that no response/reply is needed), and send it to a SEDA Queue endpoint.

The messages in the SEDA queue are consumed by multiple threads (the default thread pool has 10 threads, but you can specify your own size), and each thread invokes the destination implementation, which is the loggerInvoiceListener bean.
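
As a rough mental model of what that route does, here is a plain-Java sketch using an ExecutorService. It is not the SEDA component and not Camel code: the executor's internal work queue stands in for the SEDA queue, the fixed pool of 10 threads mirrors the default mentioned above, and the class name AsyncDispatchDemo plus the 300 ms delay are assumptions made for the example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncDispatchDemo {
    // Hands each invoice to a worker pool and returns the names that finished processing.
    static List<String> run() {
        List<String> done = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(10);
        for (String name : new String[] {"Sumba Enterprise", "Mina Co."}) {
            // execute() returns immediately: the caller does not wait for the listener.
            pool.execute(() -> {
                try {
                    Thread.sleep(300); // simulate slow processing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                done.add(name);
            });
        }
        // Similar in spirit to a graceful shutdown: wait for in-flight work to finish.
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done;
    }

    public static void main(String[] args) {
        System.out.println("processed: " + run());
    }
}
```

Because both tasks sleep at the same time, the order in which they finish is not guaranteed, just like the interleaved log lines in the Camel output.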

Here's how it looks right now :

00:11:44.952 [main] INFO  o.a.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: Endpoint[direct://invoice]
00:11:44.954 [main] INFO  o.a.camel.impl.DefaultCamelContext - Route: route2 started and consuming from: Endpoint[seda://invoice.queue]
00:11:44.954 [main] INFO  o.a.camel.impl.DefaultCamelContext - Total 2 routes, of which 2 is started.
00:11:44.954 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.5.0 (CamelContext: camel-1) started in 0.529 seconds
00:11:44.969 [main] INFO  id.co.bippo.camelasyncredo.App - first invoice sent
00:11:44.970 [main] INFO  id.co.bippo.camelasyncredo.App - second invoice sent
00:11:44.970 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.5.0 (CamelContext:camel-1) is shutting down
00:11:44.971 [main] INFO  o.a.c.impl.DefaultShutdownStrategy - Starting to graceful shutdown 2 routes (timeout 300 seconds)
00:11:44.971 [Camel Thread 2 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #938 name: Mina Co. created
00:11:44.971 [Camel Thread 1 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #243 name: Sumba Enterprise created
00:11:44.974 [Camel Thread 3 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route2 suspension deferred.
00:11:44.974 [Camel Thread 3 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route1 suspension deferred.
00:11:44.974 [Camel Thread 3 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 300 seconds.
00:11:45.975 [Camel Thread 3 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 299 seconds.
00:11:46.471 [Camel Thread 2 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - 938/Mina Co. done!
00:11:46.471 [Camel Thread 1 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - 243/Sumba Enterprise done!
00:11:46.976 [Camel Thread 3 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route2 shutdown complete.
00:11:46.976 [Camel Thread 3 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route1 shutdown complete.
00:11:46.976 [main] INFO  o.a.c.impl.DefaultShutdownStrategy - Graceful shutdown of 2 routes completed in 2 seconds
00:11:46.977 [main] INFO  o.a.c.impl.DefaultInflightRepository - Shutting down with no inflight exchanges.
00:11:46.978 [main] INFO  o.a.camel.impl.DefaultCamelContext - Uptime: 2.553 seconds
00:11:46.979 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.5.0 (CamelContext: camel-1) is shutdown in 2.008 seconds

You can see from the log that Camel has executed our listener invocations in parallel threads. As a result, our application runs in about 2.5 seconds instead of ~3.5 seconds as before.

You can also see that the order of execution is very different now.

Synchronous invocations:

23:59:09.788 [main] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #243 name: Sumba Enterprise created
23:59:11.289 [main] INFO  i.c.b.c.LoggerInvoiceListener - 243/Sumba Enterprise done!
23:59:11.295 [main] INFO  id.co.bippo.camelasyncredo.App - first invoice sent
23:59:11.296 [main] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #938 name: Mina Co. created
23:59:12.796 [main] INFO  i.c.b.c.LoggerInvoiceListener - 938/Mina Co. done!
23:59:12.797 [main] INFO  id.co.bippo.camelasyncredo.App - second invoice sent

Asynchronous invocations, with Camel:

00:11:44.969 [main] INFO  id.co.bippo.camelasyncredo.App - first invoice sent
00:11:44.970 [main] INFO  id.co.bippo.camelasyncredo.App - second invoice sent
...
00:11:44.971 [Camel Thread 2 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #938 name: Mina Co. created
00:11:44.971 [Camel Thread 1 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - Invoice #243 name: Sumba Enterprise created
...
00:11:46.471 [Camel Thread 2 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - 938/Mina Co. done!
00:11:46.471 [Camel Thread 1 - Threads] INFO  i.c.b.c.LoggerInvoiceListener - 243/Sumba Enterprise done!

Instead of each invocation waiting for the previous one to complete, they can run independently.

All of this happens without changing the caller code, the interface API, or the implementation code.

All you change is the Camel routes.

Running the Example

To make it really easy for you (and especially for myself, as I tend to forget things!), I've made the examples freely available as the camel-async-redo project on GitHub.

You'll need Git and Gradle to check out and build it (and JDK 1.6).
Gradle will automatically download and cache the required dependencies for you.

git clone git://github.com/ceefour/camel-async-redo.git
cd camel-async-redo
gradle exec

Conclusion

Apache Camel is an unobtrusive way to add flexible routing, messaging, and integration patterns to your Java application.
(although, Camel can be useful to seam/weave/integrate any combination of applications, even remote services, so not just Java, but that's another topic...)

I highly suggest the Camel in Action book for the best in-depth guide and examples for using Camel to develop your applications more productively.

Monday, January 4, 2010

Event Notification Framework with Spring Integration 2.0

Spring Integration is SpringSource's library for implementing Enterprise Integration Patterns in a Java application. I'm going to demonstrate how to use Spring Integration as an event notification framework.

I'll use the example in my previous article, Advanced Event Notification Framework with Apache Camel. In short, we're going to make a Sensor publish notification events to two Displays.

Fortunately, there is very minimal code change required to implement it using Spring Integration. My code was much more portable than I thought. ;-)

The complete application source can be downloaded/explored from: http://github.com/ceefour/eventfx.si

Tweaking Listener Interfaces


I had made a minor portability mistake before, annotating the listener interfaces with Camel-specific annotations. We can leave them out, as we can configure Spring Integration to work with POJO classes, without any annotations. (FYI, Apache Camel does too)

SyncListener.java becomes:

package com.soluvas.samples.eventfx.si;

import java.util.EventListener;

public interface SyncListener<E, R> extends EventListener {

 R update(E event);
}

AsyncListener.java becomes:

package com.soluvas.samples.eventfx.si;

import java.util.EventListener;

public interface AsyncListener<E> extends EventListener {

 void notify(E event);
}
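
To show how the two interfaces differ from the caller's side, here is a small sketch that does not use Spring Integration at all. The interfaces are repeated so the snippet stands alone; the class ListenerKindsDemo, the Integer event type, and the single-thread executor are assumptions for illustration (the real example sends Sensor events through channels). A sync listener returns a reply that the caller waits for, while an async listener is handed off and returns nothing.

```java
import java.util.EventListener;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

interface SyncListener<E, R> extends EventListener {
    R update(E event);
}

interface AsyncListener<E> extends EventListener {
    void notify(E event);
}

class ListenerKindsDemo {
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        SyncListener<Integer, String> sync = reading -> "ack " + reading;
        AsyncListener<Integer> async = reading -> log.add("displayed " + reading);

        // Sync: the caller blocks until the listener returns and can use the reply.
        log.add(sync.update(42));

        // Async: the caller hands the event to another thread and moves on.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> async.notify(42));
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```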

Spring Context Configuration


The biggest change is the Spring Context Configuration file.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:context="http://www.springframework.org/schema/context"
 xmlns:si="http://www.springframework.org/schema/integration"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
  http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
  http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">

 <context:component-scan base-package="com.soluvas.samples.eventfx.si" />

 <bean class="com.soluvas.samples.eventfx.si.Sensor">
  <property name="sensorSyncListener">
   <si:gateway id="sensorSyncListener" service-interface="com.soluvas.samples.eventfx.si.SyncListener"
    default-request-channel="Sensor_SensorEvent_sync" />
  </property>
  <property name="sensorAsyncListener">
   <si:gateway id="sensorAsyncListener" service-interface="com.soluvas.samples.eventfx.si.AsyncListener"
    default-request-channel="Sensor_SensorEvent_async" />
  </property>
 </bean>
 <bean id="display1" class="com.soluvas.samples.eventfx.si.Display">
  <property name="name" value="Sony(sync)" />
 </bean>
 <bean id="display2" class="com.soluvas.samples.eventfx.si.Display">
  <property name="name" value="Samsung(async)" />
 </bean>
 
 <si:channel id="Sensor_SensorEvent_sync" />
 <si:publish-subscribe-channel id="Sensor_SensorEvent_async" />

 <si:service-activator input-channel="Sensor_SensorEvent_sync" ref="display1" method="update" />
 <si:chain input-channel="Sensor_SensorEvent_async">
  <si:delayer default-delay="1200" />
  <si:service-activator ref="display2" method="notify" />
 </si:chain>
 
</beans>

The differences between this Spring Integration version and the Apache Camel version are:
  1. CamelProxyFactoryBeans are replaced with si:gateway beans, which have a more concise syntax.
    Camel 2.2.0 will support a similarly concise camel:proxy element that can be used outside the camelContext element.
  2. Endpoint URIs are replaced with explicit declaration of Spring Integration Channel beans. Here I use direct channel (si:channel) for sync events and publish-subscribe channel (si:publish-subscribe-channel) for async events.
    I think the explicit publish-subscribe channel in Spring Integration is very useful. In Apache Camel 2.1 and earlier, the only way to do this is to use multicast. In Apache Camel 2.2, the SEDA component will have a multipleConsumers option that provides a lightweight publish-subscribe mechanism (dynamic multicasting).
  3. I specify the "routing" to Display service-activators using the Spring XML DSL. Apache Camel's Spring DSL feels much more powerful.
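
To make the gateway/proxy idea a bit more concrete, here is a rough plain-Java sketch of what such a bean does conceptually: it implements the listener interface with a dynamic proxy and turns every call into a message sent to a channel. Everything here is an assumption for illustration only: SimpleChannel and GatewaySketch are made-up names, and the SyncListener signature is guessed, since the real interface isn't shown in this post. It is not how Spring Integration or Camel actually implement it.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.function.Function;

// Hypothetical listener interface; the real SyncListener signature is not shown in this post.
interface SyncListener {
    String update(String text);
}

// Toy stand-in for a direct channel: hands the payload to one handler and returns its reply.
class SimpleChannel {
    private Function<Object, Object> handler;

    void subscribe(Function<Object, Object> handler) {
        this.handler = handler;
    }

    Object send(Object payload) {
        if (handler == null) {
            throw new IllegalStateException("No handler subscribed");
        }
        return handler.apply(payload);
    }
}

class GatewaySketch {
    // Creates a proxy that forwards the first argument of each call to the channel.
    @SuppressWarnings("unchecked")
    static <T> T gateway(Class<T> serviceInterface, SimpleChannel channel) {
        InvocationHandler invocationHandler = (proxy, method, args) -> {
            // Calls without arguments (e.g. toString()) send a null payload in this sketch.
            Object payload = (args == null || args.length == 0) ? null : args[0];
            return channel.send(payload);
        };
        return (T) Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[] { serviceInterface },
                invocationHandler);
    }

    public static void main(String[] args) {
        SimpleChannel syncChannel = new SimpleChannel();
        syncChannel.subscribe(payload -> "Sony(sync) received " + payload);

        // The caller only sees the plain listener interface.
        SyncListener listener = gateway(SyncListener.class, syncChannel);
        System.out.println(listener.update("Something happens"));
    }
}
```

The caller keeps working against the plain interface, which is the whole point: the messaging glue stays out of the API.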

The Main Application


The main application, App.java, can't get any simpler:

package com.soluvas.samples.eventfx.si;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class App {
 public static void main(String[] args) {
  new ClassPathXmlApplicationContext("META-INF/spring/*.xml");
 }
}

I'm using Spring Integration 2.0 with Spring Framework 3.0 for this project. For those who are curious, here's the project's Maven pom.xml :

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>com.soluvas.samples</groupId>
 <artifactId>eventfx.si</artifactId>
 <packaging>jar</packaging>
 <version>0.0.1-SNAPSHOT</version>
 <name>eventfx.si</name>
 <url>http://maven.apache.org</url>
 <dependencies>
  <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>4.7</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-core</artifactId>
   <version>2.0.0.M2</version>
  </dependency>
 </dependencies>
 <build>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
     <source>1.6</source>
     <target>1.6</target>
    </configuration>
   </plugin>
  </plugins>
 </build>
 <repositories>
  <repository>
   <id>spring-milestone</id>
   <name>Spring Portfolio Milestone Repository</name>
   <url>http://s3.amazonaws.com/maven.springframework.org/milestone</url>
  </repository>
 </repositories>
</project>

Event Notification Framework with Spring Integration 2.0 in Action


The app now runs exactly the same as the Camel Event Notification example. Here's the application output log:

Jan 5, 2010 3:00:19 AM org.springframework.context.support.AbstractApplicationContext prepareRefresh
INFO: Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@12a54f9: startup date [Tue Jan 05 03:00:19 WIT 2010]; root of context hierarchy
Jan 5, 2010 3:00:19 AM org.springframework.beans.factory.xml.XmlBeanDefinitionReader loadBeanDefinitions
INFO: Loading XML bean definitions from file [/home/ceefour/Sandbox/eventfx.si/target/classes/META-INF/spring/eventfx-si.xml]
Jan 5, 2010 3:00:20 AM org.springframework.integration.config.xml.DefaultConfiguringBeanFactoryPostProcessor registerErrorChannelIfNecessary
INFO: No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
Jan 5, 2010 3:00:20 AM org.springframework.integration.config.xml.DefaultConfiguringBeanFactoryPostProcessor registerTaskSchedulerIfNecessary
INFO: No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
Jan 5, 2010 3:00:20 AM org.springframework.beans.factory.support.DefaultListableBeanFactory preInstantiateSingletons
INFO: Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@2f0df1: defining beans [sensorSimulator,org.springframework.context.annotation.internalConfigurationAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalRequiredAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,org.springframework.integration.internalDefaultConfiguringBeanFactoryPostProcessor,com.soluvas.samples.eventfx.si.Sensor#0,display1,display2,Sensor_SensorEvent_sync,Sensor_SensorEvent_async,org.springframework.integration.handler.ServiceActivatingHandler#0,org.springframework.integration.config.ConsumerEndpointFactoryBean#0,org.springframework.integration.handler.DelayHandler#e06940,org.springframework.integration.handler.ServiceActivatingHandler#1aae94f,org.springframework.integration.handler.MessageHandlerChain#0,org.springframework.integration.config.ConsumerEndpointFactoryBean#1,nullChannel,errorChannel,org.springframework.integration.handler.LoggingHandler#0,org.springframework.integration.endpoint.EventDrivenConsumer#0,org.springframework.integration.channel.MessagePublishingErrorHandler#0,taskScheduler]; root of factory hierarchy
Jan 5, 2010 3:00:21 AM org.springframework.scheduling.concurrent.ExecutorConfigurationSupport initialize
INFO: Initializing ExecutorService  'taskScheduler'
Jan 5, 2010 3:00:21 AM org.springframework.integration.endpoint.AbstractEndpoint start
INFO: started org.springframework.integration.gateway.SimpleMessagingGateway@1fe571f
Jan 5, 2010 3:00:21 AM org.springframework.integration.endpoint.AbstractEndpoint start
INFO: started si:gateway#1c5fde0
Jan 5, 2010 3:00:21 AM org.springframework.integration.endpoint.AbstractEndpoint start
INFO: started org.springframework.integration.gateway.SimpleMessagingGateway@cf710e
Jan 5, 2010 3:00:21 AM org.springframework.integration.endpoint.AbstractEndpoint start
INFO: started si:gateway#19e8329
Jan 5, 2010 3:00:21 AM com.soluvas.samples.eventfx.si.SensorSimulator initialize
INFO: Sensor simulator initialized.
Jan 5, 2010 3:00:21 AM com.soluvas.samples.eventfx.si.Display initialize
INFO: Display Sony(sync) created.
Jan 5, 2010 3:00:21 AM com.soluvas.samples.eventfx.si.Display initialize
INFO: Display Samsung(async) created.
Jan 5, 2010 3:00:21 AM org.springframework.scheduling.concurrent.ExecutorConfigurationSupport initialize
INFO: Initializing ExecutorService 
Jan 5, 2010 3:00:21 AM org.springframework.integration.endpoint.AbstractEndpoint start
INFO: started org.springframework.integration.config.ConsumerEndpointFactoryBean#0
Jan 5, 2010 3:00:21 AM org.springframework.integration.endpoint.AbstractEndpoint start
INFO: started org.springframework.integration.config.ConsumerEndpointFactoryBean#1
Jan 5, 2010 3:00:21 AM org.springframework.integration.endpoint.AbstractEndpoint start
INFO: started org.springframework.integration.endpoint.EventDrivenConsumer#0
Jan 5, 2010 3:00:23 AM com.soluvas.samples.eventfx.si.Sensor updateText
INFO: updateText: Something happens at 3:00:23 AM
Jan 5, 2010 3:00:23 AM com.soluvas.samples.eventfx.si.Display update
INFO: [Sony(sync)] is updated: 'Something happens at 3:00:23 AM'
Jan 5, 2010 3:00:23 AM com.soluvas.samples.eventfx.si.Sensor fireSensor
INFO: Response: Sony(sync) received Something happens at 3:00:23 AM
Jan 5, 2010 3:00:23 AM org.springframework.integration.endpoint.AbstractEndpoint start
INFO: started org.springframework.integration.endpoint.EventDrivenConsumer@1be2893
Jan 5, 2010 3:00:24 AM com.soluvas.samples.eventfx.si.Display notify
INFO: [Samsung(async)] is notified: 'Something happens at 3:00:23 AM'
Jan 5, 2010 3:00:25 AM com.soluvas.samples.eventfx.si.Sensor updateText
INFO: updateText: Something happens at 3:00:25 AM
Jan 5, 2010 3:00:25 AM com.soluvas.samples.eventfx.si.Display update
INFO: [Sony(sync)] is updated: 'Something happens at 3:00:25 AM'
Jan 5, 2010 3:00:25 AM com.soluvas.samples.eventfx.si.Sensor fireSensor
INFO: Response: Sony(sync) received Something happens at 3:00:25 AM

Which One Is More Powerful?


Technically speaking, I think Apache Camel is more powerful than Spring Integration. And this is not just a feeling: compare the Spring XML above with the Camel version to see it for yourself.

For simpler purposes, both Apache Camel and Spring Integration do their job very well.

For more complex purposes, currently they have their own strengths and weaknesses.

Apache Camel allows you to refer to endpoints either by name ("ref") or by URI (though an endpoint must specify a URI), and it creates an endpoint automatically just by referring to its URI. Spring Integration can only refer to a channel by name, and useful (i.e. non-direct) channels need to be created explicitly.

Apache Camel routes are extremely flexible, and can be specified in a variety of DSLs. With Spring Integration it's either basic routing or custom processing. Spring Integration has an XML DSL and annotations, both of which Apache Camel also has.

Spring Integration's concept of publish-subscribe channel is more convenient than Apache Camel's.
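
To illustrate why an explicit publish-subscribe channel matters, here is a small plain-Java sketch of the two channel semantics: a point-to-point channel hands each message to exactly one subscriber, while a publish-subscribe channel hands it to every subscriber. PointToPointChannel and PubSubChannel are hypothetical names invented for this sketch, and the round-robin selection is an assumption of the sketch, not a statement about either framework's internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical point-to-point channel: each message goes to exactly one subscriber.
class PointToPointChannel<M> {
    private final List<Consumer<M>> subscribers = new CopyOnWriteArrayList<>();
    private int next = 0;

    void subscribe(Consumer<M> subscriber) {
        subscribers.add(subscriber);
    }

    synchronized void send(M message) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("No subscribers");
        }
        // Pick subscribers in turn (round-robin, assumed for this sketch).
        subscribers.get(next % subscribers.size()).accept(message);
        next++;
    }
}

// Hypothetical publish-subscribe channel: each message goes to every subscriber.
class PubSubChannel<M> {
    private final List<Consumer<M>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<M> subscriber) {
        subscribers.add(subscriber);
    }

    void send(M message) {
        for (Consumer<M> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

class ChannelSemanticsDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();

        PointToPointChannel<String> direct = new PointToPointChannel<>();
        direct.subscribe(m -> log.add("A:" + m));
        direct.subscribe(m -> log.add("B:" + m));
        direct.send("e1"); // delivered to A only
        direct.send("e2"); // delivered to B only

        PubSubChannel<String> topic = new PubSubChannel<>();
        topic.subscribe(m -> log.add("A:" + m));
        topic.subscribe(m -> log.add("B:" + m));
        topic.send("e3"); // delivered to both A and B

        System.out.println(log); // prints [A:e1, B:e2, A:e3, B:e3]
    }
}
```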

Learning More on Enterprise Integration


Spring Integration is all about decoupling messaging between components, which is part of enterprise integration. If you want to learn more about enterprise integration, I heartily recommend the book Enterprise Integration Patterns.

Advanced Event Notification Framework with Apache Camel


Let's build an Event Notification Framework using Apache Camel... Second Edition.

I've demonstrated how to do event notification with Apache Camel before. But the classes were hard-coupled to Camel URI endpoints.

This time, I have these goals in mind:
  • Decoupled. The event source and listener(s) must be as decoupled as possible, but not at the cost of making things too complex.
  • As POJO as possible. Event sources, event classes, and event listeners need not import the event framework classes/interfaces. Annotations may be used.
See also: Event Notification with Spring Integration

    Wednesday, December 30, 2009

    How Java Developers Solve the Event Notification Problem

    Event Notification ... Event Handling ... Publish-Subscribe Messaging ... Message Passing ... or whatever you call it.

    I'm not the first person to care about this problem. (but it's a problem that I'd rather not care about, especially after I met Qt's Signals and Slots... Java beaten by C++? Come on!)

    Note: I'm not talking about enterprise or distributed messaging, just typical event handling mechanism in a single application.

    Here are some examples of people contributing solutions to Event Notification in one way or another:
    I'm not so impressed. This is such a fundamental infrastructure problem that seems virtually untouched by the Java developers... for almost 15 years!!

    Event Notification with Apache Camel

    I once tried using Apache Camel to solve the event notification problem.

    Given a Sensor (event source) and a Display (listener), here's how I would implement it:

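    import org.apache.camel.Consume;
    import org.apache.camel.EndpointInject;
    import org.apache.camel.ProducerTemplate;
    
    public class Sensor {
    
        // Producer bound to the shared in-memory SEDA queue.
        @EndpointInject(uri="seda:sensor.events")
        private ProducerTemplate sensorProducer;
        
        // Publishes the event; the sender knows nothing about the listeners.
        private void fireSensor(SensorEvent sensorEvent) {
            sensorProducer.sendBody(sensorEvent);
        }
    
    }
    
    public class Display {
    
        // Camel calls this method for each event arriving on the queue.
        @Consume(uri="seda:sensor.events")
        public void sensorUpdated(SensorEvent sensorEvent) {
            // ... code to update ...  
        }
    
    }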

    Advantages


    As you can see, the code is pretty concise and elegant. Even Listener interfaces are not needed here. Simply agree on an event object, and the endpoint (channel) URI, and the event is wired up.

    The classes stay mostly POJO, apart from the dependency on ProducerTemplate.

    It's possible to receive responses from listeners (asynchronously), but I think that should be thought of as a feature of the routing engine, not the event notification framework.

    Issues


    The above isn't a true publish-subscribe though, since Camel's seda component won't support multiple consumers (multicasting) until Camel 2.2.0 is released.

    It also uses hard coded endpoint URIs, which is quick and convenient for singleton event sources but falls apart when there are several event sources of the same type.

    A routing engine introduces one more concept besides event sources, listeners, and event objects: endpoints (channels a.k.a. topics). I hope it's possible to remove this, but all publish-subscribe event notification frameworks use it, including Dojo AJAX Toolkit's dojo.publish.
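
For readers who haven't used a topic-based API like this, here is a minimal in-process sketch of the idea, where a topic name plays the role of the endpoint. TopicBus is a hypothetical class written for this post; it is not part of Camel, Dojo, or any other library.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-process event bus; the topic name plays the role of the endpoint URI.
class TopicBus {
    private final Map<String, List<Consumer<Object>>> topics = new ConcurrentHashMap<>();

    // Registers a listener on a topic, creating the topic on first use.
    void subscribe(String topic, Consumer<Object> listener) {
        topics.computeIfAbsent(topic, name -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Delivers the event to every listener on the topic; returns how many were called.
    int publish(String topic, Object event) {
        List<Consumer<Object>> listeners =
                topics.getOrDefault(topic, Collections.<Consumer<Object>>emptyList());
        for (Consumer<Object> listener : listeners) {
            listener.accept(event);
        }
        return listeners.size();
    }
}

class TopicBusDemo {
    public static void main(String[] args) {
        TopicBus bus = new TopicBus();
        List<String> received = new ArrayList<>();
        bus.subscribe("sensor.events", event -> received.add("display1 got " + event));
        bus.subscribe("sensor.events", event -> received.add("display2 got " + event));

        int delivered = bus.publish("sensor.events", "temperature=21");
        System.out.println(delivered + " listeners notified: " + received);
    }
}
```

Source and listeners only share the topic name and the event type, which is exactly the extra concept discussed above.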

    The event source object can be put inside the event object so the listeners can filter it, which addresses one side of the problem.
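
A minimal sketch of that filtering idea, assuming the event extends java.util.EventObject (which already carries the source). The SensorEvent shape and the SourceFilter helper are hypothetical and only serve this illustration.

```java
import java.util.ArrayList;
import java.util.EventObject;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical event class; java.util.EventObject already stores the event source.
class SensorEvent extends EventObject {
    private final String text;

    SensorEvent(Object source, String text) {
        super(source);
        this.text = text;
    }

    String getText() {
        return text;
    }
}

class SourceFilter {
    // Wraps a listener so that it only sees events fired by the given source.
    static Consumer<SensorEvent> onlyFrom(Object source, Consumer<SensorEvent> listener) {
        return event -> {
            if (event.getSource() == source) {
                listener.accept(event);
            }
        };
    }

    public static void main(String[] args) {
        Object kitchenSensor = new Object();
        Object garageSensor = new Object();
        List<String> shown = new ArrayList<>();

        // Both sensors share one endpoint; the listener keeps only the kitchen events.
        Consumer<SensorEvent> display = onlyFrom(kitchenSensor, e -> shown.add(e.getText()));
        display.accept(new SensorEvent(kitchenSensor, "kitchen: 21C"));
        display.accept(new SensorEvent(garageSensor, "garage: 9C"));

        System.out.println(shown); // prints [kitchen: 21C]
    }
}
```

Note that every listener still receives every event on the shared endpoint and discards the ones it doesn't care about, so this only solves the listener's side of the problem.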

    Making endpoint URIs dynamic is another alternative, e.g. having each event source derive its URI from its own Spring bean name via BeanNameAware.

    Yet another alternative is having a router-consumer that routes the messages (events) to the actual listeners. This way, listeners attach to the router much like in the old JavaBeans event notification style. But I think this is getting too complex.

    Dynamic Endpoints

    Here's some code to illustrate how to implement event notification with Apache Camel dynamic endpoints.

    public class Sensor { 
    
        // dynamically created or injected
        private Endpoint sensorEndpoint;
        @EndpointInject
        private ProducerTemplate producer;
        
        private void fireSensor(SensorEvent sensorEvent) {
            producer.sendBody(sensorEndpoint, sensorEvent);
        }
    
    }
    
    public class Display {
    
        @Autowired
        private Endpoint sensorEndpoint;
        private Consumer sensorConsumer;
        
        @PostConstruct
        public void initialize() throws Exception {
            // createConsumer() and start() declare checked exceptions.
            // BeanProcessor should automatically
            // match the method based on argument type
            sensorConsumer = sensorEndpoint.createConsumer(
                new BeanProcessor(this, sensorEndpoint.getCamelContext()));
            sensorConsumer.start();
        }
        
        @PreDestroy
        public void destroy() throws Exception {
            sensorConsumer.stop();
        }
    
        public void sensorUpdated(SensorEvent sensorEvent) {
            // ... code to update ...  
        }
    
    }

    It's far from elegant, especially the part where the Event Driven Consumer has to be created, configured, started, and stopped manually.

    It is somewhat decoupled, though: the consumer only needs to know the endpoint object or the endpoint URI, and the producer doesn't have to iterate over all consumers and call their callbacks one by one.

    Update: I've written a more advanced proof-of-concept event notification with Apache Camel here.

    Generics for Event Listener Pattern

    A way to work around the somethingListener -> somethingEvent mapping.

    I'm not the first person to care about Event Notification. (but it's a problem that I'd rather not care about, especially after I met Qt's Signals and Slots... Java beaten by C++? Come on!)
      Maybe we can use Java 5 generics to save some code...
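      import java.util.List;
      import java.util.concurrent.CopyOnWriteArrayList;
      
      // Common interface for all listeners. 
      public interface Listener<E> {
        public void notify(E event);
      }
      
      public class PedalEvent {
        // any custom properties wherever necessary
      }
      
      public class PedalListener implements Listener<PedalEvent> {
        public void notify(PedalEvent event) {
           // ... implementation ... 
        } 
      } 
      
      public class Car {
        // Listeners registered for pedal events.
        private final List<Listener<PedalEvent>> pedalListeners =
            new CopyOnWriteArrayList<Listener<PedalEvent>>();
      
        public void addPedalListener(Listener<PedalEvent> pedalListener) {
          pedalListeners.add(pedalListener);
        }
      
        public void removePedalListener(Listener<PedalEvent> pedalListener) {
          pedalListeners.remove(pedalListener);
        }
      } 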
      
      
      What do you think?
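
One possible way to push the idea further, sketched under my own assumptions: since every listener shares the same generic interface, the add/remove/fire plumbing can live in a single reusable helper instead of being rewritten in each event source. ListenerSupport, the pressure property, and pressPedal() are hypothetical names added for this sketch only.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Common interface for all listeners, as proposed above.
interface Listener<E> {
    void notify(E event);
}

// Hypothetical reusable helper so event sources need not re-implement add/remove/fire.
class ListenerSupport<E> {
    private final List<Listener<E>> listeners = new CopyOnWriteArrayList<>();

    void add(Listener<E> listener) {
        listeners.add(listener);
    }

    void remove(Listener<E> listener) {
        listeners.remove(listener);
    }

    // Calls every registered listener with the event.
    void fire(E event) {
        for (Listener<E> listener : listeners) {
            listener.notify(event);
        }
    }
}

// Event with one illustrative property (assumed for this sketch).
class PedalEvent {
    final int pressure;

    PedalEvent(int pressure) {
        this.pressure = pressure;
    }
}

class Car {
    private final ListenerSupport<PedalEvent> pedalListeners = new ListenerSupport<>();

    void addPedalListener(Listener<PedalEvent> listener) {
        pedalListeners.add(listener);
    }

    void removePedalListener(Listener<PedalEvent> listener) {
        pedalListeners.remove(listener);
    }

    // Hypothetical trigger that fires a PedalEvent to all listeners.
    void pressPedal(int pressure) {
        pedalListeners.fire(new PedalEvent(pressure));
    }

    public static void main(String[] args) {
        Car car = new Car();
        car.addPedalListener(event -> System.out.println("Pedal pressed: " + event.pressure));
        car.pressPedal(42);
    }
}
```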

      Event Driven Publish-Subscribe Implementation Ideas

      Some ideas for implementing publish-subscribe event notification: