Saturday, February 7, 2015

Active MQ getting started -2: Publisher/Subscriber

This is continuing of my previous post about ActiveMQ : Publisher/Subcriber(Sender/Receiver).
Actually it will be copy/paste of previous post, with few changes, so I suggest to read it before reading this topic :)

Theoretical information from wikipedia:
The publish/subscribe model supports publishing messages to a particular message topic. Subscribers may register interest in receiving messages on a particular message topic. In this model, neither the publisher nor the subscriber knows about each other. A good analogy for this is an anonymous bulletin board
  • Zero or more consumers will receive the message.
  • There is a timing dependency between publishers and subscribers. The publisher has to create a message topic for clients to subscribe. The subscriber has to remain continuously active to receive messages, unless it has established a durable subscription. In that case, messages published while the subscriber is not connected will be redistributed whenever it reconnects.
JMS provides a way of separating the application from the transport layer of providing data. The same Java classes can be used to communicate with different JMS providers by using the Java Naming and Directory Interface (JNDI) information for the desired provider. The classes first use a connection factory to connect to the queue or topic, and then use populate and send or publish the messages. On the receiving side, the clients then receive or subscribe to the messages.
   

1. Publisher.

It's a copy of Producer class from previous post with only one difference: instead of queue we have to create topic.

package com.demien.amq;

import javax.jms.*;
import java.io.Serializable;

public class Publisher<T extends Serializable> {
    private Connection connection;
    private Session session;
    private MessageProducer publisher;

    public Publisher(ConnectionFactory factory, String topicName) throws JMSException {
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);
        publisher = session.createProducer(topic);
    }

    public void postObjectMessage(T object) throws JMSException {
        Message message = session.createObjectMessage(object);
        publisher.send(message);
    }

    public void close() throws JMSException {
        if (connection != null) {
            connection.close();
        }
    }
} 
 

2. Subscriber.

Subscriber also was not created from scratch :) It's copy of Receiver class. Also I added field subscriberId for test, to make process of message receiving by different subscribers more transparent.
 
package com.demien.amq;

import org.apache.activemq.command.ActiveMQObjectMessage;

import javax.jms.*;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class Subscriber<T extends Serializable> implements MessageListener {

    private Connection connection;
    private Session session;
    private MessageConsumer consumer;
    private String subscriberId="";

    private List<T> messages = new ArrayList<T>();

    public Subscriber(ConnectionFactory factory, String topicName) throws JMSException {
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);
        consumer = session.createConsumer(topic);
        consumer.setMessageListener(this);
    }

    public void setSubscriberId(String subscriberId) {
        this.subscriberId=subscriberId;
    }

    public List<T> getMessages() {
        return messages;
    }

    public void onMessage(Message message) {
        try {
            if (message instanceof ObjectMessage) {
                System.out.println("Subscriber<"+subscriberId+"> received message.");
                messages.add((T)((ActiveMQObjectMessage) message).getObject());
            } else {
                System.out.println("Invalid message received.");
            }
        } catch (Exception e) {
            System.out.println("Caught:" + e);
            e.printStackTrace();
        }
    }
}
 

3. Test class. 

For test we can use MassageBroker and TestObject from my previous post. So, all we need it to create test class which will post our TestObject into topic and check : every topic subscriber have to receive these message. 
 
 
package com.demien.amq;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertTrue;

public class PublisherSubscriberTest {
    public static String brokerURL = "tcp://localhost:61616";
    private final ConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
    private final String topicName="TestTopic";

    private final int SUBSCRIBERS_COUNT=10;

    Publisher<TestObject> publisher;
    List<Subscriber<TestObject>> subscribers;

    @Before
    public void init() throws JMSException {
        publisher=new Publisher<TestObject>(factory, topicName);

        subscribers=new ArrayList<Subscriber<TestObject>>();
        for (int i=0; i<SUBSCRIBERS_COUNT; i++) {
            Subscriber<TestObject> subscriber=new Subscriber<TestObject>(factory, topicName);
            subscriber.setSubscriberId(Integer.toString(i));
            subscribers.add(subscriber);
        }
    }

    @After
    public void finish() throws JMSException {
        publisher.close();
    }

    @Test
    public void multipleMessagesTest() throws JMSException, InterruptedException {
        Long TEST_ID=-1L;
        String TEST_NAME="test name";


        TestObject testObject=new TestObject();
        testObject.id=new Long(TEST_ID);
        testObject.name=new String(TEST_NAME);

        //post message
        publisher.postObjectMessage(testObject);
        Thread.sleep(1000);

        // receive messages
        for (Subscriber<TestObject> subscriber:subscribers) {
            TestObject received=subscriber.getMessages().get(0);
            assertTrue(received.name.equals(TEST_NAME));
            assertTrue(received.id.equals(TEST_ID));
        }

    }


}
 

4. Results.

Of course, before running of our test we have to run our message broker. 
For me, results where : 
 
log4j:WARN No appenders could be found for logger (org.apache.activemq.transport.WireFormatNegotiator).
log4j:WARN Please initialize the log4j system properly.
Subscriber<0> received message.
Subscriber<2> received message.
Subscriber<1> received message.
Subscriber<4> received message.
Subscriber<5> received message.
Subscriber<8> received message.
Subscriber<6> received message.
Subscriber<9> received message.
Subscriber<3> received message.
Subscriber<7> received message.

Process finished with exit code 0

So, each subscriber received message! Everyone is happy :) 

Source code (together with previous topic) can be downloaded from here.

No comments:

Post a Comment