JMS queues with JEE6 and JBoss 7

This post shows how to configure a JMS queue on JBoss 7.1.1-Final and how to use it from a JEE 6 application. It also covers two related topics: transactional sessions and the number of consumers of a JMS queue.

JMS

This post does not aim to describe JMS in detail, but it gives a brief overview.
JMS (Java Message Service) is part of the Java EE specification. It is the standard API for working with MOM (Message Oriented Middleware), which allows clients to exchange messages asynchronously.
There are basically two models for exchanging messages:

  • Queue: A point-to-point model in which one side (producer) places a message on a JMS queue and the other side (consumer) pulls the message from the queue. Each message is delivered to a single consumer. The example in this post uses this model.
  • Topic: A publish-subscribe model in which one side (publisher) publishes a message on a JMS topic and, for this topic, there can be 0 or more subscribers, each of which receives all the published messages.
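
To make the difference between the two models concrete, here is a minimal in-memory sketch in plain Java. It is only a toy model and does not use the JMS API: the names MessagingModels, ToyQueue and ToyTopic are made up for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy in-memory model of the two messaging styles; this is NOT the JMS API.
class MessagingModels {

    // Point-to-point: each message is taken by exactly one consumer.
    static final class ToyQueue {
        private final Queue<String> messages = new ArrayDeque<String>();

        void send(String message) {
            messages.add(message);
        }

        // Returns the next message, or null when the queue is empty.
        String receive() {
            return messages.poll();
        }
    }

    // Publish-subscribe: every subscriber gets its own copy of each message.
    static final class ToyTopic {
        private final List<List<String>> inboxes = new ArrayList<List<String>>();

        // Registers a subscriber and returns the inbox that collects its messages.
        List<String> subscribe() {
            List<String> inbox = new ArrayList<String>();
            inboxes.add(inbox);
            return inbox;
        }

        void publish(String message) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.send("order-1");
        System.out.println("consumer A got: " + queue.receive());
        System.out.println("consumer B got: " + queue.receive());

        ToyTopic topic = new ToyTopic();
        List<String> first = topic.subscribe();
        List<String> second = topic.subscribe();
        topic.publish("price-update");
        System.out.println("subscriber 1 got: " + first);
        System.out.println("subscriber 2 got: " + second);
    }
}
```

In the queue part, only the first consumer gets the message and the second one finds the queue empty. In the topic part, both subscribers end up with their own copy.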

More details can be found on the JEE 6 Tutorial.

Configuring a JMS queue on JBoss

JBoss AS uses HornetQ (http://www.jboss.org/hornetq) as its JMS provider. HornetQ is an open-source messaging system, fully compliant with the JMS specification, designed for robustness, performance and scalability. To configure a JMS queue, edit the JBOSS_HOME/standalone/configuration/standalone.xml file which, by default, does not come with the JMS module configured. (If you prefer to use the standalone-full.xml configuration file, that’s OK, as it already comes with JMS configured.)
Open the JBOSS_HOME/standalone/configuration/standalone.xml file and edit the following sections:

Add the messaging module inside <extensions>

<extensions>
	...
	<extension module="org.jboss.as.messaging"/>
</extensions>

Add the messaging subsystem configuration right below <subsystem xmlns="urn:jboss:domain:weld:1.0"/>

        <subsystem xmlns="urn:jboss:domain:messaging:1.1">
            <hornetq-server>
                <persistence-enabled>true</persistence-enabled>
                <journal-file-size>102400</journal-file-size>
                <journal-min-files>2</journal-min-files>

                <connectors>
                    <netty-connector name="netty" socket-binding="messaging"/>
                    <netty-connector name="netty-throughput" socket-binding="messaging-throughput">
                        <param key="batch-delay" value="50"/>
                    </netty-connector>
                    <in-vm-connector name="in-vm" server-id="0"/>
                </connectors>

                <acceptors>
                    <netty-acceptor name="netty" socket-binding="messaging"/>
                    <netty-acceptor name="netty-throughput" socket-binding="messaging-throughput">
                        <param key="batch-delay" value="50"/>
                        <param key="direct-deliver" value="false"/>
                    </netty-acceptor>
                    <in-vm-acceptor name="in-vm" server-id="0"/>
                </acceptors>

                <security-settings>
                    <security-setting match="#">
                        <permission type="send" roles="guest"/>
                        <permission type="consume" roles="guest"/>
                        <permission type="createNonDurableQueue" roles="guest"/>
                        <permission type="deleteNonDurableQueue" roles="guest"/>
                    </security-setting>
                </security-settings>

                <address-settings>
                    <address-setting match="#">
                        <dead-letter-address>jms.queue.DLQ</dead-letter-address>
                        <expiry-address>jms.queue.ExpiryQueue</expiry-address>
                        <redelivery-delay>0</redelivery-delay>
                        <max-size-bytes>10485760</max-size-bytes>
                        <address-full-policy>BLOCK</address-full-policy>
                        <message-counter-history-day-limit>10</message-counter-history-day-limit>
                    </address-setting>
                </address-settings>

                <jms-connection-factories>
                    <connection-factory name="InVmConnectionFactory">
                        <connectors>
                            <connector-ref connector-name="in-vm"/>
                        </connectors>
                        <entries>
                            <entry name="java:/ConnectionFactory"/>
                        </entries>
                    </connection-factory>
                    <connection-factory name="RemoteConnectionFactory">
                        <connectors>
                            <connector-ref connector-name="netty"/>
                        </connectors>
                        <entries>
                            <entry name="RemoteConnectionFactory"/>
                            <entry name="java:jboss/exported/jms/RemoteConnectionFactory"/>
                        </entries>
                    </connection-factory>
                    <pooled-connection-factory name="hornetq-ra">
                        <transaction mode="xa"/>
                        <connectors>
                            <connector-ref connector-name="in-vm"/>
                        </connectors>
                        <entries>
                            <entry name="java:/JmsXA"/>
                        </entries>
                    </pooled-connection-factory>
                </jms-connection-factories>

                <jms-destinations>
                    <jms-queue name="testQueue">
                        <entry name="queue/test"/>
                        <entry name="java:jboss/exported/jms/queue/test"/>
                    </jms-queue>
                    <jms-topic name="testTopic">
                        <entry name="topic/test"/>
                        <entry name="java:jboss/exported/jms/topic/test"/>
                    </jms-topic>
                </jms-destinations>
            </hornetq-server>
        </subsystem>

This is the default configuration, and the code example will use the testQueue queue, which is already pre-configured. If you wish to add a new queue, you just need to add a new <jms-queue> element.
In order to access a queue, we need a ConnectionFactory. Our configuration has 2 of them that use the in-vm connector (one regular and one transactional), which can be accessed by the names java:/ConnectionFactory and java:/JmsXA respectively. The queue can be accessed by either the java:/queue/test or the java:jboss/exported/jms/queue/test name.

More details regarding configuration can be found in JBoss and HornetQ documentation.

The socket bindings referenced by <connectors> and <acceptors> also need to be configured. Scroll down to the <socket-binding-group> section and add the following entries:

...
        <socket-binding name="messaging" port="5445"/>
        <socket-binding name="messaging-throughput" port="5455"/>
...

To finish the JBoss configuration, define the instance pool to be used by the message consumer. Go to the <subsystem xmlns="urn:jboss:domain:ejb3:1.2"> section and add the following content between </session-bean> and <pools>:

            <mdb>
                <resource-adapter-ref resource-adapter-name="hornetq-ra"/>
                <bean-instance-pool-ref pool-name="mdb-strict-max-pool"/>
            </mdb>

JBoss is now fully configured and we can start coding the example.

Demo application

Our application will be fairly simple and will be packaged as a .war file. It will comprise:

  • A simple POJO (User) with a String attribute called “name”
  • A Servlet responsible for creating a message (User) and passing it to an EJB, which sends it to the JMS queue
  • An MDB responsible for receiving messages from the testQueue queue

We will be using Maven as the build tool, so you can use your preferred IDE.

The pom.xml file should be configured as shown below:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.lucianomolinari</groupId>
	<artifactId>jms_example</artifactId>
	<version>1.0.0</version>
	<packaging>war</packaging>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>2.3.2</version>
				<configuration>
					<source>1.6</source>
					<target>1.6</target>
				</configuration>
			</plugin>
			<plugin>
				<artifactId>maven-war-plugin</artifactId>
				<version>2.4</version>
				<configuration>
					<failOnMissingWebXml>false</failOnMissingWebXml>
				</configuration>
			</plugin>
		</plugins>
	</build>

	<dependencies>
		<dependency>
			<groupId>javax</groupId>
			<artifactId>javaee-api</artifactId>
			<version>6.0</version>
			<scope>provided</scope>
		</dependency>
	</dependencies>

</project>

POJO that will be sent as the JMS message:

package com.lucianomolinari.jms_example.entity;

import java.io.Serializable;

public final class User implements Serializable {
	private static final long serialVersionUID = 8734596722276424601L;

	private final String name;

	public User(String name) {
		this.name = name;
	}

	public String getName() {
		return name;
	}

}

EJB responsible for sending the JMS message:

package com.lucianomolinari.jms_example.jms;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.Stateless;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import com.lucianomolinari.jms_example.entity.User;

@Stateless
public class MessageCreator {
	@Resource(mappedName = "java:/ConnectionFactory")
	private ConnectionFactory connectionFactory;

	@Resource(mappedName = "java:/queue/test")
	private Destination destination;

	private Connection connection;
	private Session session;
	private MessageProducer messageProducer;

	@PostConstruct
	public void init() {
		try {
			connection = connectionFactory.createConnection();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			messageProducer = session.createProducer(destination);
		} catch (JMSException e) {
			e.printStackTrace();
			throw new RuntimeException(e);
		}
	}

	@PreDestroy
	public void destroy() {
		if (connection != null) {
			try {
				connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}

	public void sendMessage(User user) {
		ObjectMessage message;
		try {
			message = session.createObjectMessage(user);
			messageProducer.send(message);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

MDB that will receive the message from the queue:

package com.lucianomolinari.jms_example.jms;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import com.lucianomolinari.jms_example.entity.User;

@MessageDriven(activationConfig = {
		@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
		@ActivationConfigProperty(propertyName = "destination", propertyValue = "java:/queue/test") })
public class MessageConsumer implements MessageListener {

	@Override
	public void onMessage(Message message) {
		ObjectMessage objMsg = (ObjectMessage) message;
		try {
			System.out.println(">>>>> Receiving message with user " + ((User) objMsg.getObject()).getName());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

Finally, the Servlet responsible for invoking the EJB producer:

package com.lucianomolinari.jms_example.web.servlet;

import java.io.IOException;

import javax.ejb.EJB;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.lucianomolinari.jms_example.entity.User;
import com.lucianomolinari.jms_example.jms.MessageCreator;

@WebServlet("/test")
public class ServletCreateMessage extends HttpServlet {
	private static final long serialVersionUID = -973490316445738120L;

	@EJB
	private MessageCreator messageCreator;

	@Override
	protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
		System.out.println(">>>>Creating test message...");
		messageCreator.sendMessage(new User("Test user"));
	}

}

To send a JMS message, just access the URL http://localhost:8080/jms_example-1.0.0/test. The lines printed in the JBoss log show the message being produced and consumed.

Important considerations

Transactional session

When a JMS session is created, the first parameter (boolean) indicates whether that session should be transactional or not. In the example above, a non-transactional session was created:

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

However, that’s not always the desired behaviour: in many cases, sending the JMS message is part of a bigger transaction that involves other tasks, such as updating a database. To make the session transactional, just pass true as the first parameter:

session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

Unfortunately, if you make only this change and run the application again, you’ll notice that the message never reaches the queue and therefore is never consumed. That happens because the ConnectionFactory used (java:/ConnectionFactory) has not been configured to be transactional, so nothing ever commits the transacted session. There are 2 approaches to get the message sent and consumed properly with a transactional session:

Commit explicitly

One approach is to manually commit the session after the message sending:

	public void sendMessage(User user) {
		ObjectMessage message;
		try {
			message = session.createObjectMessage(user);
			messageProducer.send(message);
			session.commit();
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

However, this approach is not recommended, for the same reason a non-transactional session is not recommended: sending the message may be part of a broader transaction.
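
The role of commit in a transacted session can be pictured with a small toy model in plain Java. This is only a sketch of the buffering behaviour, not the JMS API or HornetQ's implementation, and the class name ToyTransactedSession is made up for this illustration: messages that are sent stay pending until the session is committed, and a rollback discards them.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a transacted session; it only mimics the buffering behaviour.
class ToyTransactedSession {
    // Stands in for the broker-side queue that consumers read from.
    private final List<String> queue;
    // Messages sent in the current transaction but not yet committed.
    private final List<String> pending = new ArrayList<String>();

    ToyTransactedSession(List<String> queue) {
        this.queue = queue;
    }

    // Sending only records the message; consumers cannot see it yet.
    void send(String message) {
        pending.add(message);
    }

    // Committing makes all pending messages visible on the queue.
    void commit() {
        queue.addAll(pending);
        pending.clear();
    }

    // Rolling back throws the pending messages away.
    void rollback() {
        pending.clear();
    }

    public static void main(String[] args) {
        List<String> queue = new ArrayList<String>();
        ToyTransactedSession session = new ToyTransactedSession(queue);

        session.send("Test user");
        System.out.println("before commit: " + queue);

        session.commit();
        System.out.println("after commit: " + queue);
    }
}
```

Before the commit the queue is still empty, which matches what happens in the real application when the transacted session is never committed.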

Make use of a ConnectionFactory with transactional support

There’s another ConnectionFactory configured on JBoss (java:/JmsXA) that is transactional (<transaction mode="xa"/>). So, we can take the initial code and make the following changes:

Update the ConnectionFactory’s mappedName to java:/JmsXA:

	@Resource(mappedName = "java:/JmsXA")
	private ConnectionFactory connectionFactory;

And make the session transactional:

session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

With that, everything works properly again, and better than before: sending the message can now be part of a bigger transaction.

Configuring the number of consumers for a queue

To finish this post, here is a simple but powerful configuration. It can help your application achieve better performance and scalability when consuming messages from a JMS queue.

Imagine the following scenario:

  • Processing a message is fairly heavy and takes 5 seconds to complete
  • The number of messages per second placed on the queue is “reasonable” (10 messages/second)
  • There are only 5 MDB instances consuming the messages

Over a 10-second window, your queue would have received 100 messages and your application would have consumed only 10 of them, since each MDB instance would have consumed 2 messages: (10 sec / 5 sec) * 5 MDB instances = 10 messages.

To configure the number of MDB instances, open the JBoss configuration file (standalone.xml) and scroll down to the <subsystem xmlns="urn:jboss:domain:ejb3:1.2"> section:

..
            <mdb>
                <resource-adapter-ref resource-adapter-name="hornetq-ra"/>
                <bean-instance-pool-ref pool-name="mdb-strict-max-pool"/>
            </mdb>
            <pools>
                <bean-instance-pools>
                    <strict-max-pool name="slsb-strict-max-pool" max-pool-size="20" instance-acquisition-timeout="5" instance-acquisition-timeout-unit="MINUTES"/>
                    <strict-max-pool name="mdb-strict-max-pool" max-pool-size="20" instance-acquisition-timeout="5" instance-acquisition-timeout-unit="MINUTES"/>
                </bean-instance-pools>
            </pools>
..

Note that the line <bean-instance-pool-ref pool-name="mdb-strict-max-pool"/> refers to the mdb-strict-max-pool pool, which in turn has its own configuration:

<strict-max-pool name="mdb-strict-max-pool" max-pool-size="20" instance-acquisition-timeout="5" instance-acquisition-timeout-unit="MINUTES"/>

As we can see, the default value is 20 instances for each MDB. When necessary, you just need to adjust the pool size.
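
To get a rough idea of a suitable pool size for the scenario above, the arithmetic can be written down as a few helper methods. This is a back-of-the-envelope sketch that assumes a constant arrival rate and a constant processing time; the class and method names are made up for this illustration.

```java
// Rough steady-state estimate for sizing an MDB pool; a simplification, not a benchmark.
class MdbPoolEstimate {

    // Messages per second the pool can finish when every instance is busy.
    static double consumptionRate(int instances, double secondsPerMessage) {
        return instances / secondsPerMessage;
    }

    // Messages still waiting on the queue after the given window (never negative).
    static long backlog(double arrivalsPerSecond, int instances,
                        double secondsPerMessage, double windowSeconds) {
        double produced = arrivalsPerSecond * windowSeconds;
        double consumed = consumptionRate(instances, secondsPerMessage) * windowSeconds;
        return Math.max(0L, Math.round(produced - consumed));
    }

    // Smallest number of instances whose combined rate matches the arrival rate.
    static int instancesToKeepUp(double arrivalsPerSecond, double secondsPerMessage) {
        return (int) Math.ceil(arrivalsPerSecond * secondsPerMessage);
    }

    public static void main(String[] args) {
        // Scenario from the text: 10 messages/second, 5 seconds per message, 10-second window.
        System.out.println("backlog with 5 instances: " + backlog(10, 5, 5, 10));
        System.out.println("backlog with 20 instances: " + backlog(10, 20, 5, 10));
        System.out.println("instances needed to keep up: " + instancesToKeepUp(10, 5));
    }
}
```

Under these assumptions, 5 instances leave 90 of the 100 messages waiting after 10 seconds, the default pool of 20 leaves 60, and it takes 50 instances to keep up with the arrival rate. Whether the server can actually run that many instances in parallel depends on its resources, so treat the result as a starting point for testing.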

Conclusion

Using JMS with JEE6 is quite simple, requiring only a few annotations and some lines of code. Still, it’s a very robust and useful mechanism, extensively used in JEE applications.

The code can be found on GitHub. Feel free to ask if you have any questions.

27 thoughts on “JMS queues with JEE6 and JBoss 7”

  1. Good evening Luciano. I redid the example at home, also using JBoss AS 7, but for the injection to work in the servlet I had to add the @LocalBean annotation to the ProdutorMensagem bean:

    @Stateless
    @LocalBean
    public class ProdutorMensagem{
    // rest of the implementation
    }

    Other than that, it worked beautifully. Personally, I didn’t like the AS 7 approach to libs, where you now have to configure a module and declare which libraries you want to use. It’s true that this makes things more organized and puts an end to “Jar Hell”, but it has become a lot of work.

    I really liked the new browser-based administration console. I intend to use it from now on.

    Great article, congratulations.

    1. Good morning Odair,
      When I tested it at home, I didn’t need the @LocalBean annotation; anyway, the important thing is that it worked. I’ll test it again later to confirm.
      Indeed, this change in the way libs are used brought advantages and disadvantages, especially for those who were used to the old model, which is my case. But overall, JBoss 7 turned out really nice: it’s faster, the browser-based administration console is much better, there is now a command-line console... in short, plenty of new things.
      Thanks for the feedback. Cheers.

  2. Luciano, when I try to start the project it fails with:
    Exception in thread "main" java.lang.NullPointerException

    right when opening the connection:
    Connection connection = connectionFactory.createConnection();

    Any idea?
    Excellent article, thank you!

  3. Good afternoon,

    Luciano, congratulations on the post... it got me out of a tight spot, and I quickly solved a problem here.

    Cheers

  4. Friend, first of all, congratulations on the post.

    I’m getting a NullPointerException on this line: produtorDeMensagem.enviarMensagem(new Usuario("Usuario Teste")); when trying to run the servlet. It seems to make sense that it throws a NullPointerException, since your code doesn’t initialize the variable. I’d like to know what might be happening.

    Cheers

      1. Hey Luciano, thanks for the reply. Well, I believed so. I created the project as a Maven project and ran it with “Run on server” on a JBoss 7.1 Runtime Server; I thought that would be enough. What could I be doing wrong?

        Cheers

      2. Hi Xilon,
        In the example I started JBoss without Eclipse. Try using the code available for download and starting JBoss without Eclipse (after applying the configuration described in the article).
        That should make it work.

        Cheers

  5. I’d like to add one thing: to get the JBoss server to start, I had to add the mdb tag to the standalone file:

    Is this some new configuration?

  6. Hello, thank you very much for the article.
    Does this configuration work for remote calls? E.g. when a server in another instance wants to consume JMS messages.

  7. I’m getting a NullPointerException on this line: produtorDeMensagem.enviarMensagem(new Usuario("Usuario Teste")); when trying to run the servlet. It seems to make sense that it throws a NullPointerException, since your code doesn’t initialize the variable. I’d like to know what might be happening.

    The replies say to start JBoss without using Eclipse. How do you do that?

    I’ve already tried the run.bat file and it doesn’t work.

    Regards

    1. Hello Bruno,
      The variable is initialized automatically by the container, as defined by the Java EE specification.

      To start JBoss without Eclipse, just run the standalone.sh (or .bat) script, which is in the same folder as run.bat.

      Regards.

    2. That’s because of CDI. You probably didn’t create the beans.xml file inside the WEB-INF folder; just create an empty beans.xml file and put it in that folder.

  8. Hello Luciano, I have a scenario where I run several JBoss instances, and each of them runs more than one instance of my application. Is it possible to use JMS in this scenario?

  9. Hello Luciano, I have a method annotated with @Transaction that performs database operations and then sends JMS messages to a queue. I’d like the method responsible for sending the messages to use the same transaction that is already open. Can you give me a hand?

    1. Hello Iury,
      In that case you need to use a transactional session together with the XA connection factory. In addition, your database data source also needs to be XA.
      Cheers,
      Luciano
