JMS Transaction

What is JMS Transaction?


A JMS transaction organizes a message or a message group into an atomic processing unit. If delivery of a message fails, that message or message group may be redelivered.

An application has various transaction options available, including whether it wants to participate in transactions or not. If the application does not use transactions, it can use one of these acknowledgement modes: auto, duplicates okay, and client. You can specify the acknowledgement mode while creating a JMS session. If your application uses transactions, it can choose from one of these transaction options: transacted session, MDB with container-managed transaction demarcation (CMTD), and MDB with bean-managed transaction demarcation (BMTD).

Acknowledgement mode:

  • Auto mode: In auto mode, the session automatically acknowledges each message it receives. This is the simplest mode, and it provides a once-only message delivery guarantee.
  • Duplicates okay mode: In duplicates okay mode, the session also acknowledges received messages automatically, but lazily, some time after delivery. Under rare circumstances, a message might be delivered more than once. This gives an at-least-once message delivery guarantee.
  • Client mode: In client mode, received messages are not acknowledged automatically. The application must acknowledge message receipt itself. This mode gives complete control over message acknowledgement, at the cost of increased code complexity.

Transaction options:

  • Transacted session: An application can participate in a transaction by creating a transacted session (or local transaction) in which the application completely controls the message delivery by either committing or rolling back the session.
  • Message-driven beans with CMTD: An MDB can participate in a container transaction by specifying CMTD in the XML deployment descriptor. Here, the transaction commits upon successful message processing, or the application can explicitly roll it back.
  • Message-driven beans with BMTD: An MDB can choose not to participate in a container transaction by specifying BMTD in the XML deployment descriptor. Here, the MDB programmer has to design and code programmatic transactions.

Auto acknowledgement:

To implement the auto acknowledgement mode, create the receiver’s session with false as the first argument and Session.AUTO_ACKNOWLEDGE as the second argument of the createSession() factory method. Specifying false creates a non-transacted session, and Session.AUTO_ACKNOWLEDGE makes that session acknowledge messages automatically. A message is acknowledged when the receive() method returns successfully. If the receiver uses the MessageListener interface, the message is acknowledged when the onMessage() method returns successfully. If receive() or onMessage() fails, the message is automatically redelivered.

Step 1: Sender Code

import javax.naming.InitialContext;
import javax.jms.*;
public class Sender {
    public static void main(String[] args) {
        System.out.println("Starting...");
        QueueConnectionFactory aQCF = null;
        QueueConnection aQC = null;
        QueueSession aQS = null;
        QueueSender aSender  = null;
        try {
            InitialContext aIC = new InitialContext(Resource.getResources());
            aQCF = (QueueConnectionFactory) aIC.lookup(
                iConstants.FACTORY_NAME
            );
            aQC = aQCF.createQueueConnection();
            aQS = aQC.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue aQueue = (Queue) aIC.lookup(iConstants.QUEUE_NAME);
            aSender = aQS.createSender(aQueue);
            aQC.start();
            for (int i = 0; i < 10; i++) {
                // Integer.valueOf replaces the deprecated Integer constructor
                aSender.send(aQS.createObjectMessage(Integer.valueOf(i)));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (aSender != null) {
                    aSender.close();
                }
                if (aQS != null) {
                    aQS.close();
                }
                if (aQC != null) {
                    aQC.stop();
                    aQC.close();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Ending...");
    }
}

Step 2: Receiver Code

import javax.jms.*;
import javax.naming.InitialContext;
import java.io.InputStreamReader;
public abstract class Receiver {
    protected void doAll() {
        QueueConnectionFactory aQCF = null;
        QueueConnection aQC = null;
        QueueSession aQS = null;
        QueueReceiver aQR  = null;
        try {
            InitialContext aIC = new InitialContext(Resource.getResources());
            aQCF = (QueueConnectionFactory) aIC.lookup(
                iConstants.FACTORY_NAME
            );
            aQC = aQCF.createQueueConnection();
            aQS = createQueueSession(aQC);
            final QueueSession aQS1 = aQS;
            Queue aQueue = (Queue) aIC.lookup(iConstants.QUEUE_NAME);
            aQR = aQS.createReceiver(aQueue);
            MessageListener aML = new MessageListener() {
                public void onMessage(Message aMessage) {
                    try {
                        processMessage(aMessage, aQS1);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            };
            aQR.setMessageListener(aML);
            aQC.start();
            InputStreamReader aISR = new InputStreamReader(System.in);
            char aAnswer = ' ';
            do {
                aAnswer = (char) aISR.read();
                if ((aAnswer == 'r') || (aAnswer == 'R')) {
                    aQS.recover();
                }
            } while ((aAnswer != 'q') && (aAnswer != 'Q'));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (aQR != null) {
                    aQR.close();
                }
                if (aQS != null) {
                    aQS.close();
                }
                if (aQC != null) {
                    aQC.stop();
                    aQC.close();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    protected void processMessage(Message aMessage, QueueSession aQS) throws JMSException {
        if (aMessage instanceof ObjectMessage) {
            ObjectMessage aOM = (ObjectMessage) aMessage;
            System.out.print(aOM.getObject() + " ");
        }
    }
    protected abstract QueueSession createQueueSession(
        QueueConnection aQC
    ) throws JMSException;
}

Step 3: AutoReceiver Code

import javax.naming.InitialContext;
import javax.jms.*;
import java.io.InputStreamReader;
public class AutoReceiver extends Receiver {
    public static void main(String[] args) {
        System.out.println("Starting...");
        new AutoReceiver().doAll();
        System.out.println("Ending...");
    }
    protected QueueSession createQueueSession(
        QueueConnection aQC
        ) throws JMSException {
        return aQC.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    }
}

Duplicates okay acknowledgement:

This mode closely resembles the auto acknowledgement mode. However, instead of Session.AUTO_ACKNOWLEDGE, you pass Session.DUPS_OK_ACKNOWLEDGE as the second argument of createSession(). During failure recovery, some messages may be delivered more than once.

Steps 1 and 2 are the same as in auto acknowledgement mode.

Step 3: DuplicatesOkayReceiver

import javax.naming.InitialContext;
import javax.jms.*;
import java.io.InputStreamReader;
public class DuplicatesOkayReceiver extends Receiver {
    public static void main(String[] args) {
        System.out.println("Starting...");
        new DuplicatesOkayReceiver().doAll();
        System.out.println("Ending...");
    }
    protected QueueSession createQueueSession(
        QueueConnection aQC
        ) throws JMSException {
        return aQC.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
    }
}

The difference between auto mode and duplicates okay mode is a tradeoff between delivery guarantee and throughput. By relaxing the guarantee to at-least-once delivery, duplicates okay mode can achieve higher throughput.
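The tradeoff can be illustrated with a toy model in plain Java. It is not the JMS API and needs no provider: the class name, the batch size, and the crash point are all assumptions made for illustration. An acknowledgement batch of 1 stands in for auto mode, a larger batch stands in for a lazily acknowledging duplicates okay session, and the simulated failure happens after the acknowledgement step.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of eager versus lazy acknowledgement; not the JMS API.
class LazyAckModel {
    // Returns every delivery seen by a consumer that fails once, right after
    // processing crashAfter messages, and then resumes after the last acknowledgement.
    // ackBatch = 1 models auto mode; ackBatch > 1 models lazy duplicates okay mode.
    static List<Integer> deliveries(int total, int ackBatch, int crashAfter) {
        List<Integer> seen = new ArrayList<>();
        int acked = 0;              // messages 0..acked-1 are acknowledged
        boolean crashed = false;
        int i = 0;
        while (i < total) {
            seen.add(i);
            i++;
            if (i % ackBatch == 0) {
                acked = i;          // acknowledge a full batch
            }
            if (!crashed && i == crashAfter) {
                crashed = true;     // simulated failure
                i = acked;          // delivery resumes after the last acknowledgement
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(deliveries(10, 1, 6)); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        System.out.println(deliveries(10, 4, 6)); // prints [0, 1, 2, 3, 4, 5, 4, 5, 6, 7, 8, 9]
    }
}
```

With a batch of 4, messages 4 and 5 are delivered twice, but only two acknowledgements are sent for the first eight messages instead of eight. Real providers choose their own lazy acknowledgement policy, so the exact duplicates will differ.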

Client acknowledgement:

In this mode, when you create the receiver’s session, specify false as the first argument of createSession() and Session.CLIENT_ACKNOWLEDGE as the second argument. Specifying false creates a non-transacted session. The application then acknowledges messages explicitly by invoking the acknowledge() method of the Message interface. Acknowledging a message also acknowledges every message the session consumed before it.

Steps 1 and 2 are the same as in auto acknowledgement mode.

Step 3: ClientReceiver

import javax.naming.InitialContext;
import javax.jms.*;
import java.io.InputStreamReader;
public class ClientReceiver extends Receiver {
    public static void main(String[] args) {
        System.out.println("Starting...");
        new ClientReceiver().doAll();
        System.out.println("Ending...");
    }
    protected QueueSession createQueueSession(
        QueueConnection aQC
        ) throws JMSException {
        return aQC.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
    }
    protected void processMessage(Message aMessage, QueueSession aQS)
        throws JMSException
    {
        if (aMessage instanceof ObjectMessage) {
            ObjectMessage aOM = (ObjectMessage) aMessage;
            System.out.print(aOM.getObject() + " " );
            Integer i = (Integer) aOM.getObject();
            int ii = i.intValue();
            if (ii == 5) {
                aOM.acknowledge();
            }
        }
    }
}
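
To see what the ClientReceiver above should do with the ten messages from the Sender, here is a toy model in plain Java. It is only a sketch of the semantics, not the JMS API: the class and its methods are hypothetical stand-ins for Message.acknowledge() and Session.recover(), which the Receiver base class triggers when you press 'r'.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of a client-acknowledged session; not the JMS API.
class ClientAckModel {
    private final List<Integer> pending = new ArrayList<>(); // not yet acknowledged
    private int next = 0;                                     // index of the next delivery

    ClientAckModel(int count) {
        for (int i = 0; i < count; i++) {
            pending.add(i);
        }
    }

    // Delivers the next message, or returns null when nothing is left.
    Integer receive() {
        return next < pending.size() ? pending.get(next++) : null;
    }

    // Stand-in for Message.acknowledge(): acknowledges everything consumed so far.
    void acknowledge() {
        pending.subList(0, next).clear();
        next = 0;
    }

    // Stand-in for Session.recover(): restart at the oldest unacknowledged message.
    void recover() {
        next = 0;
    }

    // Mirrors ClientReceiver: acknowledge only when the payload is 5, then recover.
    static List<Integer> run() {
        ClientAckModel session = new ClientAckModel(10);
        Integer m;
        while ((m = session.receive()) != null) {
            if (m == 5) {
                session.acknowledge();
            }
        }
        session.recover();
        List<Integer> redelivered = new ArrayList<>();
        while ((m = session.receive()) != null) {
            redelivered.add(m);
        }
        return redelivered;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [6, 7, 8, 9]
    }
}
```

Messages 0 through 5 are covered by the single acknowledgement at 5, so only 6 through 9 come back after recovery.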

Transacted session:

To implement the transacted session mode, when you create the receiver’s session, specify true as the first argument of createSession(). The second argument is ignored.

The application indicates successful message processing by invoking the Session’s commit() method, and it rejects a message or a message group by invoking the Session’s rollback() method. Calling commit() commits all the messages the session has received in the current transaction. Similarly, calling rollback() rejects all of those messages, which makes them eligible for redelivery. The commit() and rollback() methods make sense only with the transacted session option.

The transacted session uses a chained-transaction model, in which an application does not explicitly start a transaction. When the application calls commit() or rollback(), the session automatically starts a new transaction. Because a transaction never has to be started explicitly, one is always present and available.

import javax.naming.InitialContext;
import javax.jms.*;
import java.io.InputStreamReader;
public class TransactedReceiver extends Receiver {
    public static void main(String[] args) {
        System.out.println("Starting...");
        new TransactedReceiver().doAll();
        System.out.println("Ending...");
    }
    protected QueueSession createQueueSession(
        QueueConnection aQC
        ) throws JMSException {
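        // The acknowledgement mode is ignored for a transacted session;
        // Session.SESSION_TRANSACTED states that intent instead of a magic -1.
        return aQC.createQueueSession(true, Session.SESSION_TRANSACTED);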
    }
    protected void processMessage(Message aMessage, QueueSession aQS)
        throws JMSException
    {
        if (aMessage instanceof ObjectMessage) {
            ObjectMessage aOM = (ObjectMessage) aMessage;
            System.out.print(aOM.getObject() + " " );
            Integer i = (Integer) aOM.getObject();
            int ii = i.intValue();
            if (ii == 5) {
                aQS.commit();
            } else if (ii == 9) {
                aQS.rollback();
            }
        }
    }
}
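
The chained-transaction behaviour of the TransactedReceiver above can be sketched with a toy model in plain Java. This is not the JMS API: the class and its methods are hypothetical stand-ins for a transacted session. One deliberate difference is that the model rolls back only once. The receiver above rolls back every time it sees 9, so with a real provider messages 6 through 9 would presumably keep cycling until a provider-specific redelivery limit applies.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy in-memory model of a transacted receiving session; not the JMS API.
class TransactedSessionModel {
    private final Deque<Integer> queue = new ArrayDeque<>();       // waiting on the destination
    private final List<Integer> inTransaction = new ArrayList<>(); // consumed in the current transaction

    TransactedSessionModel(int count) {
        for (int i = 0; i < count; i++) {
            queue.addLast(i);
        }
    }

    // Delivers the next message, or returns null when the queue is empty.
    Integer receive() {
        Integer m = queue.pollFirst();
        if (m != null) {
            inTransaction.add(m);
        }
        return m;
    }

    // Commit: consumed messages are gone for good; the next transaction starts implicitly.
    void commit() {
        inTransaction.clear();
    }

    // Rollback: consumed messages return to the front of the queue in their original order.
    void rollback() {
        for (int i = inTransaction.size() - 1; i >= 0; i--) {
            queue.addFirst(inTransaction.get(i));
        }
        inTransaction.clear();
    }

    // Commit at 5 and roll back at 9, as TransactedReceiver does, but roll back only once.
    static List<Integer> run() {
        TransactedSessionModel session = new TransactedSessionModel(10);
        List<Integer> deliveries = new ArrayList<>();
        boolean rolledBack = false;
        Integer m;
        while ((m = session.receive()) != null) {
            deliveries.add(m);
            if (m == 5) {
                session.commit();
            } else if (m == 9 && !rolledBack) {
                session.rollback();
                rolledBack = true;
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 6, 7, 8, 9]
    }
}
```

The commit at 5 settles messages 0 through 5, so the rollback at 9 affects only the messages received since then.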

Message-driven bean with CMTD:

When a message-driven bean is deployed, CMTD is specified in the XML deployment descriptor. The following XML fragment from ejb-jar.xml shows the <transaction-type> element set to Container:

<message-driven>
    <ejb-name>cmtdBean</ejb-name>
    <ejb-class>com.examples.jms.transactions.MDB_CMTD</ejb-class>
    <transaction-type>Container</transaction-type>
    <message-driven-destination>
        <destination-type>javax.jms.Queue</destination-type>
    </message-driven-destination>
</message-driven>

Plus, the <trans-attribute> is specified as Required:

<assembly-descriptor>
    <container-transaction>
        <method>
            <ejb-name>cmtdBean</ejb-name>
            <method-name>*</method-name>
        </method>
        <trans-attribute>Required</trans-attribute>
    </container-transaction>
</assembly-descriptor>

A transaction automatically starts when the JMS provider removes the message from the destination and delivers it to the MDB’s onMessage() method. The transaction commits when onMessage() returns successfully. Because the configuration is CMTD, the container manages the transaction’s begin, commit, and rollback. An MDB can tell the container to roll the transaction back by calling MessageDrivenContext’s setRollbackOnly() method. When the container rolls back a transaction, the message is automatically redelivered. This option’s redelivery semantics and mechanics resemble those of the transacted session option.

import javax.ejb.*;
import javax.jms.*;
public class MDB_CMTD extends MDB {
    public void onMessage(Message aMessage) {
        try {
            if (aMessage instanceof ObjectMessage) {
                ObjectMessage aOM = (ObjectMessage) aMessage;
                System.out.print(aOM.getObject() + " " );
                Integer i = (Integer) aOM.getObject();
                int ii = i.intValue();
                if (ii > 5) {
                    mMDC.setRollbackOnly();
                }
                if (ii == 9) {
                    System.out.println();
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
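
Because MDB_CMTD marks every message with a payload above 5 for rollback, those messages would keep coming back. The toy model below, in plain Java, sketches that loop. It is not the EJB or JMS API, and the cap of three deliveries is a hypothetical value: how often a message is redelivered, and what happens to it afterwards, is provider-specific.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a container delivering messages to an MDB with CMTD; not the EJB API.
class CmtdContainerModel {
    // Hypothetical redelivery cap; real limits are provider-specific settings.
    static final int MAX_DELIVERIES = 3;

    // Returns the payloads in the order onMessage() would see them.
    static List<Integer> run(int count) {
        Deque<int[]> queue = new ArrayDeque<>();   // each entry: {payload, deliveryCount}
        for (int i = 0; i < count; i++) {
            queue.addLast(new int[] {i, 0});
        }
        List<Integer> seen = new ArrayList<>();
        while (!queue.isEmpty()) {
            int[] msg = queue.pollFirst();         // the container begins a transaction
            msg[1]++;
            seen.add(msg[0]);
            boolean rollbackOnly = msg[0] > 5;     // same rule as MDB_CMTD.onMessage()
            if (rollbackOnly && msg[1] < MAX_DELIVERIES) {
                queue.addFirst(msg);               // rolled back: the message is redelivered
            }
            // Otherwise the transaction commits, or the message is dropped at the cap.
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(8)); // prints [0, 1, 2, 3, 4, 5, 6, 6, 6, 7, 7, 7]
    }
}
```

In practice a bean would normally call setRollbackOnly() only for failures that a retry can fix, since a message that always fails is retried until the provider gives up.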

Message-driven bean with BMTD:

When a message-driven bean is deployed, BMTD is specified in the XML deployment descriptor. The following XML fragment from ejb-jar.xml shows the <transaction-type> element set to Bean:

<message-driven>
    <ejb-name>bmtdBean</ejb-name>
    <ejb-class>com.malani.examples.jms.transactions.MDB_BMTD</ejb-class>
    <transaction-type>Bean</transaction-type>
    <message-driven-destination>
        <destination-type>javax.jms.Queue</destination-type>
    </message-driven-destination>
</message-driven>

A message-driven bean with BMTD can obtain a transaction by calling MessageDrivenContext’s getUserTransaction() method. The bean controls the user transaction by invoking the begin(), commit(), and rollback() methods of the UserTransaction interface. The transaction has no impact on message acknowledgement and, in turn, no impact on message redelivery. Rolling back the transaction does not cause the messages to be redelivered.

To implement message-driven beans with BMTD, specify Session.AUTO_ACKNOWLEDGE or Session.DUPS_OK_ACKNOWLEDGE as the acknowledgement mode. Hence, the MDB with BMTD option works with the auto acknowledgement and duplicates okay acknowledgement options.

import javax.ejb.*;
import javax.jms.*;
import javax.transaction.UserTransaction;
public class MDB_BMTD extends MDB {
    public void onMessage(Message aMessage) {
        try {
            if (aMessage instanceof ObjectMessage) {
                UserTransaction aUT = mMDC.getUserTransaction();
                aUT.begin();
                ObjectMessage aOM = (ObjectMessage) aMessage;
                System.out.print(aOM.getObject() + " " );
                Integer i = (Integer) aOM.getObject();
                int ii = i.intValue();
                if (ii > 5) {
                    aUT.rollback();
                } else {
                    aUT.commit();
                }
                if (ii == 9) {
                    System.out.println();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
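
The separation between the user transaction and message acknowledgement can be sketched with another toy model in plain Java. It is not the EJB API: the class is hypothetical, and a plain list stands in for transactional work such as a database insert. The model assumes auto acknowledgement, where a message counts as acknowledged once onMessage() returns normally.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of an MDB with BMTD; not the EJB or JTA API.
class BmtdModel {
    final List<Integer> deliveries = new ArrayList<>();    // every onMessage() call
    final List<Integer> committedWork = new ArrayList<>(); // work that survived its transaction

    void onMessage(int payload) {
        deliveries.add(payload);
        List<Integer> tx = new ArrayList<>(); // stands in for UserTransaction.begin()
        tx.add(payload);                       // transactional work, e.g. a database insert
        if (payload > 5) {
            tx.clear();                        // rollback(): the work is discarded
        } else {
            committedWork.addAll(tx);          // commit(): the work becomes durable
        }
        // Either way, the message is acknowledged when this method returns.
    }

    // Delivers 0..9 exactly once each; no redelivery loop is needed in this model.
    static BmtdModel run() {
        BmtdModel bean = new BmtdModel();
        for (int i = 0; i < 10; i++) {
            bean.onMessage(i);
        }
        return bean;
    }

    public static void main(String[] args) {
        BmtdModel bean = run();
        System.out.println(bean.deliveries);    // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        System.out.println(bean.committedWork); // prints [0, 1, 2, 3, 4, 5]
    }
}
```

Each message is delivered once even though four of the ten transactions roll back, which is the main behavioural difference from the CMTD option.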
