Tuesday, February 3, 2015

ActiveMQ getting started - 1: Producer/Consumer (Sender/Receiver)

Before ActiveMQ, I worked with JMS only on application servers such as WebLogic and GlassFish.
When I started working with ActiveMQ, I was surprised by how easy it is to work with.

In this post I'm going to show how to use ActiveMQ in a simple application with point-to-point message exchange: Producer/Consumer (Sender/Receiver).

Theoretical information.

JMS : http://en.wikipedia.org/wiki/Java_Message_Service
The Java Message Service (JMS) API is a Java Message Oriented Middleware (MOM) API[1] for sending messages between two or more clients. JMS is a part of the Java Platform, Enterprise Edition, and is defined by a specification developed under the Java Community Process as JSR 914.[2] It is a messaging standard that allows application components based on the Java Enterprise Edition (Java EE) to create, send, receive, and read messages. It allows the communication between different components of a distributed application to be loosely coupled, reliable, and asynchronous.[3]

ActiveMQ: http://en.wikipedia.org/wiki/Apache_ActiveMQ
Apache ActiveMQ is an open source message broker written in Java together with a full Java Message Service (JMS) client. It provides "Enterprise Features" which in this case means fostering the communication from more than one client or server. Supported clients include Java via JMS 1.1 as well as several other "cross language" clients.[1] The communication is managed with features such as computer clustering and ability to use any database as a JMS persistence provider besides virtual memory, cache, and journal persistency.[2]

Point-to-point model (from JMS wiki)

In a point-to-point messaging system, messages are routed to an individual consumer which maintains a queue of "incoming" messages. This messaging type is built on the concept of message queues, senders, and receivers. Each message is addressed to a specific queue, and the receiving clients extract messages from the queues established to hold their messages. While any number of producers can send messages to the queue, each message is guaranteed to be delivered to, and consumed by, one consumer. Queues retain all messages sent to them until the messages are consumed or until the messages expire. If no consumers are registered to consume the messages, the queue holds them until a consumer registers to consume them.
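
The "one message, one consumer" rule can be illustrated without a broker. The sketch below is only an in-memory analogy built on java.util.concurrent.LinkedBlockingQueue from the standard library. It is not JMS or ActiveMQ code, and the class name PointToPointSketch and its deliver method are hypothetical names chosen for illustration. Two consumers take turns polling one queue, and every message ends up with exactly one of them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory analogy of a point-to-point queue (NOT JMS/ActiveMQ code).
final class PointToPointSketch {

    // Puts `count` messages on one queue, then lets two consumers take turns
    // polling it. Returns the messages each consumer received: [A, B].
    static List<List<String>> deliver(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < count; i++) {
            queue.add("message-" + i); // the queue holds messages until consumed
        }

        List<String> consumerA = new ArrayList<>();
        List<String> consumerB = new ArrayList<>();
        boolean turnA = true;
        String next;
        // poll() removes the head of the queue, so a message taken by one
        // consumer can never be seen by the other.
        while ((next = queue.poll()) != null) {
            (turnA ? consumerA : consumerB).add(next);
            turnA = !turnA;
        }

        List<List<String>> result = new ArrayList<>();
        result.add(consumerA);
        result.add(consumerB);
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> received = deliver(5);
        System.out.println("consumer A: " + received.get(0));
        System.out.println("consumer B: " + received.get(1));
    }
}
```

A real broker decides which consumer gets which message. The strict alternation here is just the simplest way to show that no message is delivered twice.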


Let's now create an application (actually, two applications) to see how ActiveMQ works.

1. ActiveMQ message broker. 

First of all, before creating our application, we need to run a message broker: the ActiveMQ "server".
Let's create a folder for it: mbroker.
Next step: the file mbroker/pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.demien.amq</groupId>
  <artifactId>mbroker</artifactId>
  <packaging>jar</packaging>
  <version>0.1</version>
  <name>mbroker</name>
  <url>http://maven.apache.org</url>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.activemq.tooling</groupId>
        <artifactId>maven-activemq-plugin</artifactId>
        <version>5.2.0</version>
        <configuration>
          <configUri>xbean:file:./conf/activemq.xml</configUri>
          <fork>false</fork>
          <systemProperties>
            <property>
              <name>javax.net.ssl.keyStorePassword</name>
              <value>password</value>
            </property>
            <property>
              <name>org.apache.activemq.default.directory.prefix</name>
              <value>./target/</value>
            </property>
          </systemProperties>
        </configuration>
        <dependencies>
          <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring</artifactId>
            <version>2.5.5</version>
          </dependency>
        </dependencies>
      </plugin>
    </plugins>
  </build>
</project>

We also need a configuration file: mbroker/conf/activemq.xml
<?xml version="1.0"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core
  http://activemq.apache.org/schema/core/activemq-core.xsd
  ">
  <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="./data">
    <!-- The transport connectors ActiveMQ will listen to -->
    <transportConnectors>
      <transportConnector name="openwire" uri="tcp://localhost:61616"/>
    </transportConnectors>
  </broker>
</beans>

Now we can run our message broker with the command:
mvn org.apache.activemq.tooling:maven-activemq-plugin:5.2.0:run
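
Before moving on, it can be handy to confirm that something is actually listening on the transport connector address from activemq.xml (tcp://localhost:61616). The following is a minimal sketch using only java.net.Socket from the standard library. The class name BrokerPortCheck and the method isListening are hypothetical, and the check only proves that a TCP port is open, not that the process behind it is ActiveMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Plain TCP reachability check; it does not speak the OpenWire protocol.
final class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port are taken from the transportConnector uri in activemq.xml
        boolean up = isListening("localhost", 61616, 1000);
        System.out.println(up
                ? "Something is listening on localhost:61616"
                : "Nothing is listening on localhost:61616");
    }
}
```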


2. Test application. 

Now that our message broker is up and running, let's create our test application.
Create the directory test-app and place the following pom.xml file inside it:
test-app/pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.demien.amq</groupId>
  <artifactId>consumer</artifactId>
  <packaging>jar</packaging>
  <version>0.1</version>
  <name>consumer</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.8.2</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>javax.jms</groupId>
      <artifactId>jms</artifactId>
      <version>1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-core</artifactId>
      <version>5.2.0</version>
    </dependency>
  </dependencies>
  <repositories>
    <repository>
      <id>jboss</id>
      <url>http://repository.jboss.com/maven2</url>
      <releases>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>
</project>



3. Producer.

To publish our messages to the message broker, we create a Producer class in test-app/src/main/java/com/demien/amq/

package com.demien.amq;
import javax.jms.*;
import java.io.Serializable;

// Sends serializable objects to a named queue as JMS ObjectMessages
public class Producer<T extends Serializable> {
    private Connection connection;
    private Session session;
    private MessageProducer producer;

    public Producer(ConnectionFactory factory, String queueName) throws JMSException {
        connection = factory.createConnection();
        connection.start();
        // Non-transacted session; messages are acknowledged automatically
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // A queue is a point-to-point destination
        Destination destination = session.createQueue(queueName);
        producer = session.createProducer(destination);
    }

    public void postObjectMessage(T object) throws JMSException {
        Message message = session.createObjectMessage(object);
        producer.send(message);
    }

    // Closing the connection also closes its session and producer
    public void close() throws JMSException {
        if (connection != null) {
            connection.close();
        }
    }
}


4. Consumer

The Consumer class receives our messages from the message broker and stores them in local storage (an ArrayList).
test-app/src/main/java/com/demien/amq/

package com.demien.amq;

import javax.jms.*;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class Consumer<T extends Serializable> implements MessageListener {

    private Connection connection;
    private Session session;
    private MessageConsumer consumer;

    // onMessage() is called on a JMS delivery thread, so the list is synchronized
    private final List<T> messages = Collections.synchronizedList(new ArrayList<T>());

    public Consumer(ConnectionFactory factory, String queueName) throws JMSException {
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(queueName);
        consumer = session.createConsumer(destination);
        // Register this object as an asynchronous listener for the queue
        consumer.setMessageListener(this);
    }

    public List<T> getMessages() {
        return messages;
    }

    @SuppressWarnings("unchecked")
    public void onMessage(Message message) {
        try {
            if (message instanceof ObjectMessage) {
                // The standard JMS interface is enough; no ActiveMQ-specific cast is needed
                messages.add((T) ((ObjectMessage) message).getObject());
            } else {
                System.out.println("Invalid message received.");
            }
        } catch (Exception e) {
            System.out.println("Caught:" + e);
            e.printStackTrace();
        }
    }

    // Call this when done, so the consumer stops taking messages from the queue
    public void close() throws JMSException {
        if (connection != null) {
            connection.close();
        }
    }
}

5. Simple test object class for use in tests.

Now we can create a simple class to send in our message exchange:

package com.demien.amq;
import java.io.Serializable;

public class TestObject implements Serializable {
    public Long id;
    public String name;
}
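
The class has to implement Serializable because a JMS ObjectMessage carries a serialized Java object, so the receiver gets a copy, not the original instance. The following is a small sketch of that round trip using only standard Java serialization, with no JMS involved. The class SerializationRoundTrip, its helper methods, and the nested Payload class (a stand-in with the same shape as TestObject) are hypothetical names used for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class SerializationRoundTrip {

    // Hypothetical stand-in with the same fields as TestObject
    static final class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        Long id;
        String name;
    }

    // Serializes an object to bytes, roughly what happens before a message is sent
    static byte[] toBytes(Serializable object) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(object);
            }
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Rebuilds an object from bytes, roughly what happens on the receiving side
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Payload original = new Payload();
        original.id = -1L;
        original.name = "test name";

        Payload copy = (Payload) fromBytes(toBytes(original));
        System.out.println("same instance: " + (copy == original));
        System.out.println("same content: "
                + (copy.id.equals(original.id) && copy.name.equals(original.name)));
    }
}
```

This is also why a test should compare field values with equals rather than compare object references.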

6. The test itself.

In our test, we will post data to our queue using the producer and read it back using the consumer.


package com.demien.amq;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import java.util.List;
import static org.junit.Assert.*;

public class AppTest  {

    public static String brokerURL = "tcp://localhost:61616";
    private final ConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
    private final String queueName="TestQueue";

    Producer<TestObject> producer;
    Consumer<TestObject> consumer;

    @Before
    public void init() throws JMSException {
        producer=new Producer<TestObject>(factory, queueName);
        consumer=new Consumer<TestObject>(factory, queueName);
    }

    @After
    public void finish() throws JMSException {
        producer.close();
    }

    @Test
    public void singleMessageTest() throws JMSException, InterruptedException {
        Long TEST_ID=-1L;
        String TEST_NAME="test name";

        TestObject testObject=new TestObject();
        testObject.id=TEST_ID;
        testObject.name=TEST_NAME;

        //post message
        producer.postObjectMessage(testObject);
        //give the broker time to deliver the message to the listener
        Thread.sleep(1000);

        //receive message
        List<TestObject> messages=consumer.getMessages();
        assertFalse(messages.isEmpty());
        TestObject received=messages.get(0);
        assertEquals(TEST_NAME, received.name);
        assertEquals(TEST_ID, received.id);
    }
}

If the message broker was started before running the test, the test should pass.
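
The test waits for delivery with a fixed Thread.sleep(1000), which is either too long or, on a slow machine, too short. One possible alternative is to wait on a java.util.concurrent.CountDownLatch. The following is a sketch under the assumption that the consumer's onMessage would call accept. The class LatchingCollector is hypothetical and uses only the standard library, so it runs without a broker.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Collects items from another thread and lets the caller wait for them.
final class LatchingCollector<T> {

    private final List<T> items = new CopyOnWriteArrayList<>();
    private final CountDownLatch latch;

    LatchingCollector(int expected) {
        latch = new CountDownLatch(expected);
    }

    // Would be called from the listener thread, e.g. inside onMessage()
    void accept(T item) {
        items.add(item);
        latch.countDown();
    }

    // Blocks until the expected number of items arrived or the timeout elapsed.
    // Returns true only if all expected items arrived in time.
    boolean await(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    List<T> items() {
        return items;
    }

    public static void main(String[] args) {
        LatchingCollector<String> collector = new LatchingCollector<>(1);
        // Simulates the JMS delivery thread
        new Thread(() -> collector.accept("hello")).start();
        boolean arrived = collector.await(5, TimeUnit.SECONDS);
        System.out.println("arrived=" + arrived + ", items=" + collector.items());
    }
}
```

With this approach the test returns as soon as the message arrives and fails with a clear timeout when it does not.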

Full sources can be downloaded from here.